You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/08/17 10:29:20 UTC

[camel-quarkus] branch main updated: Expand Splunk test coverage

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 85bbbccbb9 Expand Splunk test coverage
85bbbccbb9 is described below

commit 85bbbccbb933b07a5f1eeebf3c6abce536bcf21f
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Tue Aug 15 09:06:26 2023 +0200

    Expand Splunk test coverage
---
 .../splunk/deployment/SplunkProcessor.java         |  25 +--
 integration-tests/splunk/README.adoc               |   6 +
 .../component/splunk/it/SplunkResource.java        | 165 ++++++-----------
 .../quarkus/component/splunk/it/SplunkTest.java    | 195 ++++++++++++---------
 .../component/splunk/it/SplunkTestResource.java    |  22 ++-
 5 files changed, 205 insertions(+), 208 deletions(-)

diff --git a/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java b/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java
index c79d93282c..724d9892cb 100644
--- a/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java
+++ b/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java
@@ -18,10 +18,13 @@ package org.apache.camel.quarkus.component.splunk.deployment;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.stream.Collectors;
 
+import com.splunk.HttpService;
+import com.splunk.Index;
+import com.splunk.Input;
+import com.splunk.SavedSearch;
+import com.splunk.Service;
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
 import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
@@ -51,25 +54,27 @@ class SplunkProcessor {
     @BuildStep
     RuntimeInitializedClassBuildItem runtimeInitBcryptUtil() {
         // this class uses a SecureRandom which needs to be initialised at run time
-        return new RuntimeInitializedClassBuildItem("com.splunk.HttpService");
+        return new RuntimeInitializedClassBuildItem(HttpService.class.getName());
     }
 
     @BuildStep
     ReflectiveClassBuildItem registerForReflection(CombinedIndexBuildItem combinedIndex) {
         IndexView index = combinedIndex.getIndex();
 
-        List<String> dtos = new LinkedList<>();
-        dtos.addAll(index.getAllKnownSubclasses(DotName.createSimple("com.splunk.Input")).stream()
-                .map(c -> c.name().toString()).collect(Collectors.toList()));
+        List<String> dtos = index.getAllKnownSubclasses(DotName.createSimple(Input.class))
+                .stream()
+                .map(c -> c.name().toString())
+                .toList();
 
-        return ReflectiveClassBuildItem.builder(dtos.toArray(new String[dtos.size()])).build();
+        return ReflectiveClassBuildItem.builder(dtos.toArray(new String[0])).build();
     }
 
     @BuildStep
     List<ReflectiveClassBuildItem> reflectiveClasses() {
-        return Arrays.asList(ReflectiveClassBuildItem.builder("com.splunk.Index").build(),
-                ReflectiveClassBuildItem.builder("com.splunk.SavedSearch").build(),
-                ReflectiveClassBuildItem.builder("com.splunk.Service").build());
+        return Arrays.asList(ReflectiveClassBuildItem.builder(Index.class.getName()).constructors().build(),
+                ReflectiveClassBuildItem.builder(SavedSearch.class.getName()).constructors().build(),
+                ReflectiveClassBuildItem.builder(Input.class.getName()).constructors().build(),
+                ReflectiveClassBuildItem.builder(Service.class.getName()).constructors().build());
     }
 
     @BuildStep
diff --git a/integration-tests/splunk/README.adoc b/integration-tests/splunk/README.adoc
new file mode 100644
index 0000000000..f69cb4ec8a
--- /dev/null
+++ b/integration-tests/splunk/README.adoc
@@ -0,0 +1,6 @@
+=== Splunk Web UI
+
+The Splunk UI is helpful for the testing as you can see (and search) for data inside Splunk. The URL to the Splunk UI
+is output to the console after the test container starts. Watch for the text 'Splunk UI running on'.
+
+You may want to delay the Splunk container shutdown either by adding a `Thread.sleep` into `SplunkTestResource.stop()` or commenting out `container.stop()`.
diff --git a/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java b/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java
index 2b762053ab..afe65d099f 100644
--- a/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java
+++ b/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java
@@ -17,27 +17,26 @@
 package org.apache.camel.quarkus.component.splunk.it;
 
 import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
+import java.net.URISyntaxException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
 import jakarta.ws.rs.Consumes;
-import jakarta.ws.rs.GET;
 import jakarta.ws.rs.POST;
 import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response;
-import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.splunk.ProducerType;
 import org.apache.camel.component.splunk.SplunkComponent;
 import org.apache.camel.component.splunk.SplunkConfiguration;
 import org.apache.camel.component.splunk.event.SplunkEvent;
@@ -47,10 +46,10 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
 @ApplicationScoped
 public class SplunkResource {
 
+    public static final String SAVED_SEARCH_NAME = "savedSearchForTest";
     public static final String PARAM_REMOTE_PORT = "org.apache.camel.quarkus.component.splunk.it.SplunkResource_remotePort";
     public static final String PARAM_TCP_PORT = "org.apache.camel.quarkus.component.splunk.it.SplunkResource_tcpPort";
     public static final String SOURCE = "test";
-    public static final String SOURCE_TYPE = "testSource";
     public static final int LOCAL_TCP_PORT = 9998;
 
     @Inject
@@ -65,54 +64,44 @@ public class SplunkResource {
     @ConfigProperty(name = PARAM_TCP_PORT)
     Integer tcpPort;
 
-    @Inject
-    CamelContext camelContext;
-
     @Named
     SplunkComponent splunk() {
         SplunkComponent component = new SplunkComponent();
         component.setSplunkConfigurationFactory(parameters -> new SplunkConfiguration());
-
         return component;
     }
 
-    @Path("/normal")
+    @Path("/results/{name}")
     @POST
-    @Produces(MediaType.APPLICATION_JSON)
-    public List normal(String search) throws Exception {
-        String url = String.format(
-                "splunk://normal?username=admin&password=changeit&scheme=http&port=%d&delay=5000&initEarliestTime=-10s&search="
-                        + search,
-                port);
-
-        final SplunkEvent m1 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
-        final SplunkEvent m2 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
-        final SplunkEvent m3 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
-
-        List result = Arrays.stream(new SplunkEvent[] { m1, m2, m3 })
-                .map(m -> m.getEventData().entrySet().stream()
-                        .filter(e -> !e.getKey().startsWith("_"))
-                        .collect(Collectors.toMap(
-                                Map.Entry::getKey,
-                                Map.Entry::getValue,
-                                (v1, v2) -> v1)))
-                .collect(Collectors.toList());
-
-        return result;
-    }
+    public String results(@PathParam("name") String mapName) throws Exception {
+        String url;
+        int count = 3;
 
-    @Path("/savedSearch")
-    @POST
-    public String savedSearch(String name) throws Exception {
-        String url = String.format(
-                "splunk://savedsearch?username=admin&password=changeit&scheme=http&port=%d&delay=500&initEarliestTime=-1m&savedsearch=%s",
-                port, name);
-
-        final SplunkEvent m1 = consumerTemplate.receiveBody(url, 5000, SplunkEvent.class);
-        final SplunkEvent m2 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
-        final SplunkEvent m3 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
+        if ("savedSearch".equals(mapName)) {
+            url = String.format(
+                    "splunk://savedsearch?username=admin&password=changeit&scheme=http&port=%d&delay=500&initEarliestTime=-10m&savedsearch=%s",
+                    port, SAVED_SEARCH_NAME);
+        } else if ("normalSearch".equals(mapName)) {
+            url = String.format(
+                    "splunk://normal?username=admin&password=changeit&scheme=http&port=%d&delay=5000&initEarliestTime=-10s&search="
+                            + "search sourcetype=\"SUBMIT\" | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
+                    port);
+        } else {
+            url = String.format(
+                    "splunk://realtime?username=admin&password=changeit&scheme=http&port=%d&delay=3000&initEarliestTime=rt-10s&latestTime=RAW(rt+40s)&search="
+                            + "search sourcetype=\"STREAM\" | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
+                    port, ProducerType.STREAM.name());
+        }
 
-        List result = Arrays.stream(new SplunkEvent[] { m1, m2, m3 })
+        List<SplunkEvent> events = events = new LinkedList<>();
+        for (int i = 0; i < count; i++) {
+            SplunkEvent se = consumerTemplate.receiveBody(url, 5000, SplunkEvent.class);
+            if (se == null) {
+                break;
+            }
+            events.add(se);
+        }
+        List result = events.stream()
                 .map(m -> {
                     if (m == null) {
                         return "null";
@@ -120,84 +109,44 @@ public class SplunkResource {
                     return m.getEventData().get("_raw");
                 })
                 .collect(Collectors.toList());
-
         return result.toString();
     }
 
-    @Path("/directRealtimePolling")
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    public Map directRealtimePolling() throws Exception {
-        final SplunkEvent m1 = consumerTemplate.receiveBody("direct:realtimePolling", 3000, SplunkEvent.class);
-
-        if (m1 == null) {
-            return Collections.emptyMap();
-        }
-
-        Map result = m1.getEventData().entrySet().stream()
-                .filter(e -> !e.getKey().startsWith("_"))
-                .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        Map.Entry::getValue,
-                        (v1, v2) -> v1));
-
-        return result;
-    }
-
-    @Path("/startRealtimePolling")
-    @POST
-    public void startPolling(String search) {
-        // use another thread for polling consumer to demonstrate that we can wait before
-        // the message is sent to the queue
-        Executors.newSingleThreadExecutor().execute(() -> {
-            String url = String.format(
-                    "splunk://realtime?username=admin&password=changeit&scheme=http&port=%d&delay=3000&initEarliestTime=rt-10s&latestTime=RAW(rt+40s)&search="
-                            + search,
-                    port);
-            SplunkEvent body = consumerTemplate.receiveBody(url, SplunkEvent.class);
-            producerTemplate.sendBody("direct:realtimePolling", body);
-        });
-    }
-
-    @Path("/submit")
+    @Path("/write/{producerType}")
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.TEXT_PLAIN)
-    public Response submit(Map<String, String> message, @QueryParam("index") String index) throws Exception {
-        return post(message, "submit", index, null);
-    }
-
-    @Path("/stream")
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.TEXT_PLAIN)
-    public Response stream(Map<String, String> message, @QueryParam("index") String index) throws Exception {
-        return post(message, "stream", index, null);
-    }
-
-    @Path("/tcp")
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.TEXT_PLAIN)
-    public Response tcp(Map<String, String> message, @QueryParam("index") String index) throws Exception {
-        return post(message, "tcp", index, tcpPort);
-    }
+    public Response write(Map<String, String> message,
+            @PathParam("producerType") String producerType,
+            @QueryParam("index") String index) throws URISyntaxException {
+        if (message.containsKey("_rawData")) {
+            return writeRaw(message.get("_rawData"), producerType, index);
+        }
 
-    private Response post(Map<String, String> message, String endpoint, String index, Integer tcpPort) throws Exception {
         SplunkEvent se = new SplunkEvent();
         for (Map.Entry<String, String> e : message.entrySet()) {
             se.addPair(e.getKey(), e.getValue());
         }
 
-        String url = String.format(
-                "splunk:%s?scheme=http&port=%d&index=%s&sourceType=%s&source=%s",
-                endpoint, port, index, SOURCE_TYPE, SOURCE);
-        if (tcpPort != null) {
+        return writeRaw(se, producerType, index);
+    }
+
+    private Response writeRaw(Object message,
+            String producerType,
+            String index) throws URISyntaxException {
+        String url;
+        if (ProducerType.TCP == ProducerType.valueOf(producerType)) {
+            url = String.format(
+                    "splunk:%s?raw=%b&username=admin&password=changeit&scheme=http&port=%d&index=%s&sourceType=%s&source=%s&tcpReceiverLocalPort=%d&tcpReceiverPort=%d",
+                    producerType.toLowerCase(), !(message instanceof SplunkEvent), port, index, producerType, SOURCE,
+                    LOCAL_TCP_PORT, tcpPort);
+
+        } else {
             url = String.format(
-                    "splunk:%s?username=admin&password=changeit&scheme=http&port=%d&index=%s&sourceType=%s&source=%s&tcpReceiverLocalPort=%d&tcpReceiverPort=%d",
-                    endpoint, port, index, SOURCE_TYPE, SOURCE, LOCAL_TCP_PORT, tcpPort);
+                    "splunk:%s?raw=%b&scheme=http&port=%d&index=%s&sourceType=%s&source=%s",
+                    producerType.toLowerCase(), !(message instanceof SplunkEvent), port, index, producerType, SOURCE);
         }
-        final String response = producerTemplate.requestBody(url, se, String.class);
+        final String response = producerTemplate.requestBody(url, message, String.class);
         return Response
                 .created(new URI("https://camel.apache.org/"))
                 .entity(response)
diff --git a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java
index 7280e1cc2d..37d2f7705d 100644
--- a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java
+++ b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java
@@ -16,24 +16,27 @@
  */
 package org.apache.camel.quarkus.component.splunk.it;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
-import io.restassured.common.mapper.TypeRef;
 import io.restassured.http.ContentType;
-import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.component.splunk.ProducerType;
 import org.eclipse.microprofile.config.ConfigProvider;
-import org.junit.jupiter.api.Assertions;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
 
 @QuarkusTest
@@ -41,113 +44,139 @@ import static org.hamcrest.Matchers.is;
 class SplunkTest {
 
     @Test
-    public void testWriteTcpAndReadNormal() {
-        write("_normal", SplunkTestResource.TEST_INDEX, "tcp");
-
-        List<Map<String, String>> result = RestAssured.given()
-                .contentType(ContentType.TEXT)
-                .body(String.format(
-                        "search index=%s sourcetype=%s | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
-                        SplunkTestResource.TEST_INDEX, SplunkResource.SOURCE_TYPE))
-                .post("/splunk/normal")
-                .then()
-                .statusCode(200)
-                .extract().as(new TypeRef<>() {
-                });
+    public void testNormalSearchWithSubmitWithRawData() {
+        String suffix = "_normalSearchOfSubmit";
 
-        Assertions.assertEquals(3, result.size());
-        Assertions.assertEquals("Irma_normal", result.get(0).get("name"));
-        Assertions.assertEquals("Earth\"", result.get(0).get("from"));
-        Assertions.assertEquals("Leonard_normal", result.get(1).get("name"));
-        Assertions.assertEquals("Earth 2.0\"", result.get(1).get("from"));
-        Assertions.assertEquals("Sheldon_normal", result.get(2).get("name"));
-        Assertions.assertEquals("Alpha Centauri\"", result.get(2).get("from"));
-    }
+        write(suffix, ProducerType.SUBMIT, 0, true);
 
-    @Test
-    public void testWriteSubmitAndReadRealtime() throws InterruptedException, ExecutionException {
+        Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+                () -> {
 
-        RestAssured.given()
-                .body(String.format(
-                        "search index=%s sourcetype=%s | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
-                        SplunkTestResource.TEST_INDEX, SplunkResource.SOURCE_TYPE))
-                .post("/splunk/startRealtimePolling");
-
-        //wait some time to start polling
-        TimeUnit.SECONDS.sleep(3);
-        write("_realtime1", SplunkTestResource.TEST_INDEX, "submit");
-        TimeUnit.SECONDS.sleep(1);
-        write("_realtime2", SplunkTestResource.TEST_INDEX, "submit");
-        TimeUnit.SECONDS.sleep(1);
-        write("_realtime3", SplunkTestResource.TEST_INDEX, "submit");
-        //wait some time to gather the pulls from splunk server
-        TimeUnit.SECONDS.sleep(3);
-        //there should be some data from realtime search in direct (concrete values depends on the speed of writing into index)
-        //test is asserting that there are some
-        RestAssured.get("/splunk/directRealtimePolling")
-                .then()
-                .statusCode(200)
-                .body(containsString("_realtime"));
+                    String result = RestAssured.given()
+                            .contentType(ContentType.TEXT)
+                            .post("/splunk/results/normalSearch")
+                            .then()
+                            .statusCode(200)
+                            .extract().asString();
+
+                    return result.contains("Name: Sheldon" + suffix)
+                            && result.contains("Name: Leonard" + suffix)
+                            && result.contains("Name: Irma" + suffix);
+                });
     }
 
     @Test
-    public void testWriteStreamAndReadSaved() throws InterruptedException {
-        int defaultPort = RestAssured.port;
-        String defaultUri = RestAssured.baseURI;
-
+    public void testSavedSearchWithTcp() throws InterruptedException {
+        String suffix = "_SavedSearchOfTcp";
         //create saved search
         RestAssured.given()
                 .baseUri("http://localhost")
                 .port(ConfigProvider.getConfig().getValue(SplunkResource.PARAM_REMOTE_PORT, Integer.class))
                 .contentType(ContentType.JSON)
-                .param("name", SplunkTestResource.SAVED_SEARCH_NAME)
+                .param("name", SplunkResource.SAVED_SEARCH_NAME)
                 .param("disabled", "0")
-                .param("description", "descritionText")
+                .param("description", "descriptionText")
                 .param("search",
-                        "index=" + SplunkTestResource.TEST_INDEX + " sourcetype=" + SplunkResource.SOURCE_TYPE)
+                        "sourcetype=\"TCP\" | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"")
                 .post("/services/saved/searches")
                 .then()
                 .statusCode(anyOf(is(201), is(409)));
-        write("_s", SplunkTestResource.TEST_INDEX, "stream");
 
-        RestAssured.given()
-                .contentType(ContentType.TEXT)
-                .body(SplunkTestResource.SAVED_SEARCH_NAME)
-                .post("/splunk/savedSearch")
-                .then()
-                .statusCode(200)
-                .body(containsString("Name: Sheldon_s"))
-                .body(containsString("Name: Leonard_s"))
-                .body(containsString("Name: Irma_s"));
-    }
-
-    private void write(String suffix, String index, String endpoint) {
-        write(CollectionHelper.mapOf("entity", "Name: Sheldon" + suffix + " From: Alpha Centauri"), "submit",
-                index);
-        write(CollectionHelper.mapOf("entity", "Name: Leonard" + suffix + " From: Earth 2.0"), "submit",
-                index);
-        write(CollectionHelper.mapOf("entity", "Name: Irma" + suffix + " From: Earth"), "submit", index);
+        //write data via tcp
+        write(suffix, ProducerType.TCP, 0, false);
+
+        //there might by delay in receiving the data
+        Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+                () -> {
+                    String result = RestAssured.given()
+                            .contentType(ContentType.TEXT)
+                            .post("/splunk/results/savedSearch")
+                            .then()
+                            .statusCode(200)
+                            .extract().asString();
+
+                    return result.contains("Name: Sheldon" + suffix)
+                            && result.contains("Name: Leonard" + suffix)
+                            && result.contains("Name: Irma" + suffix);
+                });
     }
 
-    private void write(Map<String, String> data, String endpoint, String index) {
+    @Test
+    public void testStreamForRealtime() throws InterruptedException, ExecutionException {
+        String suffix = "_RealtimeSearchOfStream";
+        //there is a buffer for stream writing, therefore about 1MB of data has to be written into Splunk
+
+        //data are written in separated thread
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        //execute component server to wait for the result
+        Future<Void> futureResult = executor.submit(
+                () -> {
+                    for (int i = 0; i < 5000; i++) {
+                        write(suffix + i, ProducerType.STREAM, 100, false);
+                    }
+                    return null;
+                });
 
-        String expectedResult = expectedResult(data);
+        try {
+            Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+                    () -> {
+
+                        String result = RestAssured.given()
+                                .contentType(ContentType.TEXT)
+                                .post("/splunk/results/realtimeSearch")
+                                .then()
+                                .statusCode(200)
+                                .extract().asString();
+
+                        return result.contains("Name: Sheldon" + suffix)
+                                && result.contains("Name: Leonard" + suffix)
+                                && result.contains("Name: Irma" + suffix);
+                    });
+        } finally {
+            futureResult.cancel(true);
+        }
+    }
 
-        RestAssured.given()
+    private void write(String suffix, ProducerType producerType, int lengthOfRandomString, boolean raw) {
+        Consumer<Map<String, String>> write = data -> RestAssured.given()
                 .contentType(ContentType.JSON)
-                .queryParam("index", index)
+                .queryParam("index", SplunkTestResource.TEST_INDEX)
                 .body(data)
-                .post("/splunk/" + endpoint)
+                .post("/splunk/write/" + producerType.name())
                 .then()
                 .statusCode(201)
-                .body(containsString(expectedResult));
+                .body(Matchers.containsString(expectedResult(data)));
+
+        Map<String, String> data1;
+        Map<String, String> data2;
+        Map<String, String> data3;
+        if (raw) {
+            data1 = Map.of("_rawData", "Name: Sheldon" + suffix + " From: Alpha Centauri", "data",
+                    RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+            data2 = Map.of("_rawData", "Name: Leonard" + suffix + " From: Earth 2.0", "data",
+                    RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+            data3 = Map.of("_rawData", "Name: Irma" + suffix + " From: Earth", "data",
+                    RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+        } else {
+            data1 = Map.of("entity", "Name: Sheldon" + suffix + " From: Alpha Centauri", "data",
+                    RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+            data2 = Map.of("entity", "Name: Leonard" + suffix + " From: Earth 2.0", "data",
+                    RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+            data3 = Map.of("entity", "Name: Irma" + suffix + " From: Earth", "data",
+                    RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+        }
+
+        write.accept(data1);
+        write.accept(data2);
+        write.accept(data3);
     }
 
     private String expectedResult(Map<String, String> data) {
-        String expectedResult = data.entrySet().stream()
+        if (data.containsKey("_rawData")) {
+            return data.get("_rawData");
+        }
+        return data.entrySet().stream()
                 .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
                 .collect(Collectors.joining(" "));
-        return expectedResult;
     }
-
 }
diff --git a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java
index 792ea16ac3..dee01b4a2e 100644
--- a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java
+++ b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java
@@ -21,26 +21,29 @@ import java.util.Map;
 import java.util.TimeZone;
 
 import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
-import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.component.splunk.ProducerType;
+import org.apache.commons.lang3.StringUtils;
 import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.wait.strategy.Wait;
 
 public class SplunkTestResource implements QuarkusTestResourceLifecycleManager {
 
     public static String TEST_INDEX = "testindex";
-    public static String SAVED_SEARCH_NAME = "savedSearchForTest";
     private static final String SPLUNK_IMAGE_NAME = ConfigProvider.getConfig().getValue("splunk.container.image", String.class);
     private static final int REMOTE_PORT = 8089;
+    private static final int WEB_PORT = 8000;
+    private static final Logger LOG = Logger.getLogger(SplunkTestResource.class);
 
-    private GenericContainer container;
+    private GenericContainer<?> container;
 
     @Override
     public Map<String, String> start() {
 
         try {
-            container = new GenericContainer(SPLUNK_IMAGE_NAME)
-                    .withExposedPorts(REMOTE_PORT, SplunkResource.LOCAL_TCP_PORT)
+            container = new GenericContainer<>(SPLUNK_IMAGE_NAME)
+                    .withExposedPorts(REMOTE_PORT, SplunkResource.LOCAL_TCP_PORT, WEB_PORT)
                     .withEnv("SPLUNK_START_ARGS", "--accept-license")
                     .withEnv("SPLUNK_PASSWORD", "changeit")
                     .withEnv("SPLUNK_LICENSE_URI", "Free")
@@ -64,9 +67,14 @@ public class SplunkTestResource implements QuarkusTestResourceLifecycleManager {
             container.execInContainer("sudo", "./bin/splunk", "add", "index", TEST_INDEX);
             container.execInContainer("sudo", "./bin/splunk", "add", "tcp", String.valueOf(SplunkResource.LOCAL_TCP_PORT),
                     "-sourcetype",
-                    SplunkResource.SOURCE_TYPE);
+                    ProducerType.TCP.name());
 
-            return CollectionHelper.mapOf(
+            String banner = StringUtils.repeat("*", 50);
+            LOG.info(banner);
+            LOG.infof("Splunk UI running on: http://localhost:%d", container.getMappedPort(WEB_PORT));
+            LOG.info(banner);
+
+            return Map.of(
                     SplunkResource.PARAM_REMOTE_PORT, container.getMappedPort(REMOTE_PORT).toString(),
                     SplunkResource.PARAM_TCP_PORT, container.getMappedPort(SplunkResource.LOCAL_TCP_PORT).toString());
         } catch (Exception e) {