You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/08/17 10:29:20 UTC
[camel-quarkus] branch main updated: Expand Splunk test coverage
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 85bbbccbb9 Expand Splunk test coverage
85bbbccbb9 is described below
commit 85bbbccbb933b07a5f1eeebf3c6abce536bcf21f
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Tue Aug 15 09:06:26 2023 +0200
Expand Splunk test coverage
---
.../splunk/deployment/SplunkProcessor.java | 25 +--
integration-tests/splunk/README.adoc | 6 +
.../component/splunk/it/SplunkResource.java | 165 ++++++-----------
.../quarkus/component/splunk/it/SplunkTest.java | 195 ++++++++++++---------
.../component/splunk/it/SplunkTestResource.java | 22 ++-
5 files changed, 205 insertions(+), 208 deletions(-)
diff --git a/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java b/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java
index c79d93282c..724d9892cb 100644
--- a/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java
+++ b/extensions/splunk/deployment/src/main/java/org/apache/camel/quarkus/component/splunk/deployment/SplunkProcessor.java
@@ -18,10 +18,13 @@ package org.apache.camel.quarkus.component.splunk.deployment;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
-import java.util.stream.Collectors;
+import com.splunk.HttpService;
+import com.splunk.Index;
+import com.splunk.Input;
+import com.splunk.SavedSearch;
+import com.splunk.Service;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
@@ -51,25 +54,27 @@ class SplunkProcessor {
@BuildStep
RuntimeInitializedClassBuildItem runtimeInitBcryptUtil() {
// this class uses a SecureRandom which needs to be initialised at run time
- return new RuntimeInitializedClassBuildItem("com.splunk.HttpService");
+ return new RuntimeInitializedClassBuildItem(HttpService.class.getName());
}
@BuildStep
ReflectiveClassBuildItem registerForReflection(CombinedIndexBuildItem combinedIndex) {
IndexView index = combinedIndex.getIndex();
- List<String> dtos = new LinkedList<>();
- dtos.addAll(index.getAllKnownSubclasses(DotName.createSimple("com.splunk.Input")).stream()
- .map(c -> c.name().toString()).collect(Collectors.toList()));
+ List<String> dtos = index.getAllKnownSubclasses(DotName.createSimple(Input.class))
+ .stream()
+ .map(c -> c.name().toString())
+ .toList();
- return ReflectiveClassBuildItem.builder(dtos.toArray(new String[dtos.size()])).build();
+ return ReflectiveClassBuildItem.builder(dtos.toArray(new String[0])).build();
}
@BuildStep
List<ReflectiveClassBuildItem> reflectiveClasses() {
- return Arrays.asList(ReflectiveClassBuildItem.builder("com.splunk.Index").build(),
- ReflectiveClassBuildItem.builder("com.splunk.SavedSearch").build(),
- ReflectiveClassBuildItem.builder("com.splunk.Service").build());
+ return Arrays.asList(ReflectiveClassBuildItem.builder(Index.class.getName()).constructors().build(),
+ ReflectiveClassBuildItem.builder(SavedSearch.class.getName()).constructors().build(),
+ ReflectiveClassBuildItem.builder(Input.class.getName()).constructors().build(),
+ ReflectiveClassBuildItem.builder(Service.class.getName()).constructors().build());
}
@BuildStep
diff --git a/integration-tests/splunk/README.adoc b/integration-tests/splunk/README.adoc
new file mode 100644
index 0000000000..f69cb4ec8a
--- /dev/null
+++ b/integration-tests/splunk/README.adoc
@@ -0,0 +1,6 @@
+=== Splunk Web UI
+
+The Splunk UI is helpful for testing, as you can see (and search for) data inside Splunk. The URL to the Splunk UI
+is output to the console after the test container starts. Watch for the text 'Splunk UI running on'.
+
+You may want to delay the Splunk container shutdown either by adding a `Thread.sleep` into `SplunkTestResource.stop()` or commenting out `container.stop()`.
diff --git a/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java b/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java
index 2b762053ab..afe65d099f 100644
--- a/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java
+++ b/integration-tests/splunk/src/main/java/org/apache/camel/quarkus/component/splunk/it/SplunkResource.java
@@ -17,27 +17,26 @@
package org.apache.camel.quarkus.component.splunk.it;
import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
+import java.net.URISyntaxException;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.ws.rs.Consumes;
-import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
-import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.splunk.ProducerType;
import org.apache.camel.component.splunk.SplunkComponent;
import org.apache.camel.component.splunk.SplunkConfiguration;
import org.apache.camel.component.splunk.event.SplunkEvent;
@@ -47,10 +46,10 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
public class SplunkResource {
+ public static final String SAVED_SEARCH_NAME = "savedSearchForTest";
public static final String PARAM_REMOTE_PORT = "org.apache.camel.quarkus.component.splunk.it.SplunkResource_remotePort";
public static final String PARAM_TCP_PORT = "org.apache.camel.quarkus.component.splunk.it.SplunkResource_tcpPort";
public static final String SOURCE = "test";
- public static final String SOURCE_TYPE = "testSource";
public static final int LOCAL_TCP_PORT = 9998;
@Inject
@@ -65,54 +64,44 @@ public class SplunkResource {
@ConfigProperty(name = PARAM_TCP_PORT)
Integer tcpPort;
- @Inject
- CamelContext camelContext;
-
@Named
SplunkComponent splunk() {
SplunkComponent component = new SplunkComponent();
component.setSplunkConfigurationFactory(parameters -> new SplunkConfiguration());
-
return component;
}
- @Path("/normal")
+ @Path("/results/{name}")
@POST
- @Produces(MediaType.APPLICATION_JSON)
- public List normal(String search) throws Exception {
- String url = String.format(
- "splunk://normal?username=admin&password=changeit&scheme=http&port=%d&delay=5000&initEarliestTime=-10s&search="
- + search,
- port);
-
- final SplunkEvent m1 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
- final SplunkEvent m2 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
- final SplunkEvent m3 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
-
- List result = Arrays.stream(new SplunkEvent[] { m1, m2, m3 })
- .map(m -> m.getEventData().entrySet().stream()
- .filter(e -> !e.getKey().startsWith("_"))
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- Map.Entry::getValue,
- (v1, v2) -> v1)))
- .collect(Collectors.toList());
-
- return result;
- }
+ public String results(@PathParam("name") String mapName) throws Exception {
+ String url;
+ int count = 3;
- @Path("/savedSearch")
- @POST
- public String savedSearch(String name) throws Exception {
- String url = String.format(
- "splunk://savedsearch?username=admin&password=changeit&scheme=http&port=%d&delay=500&initEarliestTime=-1m&savedsearch=%s",
- port, name);
-
- final SplunkEvent m1 = consumerTemplate.receiveBody(url, 5000, SplunkEvent.class);
- final SplunkEvent m2 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
- final SplunkEvent m3 = consumerTemplate.receiveBody(url, 1000, SplunkEvent.class);
+ if ("savedSearch".equals(mapName)) {
+ url = String.format(
+ "splunk://savedsearch?username=admin&password=changeit&scheme=http&port=%d&delay=500&initEarliestTime=-10m&savedsearch=%s",
+ port, SAVED_SEARCH_NAME);
+ } else if ("normalSearch".equals(mapName)) {
+ url = String.format(
+ "splunk://normal?username=admin&password=changeit&scheme=http&port=%d&delay=5000&initEarliestTime=-10s&search="
+ + "search sourcetype=\"SUBMIT\" | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
+ port);
+ } else {
+ url = String.format(
+ "splunk://realtime?username=admin&password=changeit&scheme=http&port=%d&delay=3000&initEarliestTime=rt-10s&latestTime=RAW(rt+40s)&search="
+ + "search sourcetype=\"STREAM\" | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
+ port, ProducerType.STREAM.name());
+ }
- List result = Arrays.stream(new SplunkEvent[] { m1, m2, m3 })
+ List<SplunkEvent> events = events = new LinkedList<>();
+ for (int i = 0; i < count; i++) {
+ SplunkEvent se = consumerTemplate.receiveBody(url, 5000, SplunkEvent.class);
+ if (se == null) {
+ break;
+ }
+ events.add(se);
+ }
+ List result = events.stream()
.map(m -> {
if (m == null) {
return "null";
@@ -120,84 +109,44 @@ public class SplunkResource {
return m.getEventData().get("_raw");
})
.collect(Collectors.toList());
-
return result.toString();
}
- @Path("/directRealtimePolling")
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- public Map directRealtimePolling() throws Exception {
- final SplunkEvent m1 = consumerTemplate.receiveBody("direct:realtimePolling", 3000, SplunkEvent.class);
-
- if (m1 == null) {
- return Collections.emptyMap();
- }
-
- Map result = m1.getEventData().entrySet().stream()
- .filter(e -> !e.getKey().startsWith("_"))
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- Map.Entry::getValue,
- (v1, v2) -> v1));
-
- return result;
- }
-
- @Path("/startRealtimePolling")
- @POST
- public void startPolling(String search) {
- // use another thread for polling consumer to demonstrate that we can wait before
- // the message is sent to the queue
- Executors.newSingleThreadExecutor().execute(() -> {
- String url = String.format(
- "splunk://realtime?username=admin&password=changeit&scheme=http&port=%d&delay=3000&initEarliestTime=rt-10s&latestTime=RAW(rt+40s)&search="
- + search,
- port);
- SplunkEvent body = consumerTemplate.receiveBody(url, SplunkEvent.class);
- producerTemplate.sendBody("direct:realtimePolling", body);
- });
- }
-
- @Path("/submit")
+ @Path("/write/{producerType}")
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.TEXT_PLAIN)
- public Response submit(Map<String, String> message, @QueryParam("index") String index) throws Exception {
- return post(message, "submit", index, null);
- }
-
- @Path("/stream")
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.TEXT_PLAIN)
- public Response stream(Map<String, String> message, @QueryParam("index") String index) throws Exception {
- return post(message, "stream", index, null);
- }
-
- @Path("/tcp")
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.TEXT_PLAIN)
- public Response tcp(Map<String, String> message, @QueryParam("index") String index) throws Exception {
- return post(message, "tcp", index, tcpPort);
- }
+ public Response write(Map<String, String> message,
+ @PathParam("producerType") String producerType,
+ @QueryParam("index") String index) throws URISyntaxException {
+ if (message.containsKey("_rawData")) {
+ return writeRaw(message.get("_rawData"), producerType, index);
+ }
- private Response post(Map<String, String> message, String endpoint, String index, Integer tcpPort) throws Exception {
SplunkEvent se = new SplunkEvent();
for (Map.Entry<String, String> e : message.entrySet()) {
se.addPair(e.getKey(), e.getValue());
}
- String url = String.format(
- "splunk:%s?scheme=http&port=%d&index=%s&sourceType=%s&source=%s",
- endpoint, port, index, SOURCE_TYPE, SOURCE);
- if (tcpPort != null) {
+ return writeRaw(se, producerType, index);
+ }
+
+ private Response writeRaw(Object message,
+ String producerType,
+ String index) throws URISyntaxException {
+ String url;
+ if (ProducerType.TCP == ProducerType.valueOf(producerType)) {
+ url = String.format(
+ "splunk:%s?raw=%b&username=admin&password=changeit&scheme=http&port=%d&index=%s&sourceType=%s&source=%s&tcpReceiverLocalPort=%d&tcpReceiverPort=%d",
+ producerType.toLowerCase(), !(message instanceof SplunkEvent), port, index, producerType, SOURCE,
+ LOCAL_TCP_PORT, tcpPort);
+
+ } else {
url = String.format(
- "splunk:%s?username=admin&password=changeit&scheme=http&port=%d&index=%s&sourceType=%s&source=%s&tcpReceiverLocalPort=%d&tcpReceiverPort=%d",
- endpoint, port, index, SOURCE_TYPE, SOURCE, LOCAL_TCP_PORT, tcpPort);
+ "splunk:%s?raw=%b&scheme=http&port=%d&index=%s&sourceType=%s&source=%s",
+ producerType.toLowerCase(), !(message instanceof SplunkEvent), port, index, producerType, SOURCE);
}
- final String response = producerTemplate.requestBody(url, se, String.class);
+ final String response = producerTemplate.requestBody(url, message, String.class);
return Response
.created(new URI("https://camel.apache.org/"))
.entity(response)
diff --git a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java
index 7280e1cc2d..37d2f7705d 100644
--- a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java
+++ b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java
@@ -16,24 +16,27 @@
*/
package org.apache.camel.quarkus.component.splunk.it;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
-import io.restassured.common.mapper.TypeRef;
import io.restassured.http.ContentType;
-import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.component.splunk.ProducerType;
import org.eclipse.microprofile.config.ConfigProvider;
-import org.junit.jupiter.api.Assertions;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@QuarkusTest
@@ -41,113 +44,139 @@ import static org.hamcrest.Matchers.is;
class SplunkTest {
@Test
- public void testWriteTcpAndReadNormal() {
- write("_normal", SplunkTestResource.TEST_INDEX, "tcp");
-
- List<Map<String, String>> result = RestAssured.given()
- .contentType(ContentType.TEXT)
- .body(String.format(
- "search index=%s sourcetype=%s | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
- SplunkTestResource.TEST_INDEX, SplunkResource.SOURCE_TYPE))
- .post("/splunk/normal")
- .then()
- .statusCode(200)
- .extract().as(new TypeRef<>() {
- });
+ public void testNormalSearchWithSubmitWithRawData() {
+ String suffix = "_normalSearchOfSubmit";
- Assertions.assertEquals(3, result.size());
- Assertions.assertEquals("Irma_normal", result.get(0).get("name"));
- Assertions.assertEquals("Earth\"", result.get(0).get("from"));
- Assertions.assertEquals("Leonard_normal", result.get(1).get("name"));
- Assertions.assertEquals("Earth 2.0\"", result.get(1).get("from"));
- Assertions.assertEquals("Sheldon_normal", result.get(2).get("name"));
- Assertions.assertEquals("Alpha Centauri\"", result.get(2).get("from"));
- }
+ write(suffix, ProducerType.SUBMIT, 0, true);
- @Test
- public void testWriteSubmitAndReadRealtime() throws InterruptedException, ExecutionException {
+ Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+ () -> {
- RestAssured.given()
- .body(String.format(
- "search index=%s sourcetype=%s | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"",
- SplunkTestResource.TEST_INDEX, SplunkResource.SOURCE_TYPE))
- .post("/splunk/startRealtimePolling");
-
- //wait some time to start polling
- TimeUnit.SECONDS.sleep(3);
- write("_realtime1", SplunkTestResource.TEST_INDEX, "submit");
- TimeUnit.SECONDS.sleep(1);
- write("_realtime2", SplunkTestResource.TEST_INDEX, "submit");
- TimeUnit.SECONDS.sleep(1);
- write("_realtime3", SplunkTestResource.TEST_INDEX, "submit");
- //wait some time to gather the pulls from splunk server
- TimeUnit.SECONDS.sleep(3);
- //there should be some data from realtime search in direct (concrete values depends on the speed of writing into index)
- //test is asserting that there are some
- RestAssured.get("/splunk/directRealtimePolling")
- .then()
- .statusCode(200)
- .body(containsString("_realtime"));
+ String result = RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .post("/splunk/results/normalSearch")
+ .then()
+ .statusCode(200)
+ .extract().asString();
+
+ return result.contains("Name: Sheldon" + suffix)
+ && result.contains("Name: Leonard" + suffix)
+ && result.contains("Name: Irma" + suffix);
+ });
}
@Test
- public void testWriteStreamAndReadSaved() throws InterruptedException {
- int defaultPort = RestAssured.port;
- String defaultUri = RestAssured.baseURI;
-
+ public void testSavedSearchWithTcp() throws InterruptedException {
+ String suffix = "_SavedSearchOfTcp";
//create saved search
RestAssured.given()
.baseUri("http://localhost")
.port(ConfigProvider.getConfig().getValue(SplunkResource.PARAM_REMOTE_PORT, Integer.class))
.contentType(ContentType.JSON)
- .param("name", SplunkTestResource.SAVED_SEARCH_NAME)
+ .param("name", SplunkResource.SAVED_SEARCH_NAME)
.param("disabled", "0")
- .param("description", "descritionText")
+ .param("description", "descriptionText")
.param("search",
- "index=" + SplunkTestResource.TEST_INDEX + " sourcetype=" + SplunkResource.SOURCE_TYPE)
+ "sourcetype=\"TCP\" | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"")
.post("/services/saved/searches")
.then()
.statusCode(anyOf(is(201), is(409)));
- write("_s", SplunkTestResource.TEST_INDEX, "stream");
- RestAssured.given()
- .contentType(ContentType.TEXT)
- .body(SplunkTestResource.SAVED_SEARCH_NAME)
- .post("/splunk/savedSearch")
- .then()
- .statusCode(200)
- .body(containsString("Name: Sheldon_s"))
- .body(containsString("Name: Leonard_s"))
- .body(containsString("Name: Irma_s"));
- }
-
- private void write(String suffix, String index, String endpoint) {
- write(CollectionHelper.mapOf("entity", "Name: Sheldon" + suffix + " From: Alpha Centauri"), "submit",
- index);
- write(CollectionHelper.mapOf("entity", "Name: Leonard" + suffix + " From: Earth 2.0"), "submit",
- index);
- write(CollectionHelper.mapOf("entity", "Name: Irma" + suffix + " From: Earth"), "submit", index);
+ //write data via tcp
+ write(suffix, ProducerType.TCP, 0, false);
+
+ //there might be a delay in receiving the data
+ Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+ () -> {
+ String result = RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .post("/splunk/results/savedSearch")
+ .then()
+ .statusCode(200)
+ .extract().asString();
+
+ return result.contains("Name: Sheldon" + suffix)
+ && result.contains("Name: Leonard" + suffix)
+ && result.contains("Name: Irma" + suffix);
+ });
}
- private void write(Map<String, String> data, String endpoint, String index) {
+ @Test
+ public void testStreamForRealtime() throws InterruptedException, ExecutionException {
+ String suffix = "_RealtimeSearchOfStream";
+ //there is a buffer for stream writing, therefore about 1MB of data has to be written into Splunk
+
+ //data is written in a separate thread
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ //submit the writer task; it keeps writing while the test polls for the results
+ Future<Void> futureResult = executor.submit(
+ () -> {
+ for (int i = 0; i < 5000; i++) {
+ write(suffix + i, ProducerType.STREAM, 100, false);
+ }
+ return null;
+ });
- String expectedResult = expectedResult(data);
+ try {
+ Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+ () -> {
+
+ String result = RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .post("/splunk/results/realtimeSearch")
+ .then()
+ .statusCode(200)
+ .extract().asString();
+
+ return result.contains("Name: Sheldon" + suffix)
+ && result.contains("Name: Leonard" + suffix)
+ && result.contains("Name: Irma" + suffix);
+ });
+ } finally {
+ futureResult.cancel(true);
+ }
+ }
- RestAssured.given()
+ private void write(String suffix, ProducerType producerType, int lengthOfRandomString, boolean raw) {
+ Consumer<Map<String, String>> write = data -> RestAssured.given()
.contentType(ContentType.JSON)
- .queryParam("index", index)
+ .queryParam("index", SplunkTestResource.TEST_INDEX)
.body(data)
- .post("/splunk/" + endpoint)
+ .post("/splunk/write/" + producerType.name())
.then()
.statusCode(201)
- .body(containsString(expectedResult));
+ .body(Matchers.containsString(expectedResult(data)));
+
+ Map<String, String> data1;
+ Map<String, String> data2;
+ Map<String, String> data3;
+ if (raw) {
+ data1 = Map.of("_rawData", "Name: Sheldon" + suffix + " From: Alpha Centauri", "data",
+ RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+ data2 = Map.of("_rawData", "Name: Leonard" + suffix + " From: Earth 2.0", "data",
+ RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+ data3 = Map.of("_rawData", "Name: Irma" + suffix + " From: Earth", "data",
+ RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+ } else {
+ data1 = Map.of("entity", "Name: Sheldon" + suffix + " From: Alpha Centauri", "data",
+ RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+ data2 = Map.of("entity", "Name: Leonard" + suffix + " From: Earth 2.0", "data",
+ RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+ data3 = Map.of("entity", "Name: Irma" + suffix + " From: Earth", "data",
+ RandomStringUtils.randomAlphanumeric(lengthOfRandomString));
+ }
+
+ write.accept(data1);
+ write.accept(data2);
+ write.accept(data3);
}
private String expectedResult(Map<String, String> data) {
- String expectedResult = data.entrySet().stream()
+ if (data.containsKey("_rawData")) {
+ return data.get("_rawData");
+ }
+ return data.entrySet().stream()
.map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
.collect(Collectors.joining(" "));
- return expectedResult;
}
-
}
diff --git a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java
index 792ea16ac3..dee01b4a2e 100644
--- a/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java
+++ b/integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTestResource.java
@@ -21,26 +21,29 @@ import java.util.Map;
import java.util.TimeZone;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
-import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.component.splunk.ProducerType;
+import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
public class SplunkTestResource implements QuarkusTestResourceLifecycleManager {
public static String TEST_INDEX = "testindex";
- public static String SAVED_SEARCH_NAME = "savedSearchForTest";
private static final String SPLUNK_IMAGE_NAME = ConfigProvider.getConfig().getValue("splunk.container.image", String.class);
private static final int REMOTE_PORT = 8089;
+ private static final int WEB_PORT = 8000;
+ private static final Logger LOG = Logger.getLogger(SplunkTestResource.class);
- private GenericContainer container;
+ private GenericContainer<?> container;
@Override
public Map<String, String> start() {
try {
- container = new GenericContainer(SPLUNK_IMAGE_NAME)
- .withExposedPorts(REMOTE_PORT, SplunkResource.LOCAL_TCP_PORT)
+ container = new GenericContainer<>(SPLUNK_IMAGE_NAME)
+ .withExposedPorts(REMOTE_PORT, SplunkResource.LOCAL_TCP_PORT, WEB_PORT)
.withEnv("SPLUNK_START_ARGS", "--accept-license")
.withEnv("SPLUNK_PASSWORD", "changeit")
.withEnv("SPLUNK_LICENSE_URI", "Free")
@@ -64,9 +67,14 @@ public class SplunkTestResource implements QuarkusTestResourceLifecycleManager {
container.execInContainer("sudo", "./bin/splunk", "add", "index", TEST_INDEX);
container.execInContainer("sudo", "./bin/splunk", "add", "tcp", String.valueOf(SplunkResource.LOCAL_TCP_PORT),
"-sourcetype",
- SplunkResource.SOURCE_TYPE);
+ ProducerType.TCP.name());
- return CollectionHelper.mapOf(
+ String banner = StringUtils.repeat("*", 50);
+ LOG.info(banner);
+ LOG.infof("Splunk UI running on: http://localhost:%d", container.getMappedPort(WEB_PORT));
+ LOG.info(banner);
+
+ return Map.of(
SplunkResource.PARAM_REMOTE_PORT, container.getMappedPort(REMOTE_PORT).toString(),
SplunkResource.PARAM_TCP_PORT, container.getMappedPort(SplunkResource.LOCAL_TCP_PORT).toString());
} catch (Exception e) {