You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/17 09:35:01 UTC

[3/3] git commit: CAMEL-6584: camel-splunk component. Thanks to Preben Asmussen for the contribution.

CAMEL-6584: camel-splunk component. Thanks to Preben Asmussen for the contribution.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e47197b9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e47197b9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e47197b9

Branch: refs/heads/master
Commit: e47197b96781e1a919fff4c2024d80f67fe1ea08
Parents: 92681f7
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 17 09:05:44 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 17 09:34:51 2013 +0200

----------------------------------------------------------------------
 components/camel-splunk/pom.xml                 |  94 ++++
 .../camel/component/splunk/ConsumerType.java    |  31 ++
 .../DefaultSplunkConfigurationFactory.java      |  39 ++
 .../camel/component/splunk/ProducerType.java    |  30 +
 .../camel/component/splunk/SplunkComponent.java |  45 ++
 .../component/splunk/SplunkConfiguration.java   | 260 +++++++++
 .../splunk/SplunkConfigurationFactory.java      |  25 +
 .../camel/component/splunk/SplunkConsumer.java  | 106 ++++
 .../camel/component/splunk/SplunkEndpoint.java  | 114 ++++
 .../camel/component/splunk/SplunkProducer.java  | 113 ++++
 .../component/splunk/event/SplunkEvent.java     | 545 +++++++++++++++++++
 .../component/splunk/support/DataWriter.java    |  27 +
 .../splunk/support/SplunkDataReader.java        | 326 +++++++++++
 .../splunk/support/SplunkDataWriter.java        |  81 +++
 .../splunk/support/StreamDataWriter.java        |  61 +++
 .../splunk/support/SubmitDataWriter.java        |  59 ++
 .../component/splunk/support/TcpDataWriter.java |  50 ++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 +++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../services/org/apache/camel/component/splunk  |  17 +
 .../camel/component/splunk/ConsumerTest.java    |  68 +++
 .../apache/camel/component/splunk/Helper.java   |  86 +++
 .../camel/component/splunk/ProducerTest.java    |  94 ++++
 .../SplunkComponentConfigurationTest.java       |  88 +++
 .../component/splunk/SplunkMockTestSupport.java |  52 ++
 .../splunk/integration/NormalSearchTest.java    |  58 ++
 .../splunk/integration/RealtimeSearchTest.java  |  56 ++
 .../splunk/integration/SavedSearchTest.java     |  59 ++
 .../splunk/integration/SplunkProducerTest.java  |  82 +++
 .../splunk/integration/SplunkTest.java          |  42 ++
 .../integration/TestFieldListSearchTest.java    |  55 ++
 .../src/test/resources/log4j.properties         |  36 ++
 .../test/resources/resultsreader_test_data.json |  40 ++
 components/pom.xml                              |   1 +
 34 files changed, 3054 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-splunk/pom.xml b/components/camel-splunk/pom.xml
new file mode 100644
index 0000000..dc8566a
--- /dev/null
+++ b/components/camel-splunk/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.13-SNAPSHOT</version>
+  </parent>
+
+  <name>Camel :: Splunk</name>
+  <artifactId>camel-splunk</artifactId>
+  <description>Camel :: Splunk component</description>
+  <packaging>bundle</packaging>
+
+  <properties>
+    <camel.osgi.export.pkg>org.apache.camel.component.splunk.*</camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=splunk</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+
+    <!-- TODO: use SMX bundle version when its released -->
+    <dependency>
+      <groupId>com.splunk</groupId>
+      <artifactId>splunk</artifactId>
+      <version>1.1.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${jodatime2-bundle-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>${gson-version}</version>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>${hamcrest-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <!-- TODO: remove me when we use SMX bundle version for splunk -->
+  <repositories>
+    <repository>
+      <id>ext-release-local</id>
+      <url>http://splunk.artifactoryonline.com/splunk/ext-releases-local</url>
+    </repository>
+  </repositories>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java
new file mode 100644
index 0000000..fe5f10b
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+public enum ConsumerType {
+    NORMAL, REALTIME, SAVEDSEARCH, UNKNOWN;
+
+    public static ConsumerType fromUri(String uri) {
+        for (ConsumerType consumerType : ConsumerType.values()) {
+            if (consumerType.name().equalsIgnoreCase(uri)) {
+                return consumerType;
+            }
+        }
+        return ConsumerType.UNKNOWN;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java
new file mode 100644
index 0000000..867b76d
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.Map;
+
+import org.apache.camel.util.ObjectHelper;
+
+public class DefaultSplunkConfigurationFactory implements SplunkConfigurationFactory {
+
+    public SplunkConfiguration parseMap(Map<String, Object> parameters) {
+        String host = (String) parameters.get("host");
+        String port = (String) parameters.get("port");
+        String username = (String) parameters.get("username");
+        String password = (String) parameters.get("password");
+        if (ObjectHelper.isEmpty(username) || ObjectHelper.isEmpty(password)) {
+            throw new IllegalArgumentException("Username and password has to be specified");
+        }
+        if (ObjectHelper.isNotEmpty(host) && ObjectHelper.isNotEmpty(port)) {
+            return new SplunkConfiguration(host, Integer.valueOf(port), username, password);
+        }
+        return new SplunkConfiguration(username, password);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java
new file mode 100644
index 0000000..e73def3
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+public enum ProducerType {
+    TCP, SUBMIT, STREAM, UNKNOWN;
+
+    public static ProducerType fromUri(String uri) {
+        for (ProducerType producerType : ProducerType.values()) {
+            if (producerType.name().equalsIgnoreCase(uri)) {
+                return producerType;
+            }
+        }
+        return ProducerType.UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java
new file mode 100644
index 0000000..7f346e2
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Represents the component that manages {@link SplunkEndpoint}.
+ */
+public class SplunkComponent extends DefaultComponent {
+
+    private SplunkConfigurationFactory splunkConfigurationFactory = new DefaultSplunkConfigurationFactory();
+
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        SplunkConfiguration configuration = splunkConfigurationFactory.parseMap(parameters);
+        setProperties(configuration, parameters);
+        return new SplunkEndpoint(uri, this, configuration);
+    }
+
+    public SplunkConfigurationFactory getSplunkConfigurationFactory() {
+        return splunkConfigurationFactory;
+    }
+
+    public void setSplunkConfigurationFactory(DefaultSplunkConfigurationFactory splunkConfigurationFactory) {
+        this.splunkConfigurationFactory = splunkConfigurationFactory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
new file mode 100644
index 0000000..20de08c
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.splunk.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SplunkConfiguration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SplunkConfiguration.class);
+
+    private String host = Service.DEFAULT_HOST;
+    private int port = Service.DEFAULT_PORT;
+    private String scheme = Service.DEFAULT_SCHEME;
+    private String app;
+    private String owner;
+    private String username;
+    private String password;
+    private int connectionTimeout = 5000;
+    private String index;
+    private String sourceType;
+    private String source;
+    private int tcpReceiverPort;
+
+    // consumer properties
+    private int count;
+    private String fieldList;
+    private String search;
+    private String savedSearch;
+    private String earliestTime;
+    private String latestTime;
+    private String initEarliestTime;
+
+    public SplunkConfiguration(final String host, final int port, final String username, final String password) {
+        this.host = host;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+    }
+
+    public SplunkConfiguration(final String username, final String password) {
+        this(Service.DEFAULT_HOST, Service.DEFAULT_PORT, username, password);
+    }
+
+    public String getInitEarliestTime() {
+        return initEarliestTime;
+    }
+
+    public void setInitEarliestTime(String initEarliestTime) {
+        this.initEarliestTime = initEarliestTime;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public String getFieldList() {
+        return fieldList;
+    }
+
+    public void setFieldList(String fieldList) {
+        this.fieldList = fieldList;
+    }
+
+    public String getSearch() {
+        return search;
+    }
+
+    public void setSearch(String search) {
+        this.search = search;
+    }
+
+    public String getEarliestTime() {
+        return earliestTime;
+    }
+
+    public void setEarliestTime(String earliestTime) {
+        this.earliestTime = earliestTime;
+    }
+
+    public String getLatestTime() {
+        return latestTime;
+    }
+
+    public void setLatestTime(String latestTime) {
+        this.latestTime = latestTime;
+    }
+
+    public int getTcpReceiverPort() {
+        return tcpReceiverPort;
+    }
+
+    public void setTcpReceiverPort(int tcpReceiverPort) {
+        this.tcpReceiverPort = tcpReceiverPort;
+    }
+
+    public String getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(String sourceType) {
+        this.sourceType = sourceType;
+    }
+
+    public String getSource() {
+        return source;
+    }
+
+    public void setSource(String source) {
+        this.source = source;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public String getScheme() {
+        return scheme;
+    }
+
+    public void setScheme(String scheme) {
+        this.scheme = scheme;
+    }
+
+    public String getApp() {
+        return app;
+    }
+
+    public void setApp(String app) {
+        this.app = app;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+
+    public void setOwner(String owner) {
+        this.owner = owner;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(int timeout) {
+        this.connectionTimeout = timeout;
+    }
+
+    public String getSavedSearch() {
+        return this.savedSearch;
+    }
+
+    public void setSavedSearch(String savedSearch) {
+        this.savedSearch = savedSearch;
+    }
+
+    public Service createService() {
+        final Map<String, Object> args = new HashMap<String, Object>();
+        if (host != null) {
+            args.put("host", host);
+        }
+        if (port > 0) {
+            args.put("port", port);
+        }
+        if (scheme != null) {
+            args.put("scheme", scheme);
+        }
+        if (app != null) {
+            args.put("app", app);
+        }
+        if (owner != null) {
+            args.put("owner", owner);
+        }
+
+        args.put("username", username);
+        args.put("password", password);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        Future<Service> future = executor.submit(new Callable<Service>() {
+            public Service call() throws Exception {
+                return Service.connect(args);
+            }
+        });
+        try {
+            Service service;
+            if (connectionTimeout > 0) {
+                service = future.get(connectionTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                service = future.get();
+            }
+            LOG.info("Successfully connected to Splunk");
+            return service;
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("could not connect to Splunk Server @ %s:%d - %s", host, port, e.getMessage()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java
new file mode 100644
index 0000000..d92d854
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.Map;
+
+public interface SplunkConfigurationFactory {
+
+    SplunkConfiguration parseMap(Map<String, Object> parameters);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
new file mode 100644
index 0000000..c3a9ce7
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.apache.camel.component.splunk.support.SplunkDataReader;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Splunk consumer.
+ */
+public class SplunkConsumer extends ScheduledBatchPollingConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(SplunkConsumer.class);
+    private SplunkDataReader dataReader;
+    private SplunkEndpoint endpoint;
+
+    public SplunkConsumer(SplunkEndpoint endpoint, Processor processor, ConsumerType consumerType) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        if (consumerType.equals(ConsumerType.NORMAL) || consumerType.equals(ConsumerType.REALTIME)) {
+            if (ObjectHelper.isEmpty(endpoint.getConfiguration().getSearch())) {
+                throw new RuntimeException("Missing option 'search' with normal or realtime search");
+            }
+        }
+        if (consumerType.equals(ConsumerType.SAVEDSEARCH) && ObjectHelper.isEmpty(endpoint.getConfiguration().getSavedSearch())) {
+            throw new RuntimeException("Missing option 'savedSearch' with saved search");
+        }
+        dataReader = new SplunkDataReader(endpoint, consumerType);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        try {
+            List<SplunkEvent> events = dataReader.read();
+            Queue<Exchange> exchanges = createExchanges(events);
+            return processBatch(CastUtils.cast(exchanges));
+        } catch (Exception e) {
+            if (endpoint.reconnectIfPossible(e)) {
+                return 0;
+            } else {
+                getExceptionHandler().handleException(e);
+                return 0;
+            }
+        }
+    }
+
+    protected Queue<Exchange> createExchanges(List<SplunkEvent> splunkEvents) {
+        LOG.trace("Received {} messages in this poll", splunkEvents.size());
+        Queue<Exchange> answer = new LinkedList<Exchange>();
+        for (SplunkEvent splunkEvent : splunkEvents) {
+            Exchange exchange = getEndpoint().createExchange();
+            Message message = exchange.getIn();
+            message.setBody(splunkEvent);
+            answer.add(exchange);
+        }
+        return answer;
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+            try {
+                LOG.trace("Processing exchange [{}]...", exchange);
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+            if (exchange.getException() != null) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+            }
+        }
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
new file mode 100644
index 0000000..6e0d571
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.net.SocketException;
+import java.util.regex.Pattern;
+import javax.net.ssl.SSLException;
+
+import com.splunk.HttpException;
+import com.splunk.Service;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a Splunk endpoint.
+ */
+public class SplunkEndpoint extends ScheduledPollEndpoint {
+    private static final Logger LOG = LoggerFactory.getLogger(SplunkEndpoint.class);
+
+    private SplunkConfiguration configuration;
+    private Service service;
+
+    public SplunkEndpoint() {
+    }
+
+    public SplunkEndpoint(String uri, SplunkComponent component, SplunkConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    public Producer createProducer() throws Exception {
+        String[] uriSplit = splitUri(getEndpointUri());
+        if (uriSplit.length > 0) {
+            ProducerType producerType = ProducerType.fromUri(uriSplit[0]);
+            return new SplunkProducer(this, producerType);
+        }
+        throw new IllegalArgumentException("Cannot create any producer with uri " + getEndpointUri() + ". A producer type was not provided (or an incorrect pairing was used).");
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        if (configuration.getInitEarliestTime() == null) {
+            throw new IllegalArgumentException("Required initialEarliestTime option could not be found");
+        }
+        String[] uriSplit = splitUri(getEndpointUri());
+        if (uriSplit.length > 0) {
+            ConsumerType consumerType = ConsumerType.fromUri(uriSplit[0]);
+            SplunkConsumer consumer = new SplunkConsumer(this, processor, consumerType);
+            configureConsumer(consumer);
+            return consumer;
+        }
+        throw new IllegalArgumentException("Cannot create any consumer with uri " + getEndpointUri() + ". A consumer type was not provided (or an incorrect pairing was used).");
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        service = null;
+        super.doStop();
+    }
+
+    public Service getService() {
+        if (service == null) {
+            this.service = configuration.createService();
+        }
+        return service;
+    }
+
+    private static String[] splitUri(String uri) {
+        Pattern p1 = Pattern.compile("splunk:(//)*");
+        Pattern p2 = Pattern.compile("\\?.*");
+
+        uri = p1.matcher(uri).replaceAll("");
+        uri = p2.matcher(uri).replaceAll("");
+
+        return uri.split("/");
+    }
+
+    public SplunkConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public synchronized boolean reconnectIfPossible(Exception e) {
+        boolean answer = false;
+        if (e instanceof HttpException && ((HttpException) e).getStatus() == 401 || ((e instanceof SocketException) || (e instanceof SSLException))) {
+            // try and reconnect
+            LOG.warn("Got exception from Splunk. Will try to reconnect");
+            this.service = null;
+            getService();
+            answer = true;
+        }
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
new file mode 100644
index 0000000..bfea70c
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import com.splunk.Args;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.apache.camel.component.splunk.support.DataWriter;
+import org.apache.camel.component.splunk.support.StreamDataWriter;
+import org.apache.camel.component.splunk.support.SubmitDataWriter;
+import org.apache.camel.component.splunk.support.TcpDataWriter;
+import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Splunk producer.
+ */
+public class SplunkProducer extends DefaultProducer {
+    private static final transient Logger LOG = LoggerFactory.getLogger(SplunkProducer.class);
+    private SplunkEndpoint endpoint;
+    private DataWriter dataWriter;
+
+    public SplunkProducer(SplunkEndpoint endpoint, ProducerType producerType) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        createWriter(producerType);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        try {
+            dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class));
+        } catch (Exception e) {
+            if (endpoint.reconnectIfPossible(e)) {
+                dataWriter.start();
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        dataWriter.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        dataWriter.stop();
+        super.doStop();
+    }
+
+    private void createWriter(ProducerType producerType) {
+        switch (producerType) {
+        case TCP: {
+            LOG.debug("Creating TcpDataWriter");
+            dataWriter = new TcpDataWriter(endpoint, buildSplunkArgs());
+            ((TcpDataWriter) dataWriter).setPort(endpoint.getConfiguration().getTcpReceiverPort());
+            LOG.debug("TcpDataWriter created for endpoint {}", endpoint);
+            break;
+        }
+        case SUBMIT: {
+            LOG.debug("Creating SubmitDataWriter");
+            dataWriter = new SubmitDataWriter(endpoint, buildSplunkArgs());
+            ((SubmitDataWriter) dataWriter).setIndex(endpoint.getConfiguration().getIndex());
+            LOG.debug("SubmitDataWriter created for endpoint {}", endpoint);
+            break;
+        }
+        case STREAM: {
+            LOG.debug("Creating StreamDataWriter");
+            dataWriter = new StreamDataWriter(endpoint, buildSplunkArgs());
+            ((StreamDataWriter) dataWriter).setIndex(endpoint.getConfiguration().getIndex());
+            LOG.debug("StreamDataWriter created for endpoint {}", endpoint);
+            break;
+        }
+        case UNKNOWN: {
+            throw new RuntimeException("unknown producerType");
+        }
+        default: {
+            throw new RuntimeException("unknown producerType");
+        }
+        }
+    }
+
+    private Args buildSplunkArgs() {
+        Args args = new Args();
+        if (endpoint.getConfiguration().getSourceType() != null) {
+            args.put("sourcetype", endpoint.getConfiguration().getSourceType());
+        }
+        if (endpoint.getConfiguration().getSource() != null) {
+            args.put("source", endpoint.getConfiguration().getSource());
+        }
+        return args;
+    }
+
+    protected DataWriter getDataWriter() {
+        return dataWriter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
new file mode 100644
index 0000000..589a784
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.event;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+@SuppressWarnings("serial")
+public class SplunkEvent implements Serializable {
+
+    // ----------------------------------
+    // Common event fields
+    // ----------------------------------
+
+    /**
+     * A device-specific classification provided as part of the event.
+     */
+    public static final String COMMON_CATEGORY = "category";
+    /**
+     * A device-specific classification provided as part of the event.
+     */
+    public static final String COMMON_COUNT = "count";
+    /**
+     * The free-form description of a particular event.
+     */
+    public static final String COMMON_DESC = "desc";
+    /**
+     * The name of a given DHCP pool on a DHCP server.
+     */
+    public static final String COMMON_DHCP_POOL = "dhcp_pool";
+    /**
+     * The amount of time the event lasted.
+     */
+    public static final String COMMON_DURATION = "duration";
+    /**
+     * The fully qualified domain name of the device transmitting or recording
+     * the log record.
+     */
+    public static final String COMMON_DVC_HOST = "dvc_host";
+    /**
+     * The IPv4 address of the device reporting the event.
+     */
+    public static final String COMMON_DVC_IP = "dvc_ip";
+    /**
+     * The IPv6 address of the device reporting the event.
+     */
+    public static final String COMMON_DVC_IP6 = "dvc_ip6";
+    /**
+     * The free-form description of the device's physical location.
+     */
+    public static final String COMMON_DVC_LOCATION = "dvc_location";
+    /**
+     * The MAC (layer 2) address of the device reporting the event.
+     */
+    public static final String COMMON_DVC_MAC = "dvc_mac";
+    /**
+     * The Windows NT domain of the device recording or transmitting the event.
+     */
+    public static final String COMMON_DVC_NT_DOMAIN = "dvc_nt_domain";
+    /**
+     * The Windows NT host name of the device recording or transmitting the
+     * event.
+     */
+    public static final String COMMON_DVC_NT_HOST = "dvc_nt_host";
+    /**
+     * Time at which the device recorded the event.
+     */
+    public static final String COMMON_DVC_TIME = "dvc_time";
+    /**
+     * The event's specified end time.
+     */
+    public static final String COMMON_END_TIME = "end_time";
+    /**
+     * A unique identifier that identifies the event. This is unique to the
+     * reporting device.
+     */
+    public static final String COMMON_EVENT_ID = "event_id";
+    /**
+     * The length of the datagram, event, message, or packet.
+     */
+    public static final String COMMON_LENGTH = "length";
+    /**
+     * The log-level that was set on the device and recorded in the event.
+     */
+    public static final String COMMON_LOG_LEVEL = "log_level";
+    /**
+     * The name of the event as reported by the device. The name should not
+     * contain information that's already being parsed into other fields from
+     * the event, such as IP addresses.
+     */
+    public static final String COMMON_NAME = "name";
+    /**
+     * An integer assigned by the device operating system to the process
+     * creating the record.
+     */
+    public static final String COMMON_PID = "pid";
+    /**
+     * An environment-specific assessment of the event's importance, based on
+     * elements such as event severity, business function of the affected
+     * system, or other locally defined variables.
+     */
+    public static final String COMMON_PRIORITY = "priority";
+    /**
+     * The product that generated the event.
+     */
+    public static final String COMMON_PRODUCT = "product";
+    /**
+     * The version of the product that generated the event.
+     */
+    public static final String COMMON_PRODUCT_VERSION = "product_version";
+    /**
+     * The result root cause, such as connection refused, timeout, crash, and so
+     * on.
+     */
+    public static final String COMMON_REASON = "reason";
+    /**
+     * The action result. Often is a binary choice: succeeded and failed,
+     * allowed and denied, and so on.
+     */
+    public static final String COMMON_RESULT = "result";
+    /**
+     * The severity (or priority) of an event as reported by the originating
+     * device.
+     */
+    public static final String COMMON_SEVERITY = "severity";
+    /**
+     * The event's specified start time.
+     */
+    public static final String COMMON_START_TIME = "start_time";
+    /**
+     * The transaction identifier.
+     */
+    public static final String COMMON_TRANSACTION_ID = "transaction_id";
+    /**
+     * A uniform record locator (a web address, in other words) included in a
+     * record.
+     */
+    public static final String COMMON_URL = "url";
+    /**
+     * The vendor who made the product that generated the event.
+     */
+    public static final String COMMON_VENDOR = "vendor";
+
+    // ----------------------------------
+    // Update
+    // ----------------------------------
+
+    /**
+     * The name of the installed update.
+     */
+    public static final String UPDATE_PACKAGE = "package";
+
+    /**
+     * default key value delimiter
+     */
+    protected static final String KVDELIM = "=";
+    /**
+     * default pair delimiter
+     */
+    protected static final String PAIRDELIM = " ";
+    /**
+     * default quote char
+     */
+    protected static final char QUOTE = '"';
+    /**
+     * default date format is using internal generated date
+     */
+    protected static final String DATEFORMATPATTERN = "yyyy-MM-dd\tHH:mm:ss:SSSZ";
+    /**
+     * Date Formatter
+     */
+    protected static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATEFORMATPATTERN);
+
+    /**
+     * Event prefix fields
+     */
+    protected static final String PREFIX_NAME = "name";
+    protected static final String PREFIX_EVENT_ID = "event_id";
+
+    /**
+     * Java Throwable type fields
+     */
+    protected static final String THROWABLE_CLASS = "throwable_class";
+    protected static final String THROWABLE_MESSAGE = "throwable_message";
+    protected static final String THROWABLE_STACKTRACE_ELEMENTS = "stacktrace_elements";
+
+    protected static final String LINEBREAK = "\n";
+
+    /**
+     * Whether or not to put quotes around values
+     */
+    protected boolean quoteValues = true;
+
+    /**
+     * Whether or not to add a date to the event string
+     */
+    protected boolean useInternalDate = true;
+
+    /**
+     * Contents of the event message
+     */
+    private StringBuffer eventMessage;
+
+    /**
+     * A Constructor to load data from a Map
+     *
+     * @param data the map
+     */
+    public SplunkEvent(Map<String, String> data) {
+        this.eventMessage = new StringBuffer();
+        for (String key : data.keySet()) {
+            this.addPair(key, data.get(key));
+        }
+    }
+
+    /**
+     * A Copy constructor
+     */
+    public SplunkEvent(SplunkEvent splunkEvent) {
+        this.eventMessage = splunkEvent.eventMessage;
+        this.quoteValues = splunkEvent.quoteValues;
+        this.useInternalDate = splunkEvent.useInternalDate;
+    }
+
+    /**
+     * Constructor to create a generic event
+     *
+     * @param eventName       the event name
+     * @param eventID         the event id
+     * @param useInternalDate whether or not to add a date to the event string
+     * @param quoteValues     whether or not to put quotes around values
+     */
+    public SplunkEvent(String eventName, String eventID, boolean useInternalDate, boolean quoteValues) {
+
+        this.eventMessage = new StringBuffer();
+        this.quoteValues = quoteValues;
+        this.useInternalDate = useInternalDate;
+
+        addPair(PREFIX_NAME, eventName);
+        addPair(PREFIX_EVENT_ID, eventID);
+    }
+
+    /**
+     * Constructor to create a generic event with the default format
+     *
+     * @param eventName the event name
+     * @param eventID   the event ID
+     */
+    public SplunkEvent(String eventName, String eventID) {
+        this(eventName, eventID, true, true);
+    }
+
+    /**
+     * Default constructor
+     */
+    public SplunkEvent() {
+        this.eventMessage = new StringBuffer();
+    }
+
+    public Map<String, String> getEventData() {
+        Map<String, String> eventData = new HashMap<String, String>();
+        String eventEntries = eventMessage.toString();
+
+        String[] entries = eventEntries.split(PAIRDELIM);
+
+        String quote = new String(new char[]{QUOTE});
+
+        for (String entry : entries) {
+            String[] pair = entry.split(KVDELIM);
+
+            if (pair.length != 2) {
+                throw new UnsupportedOperationException(String.format("invalid event data [%s]", entry));
+            }
+
+            String key = pair[0].replaceAll(quote, "");
+            String value = pair[1].replaceAll(quote, "");
+            if ("null".equals(value)) {
+                value = null;
+            }
+
+            eventData.put(key, value);
+        }
+        return eventData;
+    }
+
+    /**
+     * Add a key value pair
+     */
+    public void addPair(String key, char value) {
+        addPair(key, String.valueOf(value));
+    }
+
+    /**
+     * Add a key value pair
+     */
+    public void addPair(String key, boolean value) {
+        addPair(key, String.valueOf(value));
+    }
+
+    /**
+     * Add a key value pair
+     */
+    public void addPair(String key, double value) {
+        addPair(key, String.valueOf(value));
+    }
+
+    /**
+     * Add a key value pair
+     */
+    public void addPair(String key, long value) {
+        addPair(key, String.valueOf(value));
+    }
+
+    /**
+     * Add a key value pair
+     */
+    public void addPair(String key, int value) {
+        addPair(key, String.valueOf(value));
+    }
+
+    /**
+     * Add a key value pair
+     */
+    public void addPair(String key, Object value) {
+        addPair(key, value.toString());
+    }
+
+    /**
+     * Utility method for formatting Throwable,Error,Exception objects in a more
+     * linear and Splunk friendly manner than printStackTrace
+     *
+     * @param throwable the Throwable object to add to the event
+     */
+    public void addThrowable(Throwable throwable) {
+        addThrowableObject(throwable, -1);
+    }
+
+    /**
+     * Utility method for formatting Throwable,Error,Exception objects in a more
+     * linear and Splunk friendly manner than printStackTrace
+     *
+     * @param throwable       the Throwable object to add to the event
+     * @param stackTraceDepth maximum number of stacktrace elements to log
+     */
+    public void addThrowable(Throwable throwable, int stackTraceDepth) {
+        addThrowableObject(throwable, stackTraceDepth);
+    }
+
+    /**
+     * Internal private method for formatting Throwable,Error,Exception objects
+     * in a more linear and Splunk friendly manner than printStackTrace
+     *
+     * @param throwable       the Throwable object to add to the event
+     * @param stackTraceDepth maximum number of stacktrace elements to log, -1
+     *                        for all
+     */
+
+    private void addThrowableObject(Throwable throwable, int stackTraceDepth) {
+        addPair(THROWABLE_CLASS, throwable.getClass().getCanonicalName());
+        addPair(THROWABLE_MESSAGE, throwable.getMessage());
+        StackTraceElement[] elements = throwable.getStackTrace();
+        StringBuilder sb = new StringBuilder();
+        int depth = 0;
+        for (StackTraceElement element : elements) {
+            depth++;
+            if (stackTraceDepth == -1 || stackTraceDepth >= depth) {
+                sb.append(element.toString()).append(",");
+            } else {
+                break;
+            }
+        }
+        addPair(THROWABLE_STACKTRACE_ELEMENTS, sb.toString());
+    }
+
+    /**
+     * Add a key value pair
+     */
+    public void addPair(String key, String value) {
+        if (quoteValues) {
+            this.eventMessage.append(key).append(KVDELIM).append(QUOTE).append(value).append(QUOTE).append(PAIRDELIM);
+        } else {
+            this.eventMessage.append(key).append(KVDELIM).append(value).append(PAIRDELIM);
+        }
+    }
+
+    /**
+     * return the completed event message
+     */
+    @Override
+    public String toString() {
+        String event = "";
+
+        if (useInternalDate) {
+            StringBuilder clonedMessage = new StringBuilder();
+            clonedMessage.append(DATE_FORMATTER.print(new Date().getTime())).append(PAIRDELIM).append(this.eventMessage);
+            event = clonedMessage.toString();
+        } else {
+            event = eventMessage.toString();
+        }
+
+        // trim off trailing pair delim char(s)
+        String result = event.substring(0, event.length() - PAIRDELIM.length()) + LINEBREAK;
+        return result;
+    }
+
+    public void setCommonCategory(String commonCategory) {
+        addPair(COMMON_CATEGORY, commonCategory);
+    }
+
+    public void setCommonCount(String commonCount) {
+        addPair(COMMON_COUNT, commonCount);
+    }
+
+    public void setCommonDesc(String commonDesc) {
+        addPair(COMMON_DESC, commonDesc);
+    }
+
+    public void setCommonDhcpPool(String commonDhcpPool) {
+        addPair(COMMON_DHCP_POOL, commonDhcpPool);
+    }
+
+    public void setCommonDuration(long commonDuration) {
+        addPair(COMMON_DURATION, commonDuration);
+    }
+
+    public void setCommonDvcHost(String commonDvcHost) {
+        addPair(COMMON_DVC_HOST, commonDvcHost);
+    }
+
+    public void setCommonDvcIp(String commonDvcIp) {
+        addPair(COMMON_DVC_IP, commonDvcIp);
+    }
+
+    public void setCommonDvcIp6(String commonDvcIp6) {
+        addPair(COMMON_DVC_IP6, commonDvcIp6);
+    }
+
+    public void setCommonDvcLocation(String commonDvcLocation) {
+        addPair(COMMON_DVC_LOCATION, commonDvcLocation);
+    }
+
+    public void setCommonDvcMac(String commonDvcMac) {
+        addPair(COMMON_DVC_MAC, commonDvcMac);
+    }
+
+    public void setCommonDvcNtDomain(String commonDvcNtDomain) {
+        addPair(COMMON_DVC_NT_DOMAIN, commonDvcNtDomain);
+    }
+
+    public void setCommonDvcNtHost(String commonDvcNtHost) {
+        addPair(COMMON_DVC_NT_HOST, commonDvcNtHost);
+    }
+
+    public void setCommonDvcTime(long commonDvcTime) {
+        addPair(COMMON_DVC_TIME, commonDvcTime);
+    }
+
+    public void setCommonEndTime(long commonEndTime) {
+        addPair(COMMON_END_TIME, commonEndTime);
+    }
+
+    public void setCommonEventId(long commonEventId) {
+        addPair(COMMON_EVENT_ID, commonEventId);
+    }
+
+    public void setCommonLength(long commonLength) {
+        addPair(COMMON_LENGTH, commonLength);
+    }
+
+    public void setCommonLogLevel(String commonLogLevel) {
+        addPair(COMMON_LOG_LEVEL, commonLogLevel);
+    }
+
+    public void setCommonName(String commonName) {
+        addPair(COMMON_NAME, commonName);
+    }
+
+    public void setCommonPid(long commonPid) {
+        addPair(COMMON_PID, commonPid);
+    }
+
+    public void setCommonPriority(long commonPriority) {
+        addPair(COMMON_PRIORITY, commonPriority);
+    }
+
+    public void setCommonProduct(String commonProduct) {
+        addPair(COMMON_PRODUCT, commonProduct);
+    }
+
+    public void setCommonProductVersion(long commonProductVersion) {
+        addPair(COMMON_PRODUCT_VERSION, commonProductVersion);
+    }
+
+    public void setCommonReason(String commonReason) {
+        addPair(COMMON_REASON, commonReason);
+    }
+
+    public void setCommonResult(String commonResult) {
+        addPair(COMMON_RESULT, commonResult);
+    }
+
+    public void setCommonSeverity(String commonSeverity) {
+        addPair(COMMON_SEVERITY, commonSeverity);
+    }
+
+    public void setCommonStartTime(long commonStartTime) {
+        addPair(COMMON_START_TIME, commonStartTime);
+    }
+
+    public void setCommonTransactionId(String commonTransactionId) {
+        addPair(COMMON_TRANSACTION_ID, commonTransactionId);
+    }
+
+    public void setCommonUrl(String commonUrl) {
+        addPair(COMMON_URL, commonUrl);
+    }
+
+    public void setCommonVendor(String commonVendor) {
+        addPair(COMMON_VENDOR, commonVendor);
+    }
+
+    public void setUpdatePackage(String updatePackage) {
+        addPair(UPDATE_PACKAGE, updatePackage);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
new file mode 100644
index 0000000..49cccca
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import org.apache.camel.component.splunk.event.SplunkEvent;
+
+public interface DataWriter {
+    void write(SplunkEvent data) throws Exception;
+
+    void stop();
+
+    void start();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
new file mode 100644
index 0000000..f3a0afd
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.splunk.Args;
+import com.splunk.Job;
+import com.splunk.ResultsReader;
+import com.splunk.ResultsReaderJson;
+import com.splunk.SavedSearch;
+import com.splunk.SavedSearchCollection;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.ConsumerType;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SplunkDataReader {
+    private static final Logger LOG = LoggerFactory.getLogger(SplunkDataReader.class);
+    private static final String DATE_FORMAT = "MM/dd/yy HH:mm:ss:SSS";
+    private static final String SPLUNK_TIME_FORMAT = "%m/%d/%y %H:%M:%S:%3N";
+
+    private transient Calendar lastSuccessfulReadTime;
+    private SplunkEndpoint endpoint;
+    private ConsumerType consumerType;
+
+    public SplunkDataReader(SplunkEndpoint endpoint, ConsumerType consumerType) {
+        this.endpoint = endpoint;
+        this.consumerType = consumerType;
+    }
+
+    public int getCount() {
+        return endpoint.getConfiguration().getCount();
+    }
+
+    public String getFieldList() {
+        return endpoint.getConfiguration().getFieldList();
+    }
+
+    public String getSearch() {
+        return endpoint.getConfiguration().getSearch();
+    }
+
+    public String getEarliestTime() {
+        return endpoint.getConfiguration().getEarliestTime();
+    }
+
+    public String getLatestTime() {
+        return endpoint.getConfiguration().getLatestTime();
+    }
+
+    public String getInitEarliestTime() {
+        return endpoint.getConfiguration().getInitEarliestTime();
+    }
+
+    private String getSavedSearch() {
+        return endpoint.getConfiguration().getSavedSearch();
+    }
+
+    public List<SplunkEvent> read() throws Exception {
+        switch (consumerType) {
+        case NORMAL: {
+            return nonBlockingSearch();
+        }
+        case REALTIME: {
+            return realtimeSearch();
+        }
+        case SAVEDSEARCH: {
+            return savedSearch();
+        }
+        default: {
+            throw new RuntimeException("Unknown search mode " + consumerType);
+        }
+        }
+    }
+
+    /**
+     * Get the earliestTime of range search.
+     *
+     * @param startTime the time where search start
+     * @param realtime  if this is realtime search
+     * @return The time of last successful read if not realtime; Time difference
+     *         between last successful read and start time;
+     */
+    private String calculateEarliestTime(Calendar startTime, boolean realtime) {
+        String result;
+        if (realtime) {
+            result = calculateEarliestTimeForRealTime(startTime);
+        } else {
+            DateFormat df = new SimpleDateFormat(DATE_FORMAT);
+            result = df.format(lastSuccessfulReadTime.getTime());
+        }
+        return result;
+    }
+
+    /**
+     * Gets earliest time for realtime search
+     */
+    private String calculateEarliestTimeForRealTime(Calendar startTime) {
+        String result;
+        long diff = startTime.getTimeInMillis() - lastSuccessfulReadTime.getTimeInMillis();
+        result = "-" + diff / 1000 + "s";
+        return result;
+    }
+
+    private void populateArgs(Args queryArgs, Calendar startTime, boolean realtime) {
+        String earliestTime = getEarliestTime(startTime, realtime);
+        if (ObjectHelper.isNotEmpty(earliestTime)) {
+            queryArgs.put("earliest_time", earliestTime);
+        }
+
+        String latestTime = getLatestTime(startTime, realtime);
+        if (ObjectHelper.isNotEmpty(latestTime)) {
+            queryArgs.put("latest_time", latestTime);
+        }
+
+        queryArgs.put("time_format", SPLUNK_TIME_FORMAT);
+
+        if (ObjectHelper.isNotEmpty(getFieldList())) {
+            queryArgs.put("field_list", getFieldList());
+        }
+    }
+
+    private String getLatestTime(Calendar startTime, boolean realtime) {
+        String lTime;
+        if (ObjectHelper.isNotEmpty(getLatestTime())) {
+            lTime = getLatestTime();
+        } else {
+            if (realtime) {
+                lTime = "rt";
+            } else {
+                DateFormat df = new SimpleDateFormat(DATE_FORMAT);
+                lTime = df.format(startTime.getTime());
+            }
+        }
+        return lTime;
+    }
+
+    private String getEarliestTime(Calendar startTime, boolean realtime) {
+        String eTime = null;
+
+        if (lastSuccessfulReadTime == null) {
+            eTime = getInitEarliestTime();
+        } else {
+            if (ObjectHelper.isNotEmpty(getEarliestTime())) {
+                eTime = getEarliestTime();
+            } else {
+                String calculatedEarliestTime = calculateEarliestTime(startTime, realtime);
+                if (calculatedEarliestTime != null) {
+                    if (realtime) {
+                        eTime = "rt" + calculatedEarliestTime;
+                    } else {
+                        eTime = calculatedEarliestTime;
+                    }
+                }
+            }
+        }
+        return eTime;
+    }
+
+    private List<SplunkEvent> savedSearch() throws Exception {
+        LOG.trace("saved search start");
+
+        Args queryArgs = new Args();
+        queryArgs.put("app", "search");
+        if (ObjectHelper.isNotEmpty(endpoint.getConfiguration().getOwner())) {
+            queryArgs.put("owner", endpoint.getConfiguration().getOwner());
+        }
+        if (ObjectHelper.isNotEmpty(endpoint.getConfiguration().getApp())) {
+            queryArgs.put("app", endpoint.getConfiguration().getApp());
+        }
+
+        Calendar startTime = Calendar.getInstance();
+
+        SavedSearch search = null;
+        Job job = null;
+        String latestTime = getLatestTime(startTime, false);
+        String earliestTime = getEarliestTime(startTime, false);
+
+        Service service = endpoint.getService();
+        SavedSearchCollection savedSearches = service.getSavedSearches(queryArgs);
+        for (SavedSearch s : savedSearches.values()) {
+            if (s.getName().equals(getSavedSearch())) {
+                search = s;
+                break;
+            }
+        }
+        if (search != null) {
+            Map<String, String> args = new HashMap<String, String>();
+            args.put("force_dispatch", "true");
+            args.put("dispatch.earliest_time", earliestTime);
+            args.put("dispatch.latest_time", latestTime);
+            job = search.dispatch(args);
+        }
+        while (!job.isDone()) {
+            Thread.sleep(2000);
+        }
+        List<SplunkEvent> data = extractData(job, false);
+        this.lastSuccessfulReadTime = startTime;
+        return data;
+
+    }
+
+    private List<SplunkEvent> nonBlockingSearch() throws Exception {
+        LOG.debug("non block search start");
+
+        Args queryArgs = new Args();
+        queryArgs.put("exec_mode", "normal");
+        Calendar startTime = Calendar.getInstance();
+        populateArgs(queryArgs, startTime, false);
+
+        List<SplunkEvent> data = runQuery(queryArgs, false);
+        lastSuccessfulReadTime = startTime;
+        return data;
+    }
+
+    private List<SplunkEvent> realtimeSearch() throws Exception {
+        LOG.trace("realtime search start");
+
+        Args queryArgs = new Args();
+        // queryArgs.put("exec_mode", "normal");
+        queryArgs.put("search_mode", "realtime");
+        Calendar startTime = Calendar.getInstance();
+        populateArgs(queryArgs, startTime, true);
+
+        List<SplunkEvent> data = runQuery(queryArgs, true);
+        lastSuccessfulReadTime = startTime;
+        return data;
+    }
+
+    private List<SplunkEvent> runQuery(Args queryArgs, boolean realtime) throws Exception {
+        Service service = endpoint.getService();
+        Job job = service.getJobs().create(getSearch(), queryArgs);
+        if (realtime) {
+            while (!job.isReady()) {
+                Thread.sleep(2000);
+            }
+        } else {
+            while (!job.isDone()) {
+                Thread.sleep(2000);
+            }
+        }
+        return extractData(job, realtime);
+    }
+
+    private List<SplunkEvent> extractData(Job job, boolean realtime) throws Exception {
+        List<SplunkEvent> result = new ArrayList<SplunkEvent>();
+        HashMap<String, String> data;
+        SplunkEvent splunkData;
+        ResultsReader resultsReader = null;
+        int total = 0;
+        if (realtime) {
+            // total = job.getResultPreviewCount();
+        } else {
+            total = job.getResultCount();
+        }
+        if (getCount() == 0 || total < getCount()) {
+            InputStream stream = null;
+            Args outputArgs = new Args();
+            outputArgs.put("output_mode", "json");
+            if (realtime) {
+                stream = job.getResultsPreview(outputArgs);
+            } else {
+                stream = job.getResults(outputArgs);
+            }
+
+            resultsReader = new ResultsReaderJson(stream);
+            while ((data = resultsReader.getNextEvent()) != null) {
+                splunkData = new SplunkEvent(data);
+                result.add(splunkData);
+            }
+            IOHelper.close(stream);
+        } else {
+            int offset = 0;
+            while (offset < total) {
+                InputStream stream;
+                Args outputArgs = new Args();
+                outputArgs.put("output_mode", "json");
+                outputArgs.put("count", getCount());
+                outputArgs.put("offset", offset);
+                if (realtime) {
+                    stream = job.getResultsPreview(outputArgs);
+                } else {
+                    stream = job.getResults(outputArgs);
+                }
+                resultsReader = new ResultsReaderJson(stream);
+                while ((data = resultsReader.getNextEvent()) != null) {
+                    splunkData = new SplunkEvent(data);
+                    result.add(splunkData);
+                }
+                offset += getCount();
+                IOHelper.close(stream);
+            }
+        }
+        if (resultsReader != null) {
+            resultsReader.close();
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
new file mode 100644
index 0000000..46dba18
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class SplunkDataWriter implements DataWriter {
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    protected Socket socket;
+    protected SplunkEndpoint endpoint;
+    protected Args args;
+
+    public SplunkDataWriter(SplunkEndpoint endpoint, Args args) {
+        this.endpoint = endpoint;
+        this.args = args;
+    }
+
+    protected abstract Socket createSocket(Service service) throws IOException;
+
+    public void write(SplunkEvent event) throws Exception {
+        logger.debug("writing event to splunk:" + event);
+        doWrite(event, socket);
+    }
+
+    protected void doWrite(SplunkEvent event, Socket socket) throws IOException {
+        OutputStream ostream = socket.getOutputStream();
+        Writer writer = new OutputStreamWriter(ostream, "UTF8");
+        writer.write(event.toString());
+        writer.flush();
+    }
+
+    public Args getArgs() {
+        return args;
+    }
+
+    @Override
+    public synchronized void start() {
+        try {
+            socket = createSocket(endpoint.getService());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public synchronized void stop() {
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java
new file mode 100644
index 0000000..ffc4102
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Index;
+import com.splunk.Receiver;
+import com.splunk.Service;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+
+public class StreamDataWriter extends SplunkDataWriter {
+    private String index;
+
+    public StreamDataWriter(SplunkEndpoint endpoint, Args args) {
+        super(endpoint, args);
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    @Override
+    protected Socket createSocket(Service service) throws IOException {
+        Index indexObject = null;
+        Receiver receiver = null;
+        Socket socket = null;
+
+        if (index != null) {
+            indexObject = service.getIndexes().get(index);
+            if (indexObject == null) {
+                throw new RuntimeCamelException(String.format("cannot find index [%s]", index));
+            }
+            socket = indexObject.attach(args);
+        } else {
+            receiver = service.getReceiver();
+            socket = receiver.attach(args);
+        }
+        socket.setTcpNoDelay(true);
+        logger.trace(String.format("created a socket on %s", socket.getRemoteSocketAddress()));
+        return socket;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
new file mode 100644
index 0000000..1a96569
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Index;
+import com.splunk.Receiver;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+
+public class SubmitDataWriter extends SplunkDataWriter {
+    private String index;
+
+    public SubmitDataWriter(SplunkEndpoint endpoint, Args args) {
+        super(endpoint, args);
+    }
+
+    protected void doWrite(SplunkEvent event, Socket socket) throws IOException {
+        Index index = getIndex();
+        if (index != null) {
+            index.submit(args, event.toString());
+        } else {
+            Receiver receiver = endpoint.getService().getReceiver();
+            receiver.submit(args, event.toString());
+        }
+    }
+
+    @Override
+    protected Socket createSocket(Service service) throws IOException {
+        return null;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    private Index getIndex() {
+        return (index == null) ? null : endpoint.getService().getIndexes().get(index);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java
new file mode 100644
index 0000000..511d39b
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Input;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+
+public class TcpDataWriter extends SplunkDataWriter {
+    private int port;
+
+    public TcpDataWriter(SplunkEndpoint endpoint, Args args) {
+        super(endpoint, args);
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    @Override
+    protected Socket createSocket(Service service) throws IOException {
+        Input input = service.getInputs().get(String.valueOf(port));
+        if (input == null) {
+            throw new RuntimeException("no input defined for port " + port);
+        }
+        if (input.isDisabled()) {
+            throw new RuntimeException(String.format("input on port %d is disabled", port));
+        }
+        Socket socket = service.open(port);
+        return socket;
+    }
+}