You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2020/10/15 06:19:23 UTC
[camel-quarkus] branch master updated: LevelDB native support #1839
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push:
new 4c7e016 LevelDB native support #1839
4c7e016 is described below
commit 4c7e016f65be1b25443990b10ba7e7497eb2f1d6
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Fri Oct 9 15:27:44 2020 +0200
LevelDB native support #1839
---
.../ROOT/pages/reference/extensions/leveldb.adoc | 20 ++-
.../ROOT/partials/reference/others/leveldb.adoc | 6 +-
.../component/leveldb/it/LeveldbResource.java | 48 ------
extensions-jvm/pom.xml | 1 -
.../leveldb/deployment/pom.xml | 4 +
.../leveldb/deployment/LeveldbProcessor.java | 33 +++--
{extensions-jvm => extensions}/leveldb/pom.xml | 1 -
.../leveldb/runtime/pom.xml | 26 ++++
.../leveldb/runtime/src/main/doc/limitations.adoc | 8 +
.../component/leveldb/MMapLogWriterSubstitute.java | 32 ++--
.../component/leveldb/MMapTableSubstitute.java | 68 +++++++++
.../component/leveldb/ObjectCodecSubstitute.java | 64 ++++++++
.../leveldb/OriginalByteBufferSupport.java | 69 +++++++++
.../main/resources/META-INF/quarkus-extension.yaml | 3 +-
extensions/pom.xml | 1 +
.../leveldb}/pom.xml | 79 +++++++++-
.../component/leveldb/it/LeveldbResource.java | 135 +++++++++++++++++
.../component/leveldb/it/LeveldbRouteBuilder.java | 125 ++++++++++++++++
.../quarkus/component/leveldb/it/LeveldbIT.java | 19 +--
.../quarkus/component/leveldb/it/LeveldbTest.java | 161 +++++++++++++++++++++
integration-tests/pom.xml | 1 +
tooling/scripts/test-categories.yaml | 1 +
22 files changed, 802 insertions(+), 103 deletions(-)
diff --git a/docs/modules/ROOT/pages/reference/extensions/leveldb.adoc b/docs/modules/ROOT/pages/reference/extensions/leveldb.adoc
index ccd8c12..e240947 100644
--- a/docs/modules/ROOT/pages/reference/extensions/leveldb.adoc
+++ b/docs/modules/ROOT/pages/reference/extensions/leveldb.adoc
@@ -2,15 +2,15 @@
// This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page
= LevelDB
:cq-artifact-id: camel-quarkus-leveldb
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
:cq-description: Using LevelDB as persistent EIP store
:cq-deprecated: false
:cq-jvm-since: 1.2.0
-:cq-native-since: n/a
+:cq-native-since: 1.2.0
[.badges]
-[.badge-key]##JVM since##[.badge-supported]##1.2.0## [.badge-key]##Native##[.badge-unsupported]##unsupported##
+[.badge-key]##JVM since##[.badge-supported]##1.2.0## [.badge-key]##Native since##[.badge-supported]##1.2.0##
Using LevelDB as persistent EIP store
@@ -31,3 +31,15 @@ Please refer to the above link for usage and configuration details.
----
Check the xref:user-guide/index.adoc[User guide] for more information about writing Camel Quarkus applications.
+
+== Camel Quarkus limitations
+
+In native mode, the extension uses a port of LevelDB written in Java (https://github.com/dain/leveldb#leveldb-in-java[documentation]),
+which is within 10% of the performance of the C++ original. Please upvote https://github.com/apache/camel-quarkus/issues/1911[this issue]
+if you do not like the present state.
+
+This extension does not support binary payloads in native mode. Object serialization does not work in native mode
+(see https://github.com/oracle/graal/issues/460[issue]). The extension uses Jackson serialization/deserialization instead, which brings
+this limitation. The problem will be solved as soon as the camel-leveldb component is refactored to use Jackson and custom
+serializers (see https://issues.apache.org/jira/browse/CAMEL-15679[issue]).
+
diff --git a/docs/modules/ROOT/partials/reference/others/leveldb.adoc b/docs/modules/ROOT/partials/reference/others/leveldb.adoc
index fa67de7..0500f69 100644
--- a/docs/modules/ROOT/partials/reference/others/leveldb.adoc
+++ b/docs/modules/ROOT/partials/reference/others/leveldb.adoc
@@ -2,11 +2,11 @@
// This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page
:cq-artifact-id: camel-quarkus-leveldb
:cq-artifact-id-base: leveldb
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
:cq-deprecated: false
:cq-jvm-since: 1.2.0
-:cq-native-since: n/a
+:cq-native-since: 1.2.0
:cq-camel-part-name: leveldb
:cq-camel-part-title: LevelDB
:cq-camel-part-description: Using LevelDB as persistent EIP store
diff --git a/extensions-jvm/leveldb/integration-test/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbResource.java b/extensions-jvm/leveldb/integration-test/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbResource.java
deleted file mode 100644
index 7832901..0000000
--- a/extensions-jvm/leveldb/integration-test/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbResource.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.component.leveldb.it;
-
-import javax.enterprise.context.ApplicationScoped;
-import javax.inject.Inject;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.camel.CamelContext;
-import org.jboss.logging.Logger;
-
-@Path("/leveldb")
-@ApplicationScoped
-public class LeveldbResource {
-
- private static final Logger LOG = Logger.getLogger(LeveldbResource.class);
-
- private static final String OTHER_LEVELDB = "leveldb";
- @Inject
- CamelContext context;
-
- @Path("/load/other/leveldb")
- @GET
- @Produces(MediaType.TEXT_PLAIN)
- public Response loadOtherLeveldb() throws Exception {
- /* This is an autogenerated test */
- /* No way to test a Camel artifact of kind "other" */
- return Response.ok().build();
- }
-}
diff --git a/extensions-jvm/pom.xml b/extensions-jvm/pom.xml
index 8e13181..fb24f32 100644
--- a/extensions-jvm/pom.xml
+++ b/extensions-jvm/pom.xml
@@ -100,7 +100,6 @@
<module>language</module>
<module>ldap</module>
<module>ldif</module>
- <module>leveldb</module>
<module>lra</module>
<module>lucene</module>
<module>lumberjack</module>
diff --git a/extensions-jvm/leveldb/deployment/pom.xml b/extensions/leveldb/deployment/pom.xml
similarity index 94%
rename from extensions-jvm/leveldb/deployment/pom.xml
rename to extensions/leveldb/deployment/pom.xml
index 864b637..2baf674 100644
--- a/extensions-jvm/leveldb/deployment/pom.xml
+++ b/extensions/leveldb/deployment/pom.xml
@@ -40,6 +40,10 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-leveldb</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-jackson-deployment</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/extensions-jvm/leveldb/deployment/src/main/java/org/apache/camel/quarkus/component/leveldb/deployment/LeveldbProcessor.java b/extensions/leveldb/deployment/src/main/java/org/apache/camel/quarkus/component/leveldb/deployment/LeveldbProcessor.java
similarity index 56%
rename from extensions-jvm/leveldb/deployment/src/main/java/org/apache/camel/quarkus/component/leveldb/deployment/LeveldbProcessor.java
rename to extensions/leveldb/deployment/src/main/java/org/apache/camel/quarkus/component/leveldb/deployment/LeveldbProcessor.java
index 8af086f..cbbe9fb 100644
--- a/extensions-jvm/leveldb/deployment/src/main/java/org/apache/camel/quarkus/component/leveldb/deployment/LeveldbProcessor.java
+++ b/extensions/leveldb/deployment/src/main/java/org/apache/camel/quarkus/component/leveldb/deployment/LeveldbProcessor.java
@@ -16,17 +16,16 @@
*/
package org.apache.camel.quarkus.component.leveldb.deployment;
+import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.annotations.ExecutionTime;
-import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.pkg.steps.NativeBuild;
-import org.apache.camel.quarkus.core.JvmOnlyRecorder;
-import org.jboss.logging.Logger;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
+import org.apache.camel.support.DefaultExchangeHolder;
+import org.iq80.leveldb.impl.Iq80DBFactory;
class LeveldbProcessor {
- private static final Logger LOG = Logger.getLogger(LeveldbProcessor.class);
private static final String FEATURE = "camel-leveldb";
@BuildStep
@@ -34,13 +33,19 @@ class LeveldbProcessor {
return new FeatureBuildItem(FEATURE);
}
- /**
- * Remove this once this extension starts supporting the native mode.
- */
- @BuildStep(onlyIf = NativeBuild.class)
- @Record(value = ExecutionTime.RUNTIME_INIT)
- void warnJvmInNative(JvmOnlyRecorder recorder) {
- JvmOnlyRecorder.warnJvmInNative(LOG, FEATURE); // warn at build time
- recorder.warnJvmInNative(FEATURE); // warn at runtime
+ @BuildStep
+ ReflectiveClassBuildItem registerForReflection() {
+ return new ReflectiveClassBuildItem(false, false, Iq80DBFactory.class.getName());
+ }
+
+ @BuildStep
+ ReflectiveClassBuildItem registerForReflectionWithFields() {
+ return new ReflectiveClassBuildItem(false, true, DefaultExchangeHolder.class.getName());
+ }
+
+ @BuildStep
+ public void registerRuntimeInitializedClasses(BuildProducer<RuntimeInitializedClassBuildItem> resource) {
+ resource.produce(new RuntimeInitializedClassBuildItem(
+ org.iq80.leveldb.table.Table.class.getName()));
}
}
diff --git a/extensions-jvm/leveldb/pom.xml b/extensions/leveldb/pom.xml
similarity index 97%
rename from extensions-jvm/leveldb/pom.xml
rename to extensions/leveldb/pom.xml
index e65ceb9..80905ac 100644
--- a/extensions-jvm/leveldb/pom.xml
+++ b/extensions/leveldb/pom.xml
@@ -35,6 +35,5 @@
<modules>
<module>deployment</module>
<module>runtime</module>
- <module>integration-test</module>
</modules>
</project>
diff --git a/extensions-jvm/leveldb/runtime/pom.xml b/extensions/leveldb/runtime/pom.xml
similarity index 78%
rename from extensions-jvm/leveldb/runtime/pom.xml
rename to extensions/leveldb/runtime/pom.xml
index 8209bf8..16efe7c 100644
--- a/extensions-jvm/leveldb/runtime/pom.xml
+++ b/extensions/leveldb/runtime/pom.xml
@@ -34,6 +34,7 @@
<properties>
<camel.quarkus.jvmSince>1.2.0</camel.quarkus.jvmSince>
+ <camel.quarkus.nativeSince>1.2.0</camel.quarkus.nativeSince>
</properties>
<dependencyManagement>
@@ -56,6 +57,31 @@
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-leveldb</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>leveldbjni-all</artifactId>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-leveldb</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-jackson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.graalvm.nativeimage</groupId>
+ <artifactId>svm</artifactId>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/extensions/leveldb/runtime/src/main/doc/limitations.adoc b/extensions/leveldb/runtime/src/main/doc/limitations.adoc
new file mode 100644
index 0000000..3d13688
--- /dev/null
+++ b/extensions/leveldb/runtime/src/main/doc/limitations.adoc
@@ -0,0 +1,8 @@
+In native mode, the extension uses a port of LevelDB written in Java (https://github.com/dain/leveldb#leveldb-in-java[documentation]),
+which is within 10% of the performance of the C++ original. Please upvote https://github.com/apache/camel-quarkus/issues/1911[this issue]
+if you do not like the present state.
+
+This extension does not support binary payloads in native mode. Object serialization does not work in native mode
+(see https://github.com/oracle/graal/issues/460[issue]). The extension uses Jackson serialization/deserialization instead, which brings
+this limitation. The problem will be solved as soon as the camel-leveldb component is refactored to use Jackson and custom
+serializers (see https://issues.apache.org/jira/browse/CAMEL-15679[issue]).
\ No newline at end of file
diff --git a/extensions-jvm/leveldb/integration-test/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/MMapLogWriterSubstitute.java
similarity index 56%
copy from extensions-jvm/leveldb/integration-test/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java
copy to extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/MMapLogWriterSubstitute.java
index a99a338..5fefa9e 100644
--- a/extensions-jvm/leveldb/integration-test/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java
+++ b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/MMapLogWriterSubstitute.java
@@ -14,21 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.quarkus.component.leveldb.it;
+package org.apache.camel.quarkus.component.leveldb;
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import org.junit.jupiter.api.Test;
+import java.nio.MappedByteBuffer;
-@QuarkusTest
-class LeveldbTest {
+import com.oracle.svm.core.annotate.Alias;
+import com.oracle.svm.core.annotate.Substitute;
+import com.oracle.svm.core.annotate.TargetClass;
+import org.iq80.leveldb.impl.MMapLogWriter;
- @Test
- public void loadOtherLeveldb() {
- /* A simple autogenerated test */
- RestAssured.get("/leveldb/load/other/leveldb")
- .then()
- .statusCode(200);
- }
+/**
+ * Workaround for https://github.com/oracle/graal/issues/2761
+ * (see OriginalByteBufferSupport for more information)
+ */
+@TargetClass(value = MMapLogWriter.class)
+final class MMapLogWriterSubstitute {
+ @Alias
+ private MappedByteBuffer mappedByteBuffer;
+
+ @Substitute
+ private void unmap() {
+ OriginalByteBufferSupport.unmap(mappedByteBuffer);
+ }
}
diff --git a/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/MMapTableSubstitute.java b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/MMapTableSubstitute.java
new file mode 100644
index 0000000..3e3882a
--- /dev/null
+++ b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/MMapTableSubstitute.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.leveldb;
+
+import java.io.Closeable;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Callable;
+
+import com.oracle.svm.core.annotate.Alias;
+import com.oracle.svm.core.annotate.Substitute;
+import com.oracle.svm.core.annotate.TargetClass;
+import org.iq80.leveldb.table.MMapTable;
+import org.iq80.leveldb.util.Closeables;
+
+/**
+ * Workaround for https://github.com/oracle/graal/issues/2761
+ * (see OriginalByteBufferSupport for more information)
+ */
+@TargetClass(value = MMapTable.class)
+final class MMapTableSubstitute {
+
+ @Alias
+ protected String name;
+ @Alias
+ protected FileChannel fileChannel;
+ @Alias
+ private MappedByteBuffer data;
+
+ @Substitute
+ public Callable<?> closer() {
+ return new Closer(name, fileChannel, data);
+ }
+
+ private static class Closer
+ implements Callable<Void> {
+ private final String name;
+ private final Closeable closeable;
+ private final MappedByteBuffer data;
+
+ public Closer(String name, Closeable closeable, MappedByteBuffer data) {
+ this.name = name;
+ this.closeable = closeable;
+ this.data = data;
+ }
+
+ public Void call() {
+ OriginalByteBufferSupport.unmap(data);
+ Closeables.closeQuietly(closeable);
+ return null;
+ }
+ }
+
+}
diff --git a/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/ObjectCodecSubstitute.java b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/ObjectCodecSubstitute.java
new file mode 100644
index 0000000..8ab38a4
--- /dev/null
+++ b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/ObjectCodecSubstitute.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.leveldb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.oracle.svm.core.annotate.Inject;
+import com.oracle.svm.core.annotate.Substitute;
+import com.oracle.svm.core.annotate.TargetClass;
+import org.apache.camel.support.DefaultExchangeHolder;
+import org.fusesource.hawtbuf.codec.ObjectCodec;
+
+/**
+ * This is a workaround for serialization of DefaultExchangeHolder.
+ * Once serialization is implemented in GraalVM (see https://github.com/oracle/graal/issues/460), this substitution
+ * could
+ * be removed.
+ */
+@TargetClass(value = ObjectCodec.class)
+final class ObjectCodecSubstitute {
+
+ @Inject
+ private ObjectMapper objectMapper;
+
+ @Substitute
+ public void encode(Object object, DataOutput dataOut) throws IOException {
+ if (objectMapper == null) {
+ objectMapper = new ObjectMapper();
+ objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
+ objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ }
+ objectMapper.writeValue(dataOut, object);
+ }
+
+ @Substitute
+ public Object decode(DataInput dataIn) throws IOException {
+ if (objectMapper == null) {
+ objectMapper = new ObjectMapper();
+ objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
+ objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ }
+ return objectMapper.readValue(dataIn, DefaultExchangeHolder.class);
+ }
+
+}
diff --git a/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/OriginalByteBufferSupport.java b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/OriginalByteBufferSupport.java
new file mode 100755
index 0000000..b7ec630
--- /dev/null
+++ b/extensions/leveldb/runtime/src/main/java/org/apache/camel/quarkus/component/leveldb/OriginalByteBufferSupport.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.leveldb;
+
+import java.lang.reflect.Method;
+import java.nio.MappedByteBuffer;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Unmap support was changed because of jdk9+ (see
+ * https://github.com/dain/leveldb/commit/39b6e0c38045281fba5f6532c52dc06905890cad)
+ * Current version of levelDB is using MethodHandle, which is not supported by GraalVM (see
+ * https://github.com/oracle/graal/issues/2761)
+ * Original way of using Method (instead of MethodHandle) is working in native mode,
+ * therefore this class contains code from levelDB class `ByteBufferSupport` from the time before mentioned change and
+ * is used via substitutions.
+ * Issue https://github.com/apache/camel-quarkus/issues/1908 is reported to remove class once it is possible.
+ */
+public final class OriginalByteBufferSupport {
+ private static final Method getCleaner;
+ private static final Method clean;
+
+ static {
+ try {
+ getCleaner = Class.forName("java.nio.DirectByteBuffer").getDeclaredMethod("cleaner");
+ getCleaner.setAccessible(true);
+ } catch (ReflectiveOperationException e) {
+ throw new AssertionError(e);
+ }
+
+ try {
+ Class<?> returnType = getCleaner.getReturnType();
+ if (Runnable.class.isAssignableFrom(returnType)) {
+ clean = Runnable.class.getMethod("run");
+ } else {
+ clean = returnType.getMethod("clean");
+ }
+ } catch (NoSuchMethodException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ private OriginalByteBufferSupport() {
+ }
+
+ public static void unmap(MappedByteBuffer buffer) {
+ try {
+ Object cleaner = getCleaner.invoke(buffer);
+ clean.invoke(cleaner);
+ } catch (Exception ignored) {
+ throw Throwables.propagate(ignored);
+ }
+ }
+}
diff --git a/extensions-jvm/leveldb/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/leveldb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
similarity index 97%
rename from extensions-jvm/leveldb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
rename to extensions/leveldb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
index dbb788f..e12ad37 100644
--- a/extensions-jvm/leveldb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/extensions/leveldb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -24,9 +24,8 @@
name: "Camel LevelDB"
description: "Using LevelDB as persistent EIP store"
metadata:
- unlisted: true
guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/leveldb.html"
categories:
- "integration"
status:
- - "preview"
+ - "stable"
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 7f32cc9..82f42a2 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -139,6 +139,7 @@
<module>kotlin</module>
<module>kubernetes</module>
<module>kudu</module>
+ <module>leveldb</module>
<module>log</module>
<module>lzf</module>
<module>mail</module>
diff --git a/extensions-jvm/leveldb/integration-test/pom.xml b/integration-tests/leveldb/pom.xml
similarity index 55%
rename from extensions-jvm/leveldb/integration-test/pom.xml
rename to integration-tests/leveldb/pom.xml
index aa7b7ff..f678ea1 100644
--- a/extensions-jvm/leveldb/integration-test/pom.xml
+++ b/integration-tests/leveldb/pom.xml
@@ -23,13 +23,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-build-parent-it</artifactId>
+ <artifactId>camel-quarkus-integration-tests</artifactId>
<version>1.2.0-SNAPSHOT</version>
- <relativePath>../../../poms/build-parent-it/pom.xml</relativePath>
+ <relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>camel-quarkus-leveldb-integration-test</artifactId>
- <name>Camel Quarkus :: LevelDB :: Integration Test</name>
+ <artifactId>camel-quarkus-integration-test-leveldb</artifactId>
+ <name>Camel Quarkus :: Integration Tests :: LevelDB</name>
<description>Integration tests for Camel Quarkus LevelDB extension</description>
<dependencyManagement>
@@ -53,6 +53,18 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-jackson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-mock</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
@@ -69,6 +81,21 @@
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-leveldb-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
@@ -80,6 +107,19 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-mock-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
@@ -97,4 +137,35 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>native</id>
+ <activation>
+ <property>
+ <name>native</name>
+ </property>
+ </activation>
+ <properties>
+ <quarkus.package.type>native</quarkus.package.type>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
</project>
diff --git a/integration-tests/leveldb/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbResource.java b/integration-tests/leveldb/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbResource.java
new file mode 100644
index 0000000..42b2298
--- /dev/null
+++ b/integration-tests/leveldb/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbResource.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.leveldb.it;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+
+import static java.util.stream.Collectors.joining;
+
+@Path("/leveldb")
+@ApplicationScoped
+public class LeveldbResource {
+
+ public static String PARAMETER_BODY = "body";
+ public static String PARAMETER_FROM_ENDPOINT = "fromEndpoint";
+ public static String MOCKS_DELIMITER = ",";
+
+ @Inject
+ ProducerTemplate producerTemplate;
+
+ @Inject
+ CamelContext context;
+
+ @Path("/aggregate")
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response aggregateTest(List<String> messages,
+ @QueryParam("path") String path,
+ @DefaultValue(LeveldbRouteBuilder.MOCK_RESULT) @QueryParam("mocks") String mockNames) throws Exception {
+
+ String[] mockNamesArray = mockNames.split(MOCKS_DELIMITER);
+ MockEndpoint[] mocks = new MockEndpoint[mockNamesArray.length];
+
+ for (int i = 0; i < mocks.length; i++) {
+ mocks[i] = context.getEndpoint(mockNamesArray[i], MockEndpoint.class);
+ mocks[i].reset();
+
+ if (i == 0) {
+ mocks[i].expectedBodiesReceived(messages.stream().sequential().collect(joining("+")));
+ }
+ }
+
+ for (String message : messages) {
+ producerTemplate.sendBodyAndHeader(path, message, "id", 123);
+ }
+
+ mocks[0].assertIsSatisfied(context, 30, TimeUnit.SECONDS);
+
+ Map<String, List<Map<String, Object>>> data = new HashMap();
+ for (int i = 0; i < mocks.length; i++) {
+ data.put(mockNamesArray[i], extractDataFromMock(mocks[i]));
+ }
+
+ return Response
+ .created(new URI("https://camel.apache.org/"))
+ .entity(data)
+ .build();
+ }
+
+ @Path("/aggregateBinary")
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response aggregateBinaryTest(String[] messages) throws Exception {
+
+ MockEndpoint mockResult = context.getEndpoint(LeveldbRouteBuilder.MOCK_RESULT, MockEndpoint.class);
+ mockResult.reset();
+
+ byte[] longestBytes = messages[0].getBytes();
+ for (String message : messages) {
+ byte[] b = message.getBytes();
+ producerTemplate.sendBodyAndHeader(LeveldbRouteBuilder.DIRECT_BINARY, message.getBytes(), "id", 123);
+ if (b.length > longestBytes.length) {
+ longestBytes = b;
+ }
+ }
+
+ mockResult.assertIsSatisfied(context, 30, TimeUnit.SECONDS);
+
+ byte[] result = mockResult.getExchanges().get(0).getIn().getBody(byte[].class);
+
+ return Response
+ .created(new URI("https://camel.apache.org/"))
+ .entity(Arrays.equals(longestBytes, result))
+ .build();
+
+ }
+
+ private List<Map<String, Object>> extractDataFromMock(MockEndpoint mockEndpoint) {
+ List<Map<String, Object>> data = mockEndpoint.getReceivedExchanges().stream().sequential()
+ .map(exchange -> {
+ Map<String, Object> map = new HashMap<>(exchange.getIn().getHeaders());
+ map.put(PARAMETER_FROM_ENDPOINT, exchange.getFromEndpoint().getEndpointUri());
+ map.put(PARAMETER_BODY, String.valueOf(exchange.getIn().getBody()));
+ return map;
+ })
+ .collect(Collectors.toList());
+ return data;
+ }
+}
diff --git a/integration-tests/leveldb/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbRouteBuilder.java b/integration-tests/leveldb/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbRouteBuilder.java
new file mode 100644
index 0000000..ed9674f
--- /dev/null
+++ b/integration-tests/leveldb/src/main/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbRouteBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.leveldb.it;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.leveldb.LevelDBAggregationRepository;
+
+ /**
+  * Routes used by the LevelDB integration tests. Each route aggregates messages by the {@code id} header and
+  * stores the intermediate state in its own {@link LevelDBAggregationRepository} file under {@link #DATA_FOLDER}.
+  */
+ public class LeveldbRouteBuilder extends RouteBuilder {
+     public static final String DIRECT_START = "direct:start";
+     public static final String DIRECT_BINARY = "direct:binary";
+     public static final String DIRECT_START_WITH_FAILURE = "direct:startWithFailure";
+     public static final String DIRECT_START_DEAD_LETTER = "direct:startDeadLetter";
+     public static final String MOCK_AGGREGATED = "mock:aggregated";
+     public static final String MOCK_RESULT = "mock:result";
+     public static final String MOCK_DEAD = "mock:dead";
+     public static final String DATA_FOLDER = "target/data";
+
+     /** Counts invocations of the failing processor in the recovery route; only the first two fail. */
+     private static final AtomicInteger counter = new AtomicInteger(0);
+
+     /**
+      * Builds the path of a repository file inside {@link #DATA_FOLDER}.
+      * Plain concatenation without a separator would produce e.g. {@code target/dataleveldb.dat}, i.e. a file
+      * next to the data folder instead of inside it.
+      */
+     private static String dataFile(String fileName) {
+         return DATA_FOLDER + "/" + fileName;
+     }
+
+     @Override
+     public void configure() throws Exception {
+         LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo", dataFile("leveldb.dat"));
+
+         // plain aggregation: 7 string bodies are joined and sent to the result mock
+         from(DIRECT_START)
+                 .aggregate(header("id"), new MyAggregationStrategy())
+                 .completionSize(7).aggregationRepository(repo)
+                 .to(MOCK_RESULT);
+
+         LevelDBAggregationRepository repoBinary = new LevelDBAggregationRepository("repo",
+                 dataFile("levelBinarydb.dat"));
+
+         // binary aggregation: of 3 byte[] bodies the longest one is kept
+         from(DIRECT_BINARY)
+                 .aggregate(header("id"), new BinaryAggregationStrategy())
+                 .completionSize(3).aggregationRepository(repoBinary)
+                 .to(MOCK_RESULT);
+
+         LevelDBAggregationRepository repoWithFailure = new LevelDBAggregationRepository("repoWithFailure",
+                 dataFile("leveldbWithFailure.dat"));
+
+         repoWithFailure.setUseRecovery(true);
+         repoWithFailure.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+
+         // recovery: the processor throws on its first two invocations and lets the third one through
+         from(DIRECT_START_WITH_FAILURE)
+                 .aggregate(header("id"), new MyAggregationStrategy())
+                 .completionSize(7).aggregationRepository(repoWithFailure)
+                 .to(MOCK_AGGREGATED)
+                 .process(exchange -> {
+                     int count = counter.incrementAndGet();
+                     if (count <= 2) {
+                         throw new IllegalArgumentException("Failure");
+                     }
+                 })
+                 .to(MOCK_RESULT)
+                 .end();
+
+         LevelDBAggregationRepository repoDeadLetter = new LevelDBAggregationRepository("repoDeadLetter",
+                 dataFile("leveldbDeadLetter.dat"));
+
+         repoDeadLetter.setUseRecovery(true);
+         repoDeadLetter.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+         repoDeadLetter.setMaximumRedeliveries(3);
+         repoDeadLetter.setDeadLetterUri(MOCK_DEAD);
+
+         // dead letter: the processor always throws; the repository is configured with 3 redeliveries
+         // and MOCK_DEAD as its dead letter URI
+         from(DIRECT_START_DEAD_LETTER)
+                 .aggregate(header("id"), new MyAggregationStrategy())
+                 .completionSize(7).aggregationRepository(repoDeadLetter)
+                 .to(MOCK_AGGREGATED)
+                 .process(e -> {
+                     throw new IllegalArgumentException("Failure");
+                 })
+                 .log("XXX: result exchange id ${exchangeId} with ${body}")
+                 .to(MOCK_RESULT)
+                 .end();
+     }
+
+     /** Joins the string bodies of the aggregated exchanges with {@code +}. */
+     public static class MyAggregationStrategy implements AggregationStrategy {
+
+         @Override
+         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+             // first message of a group: nothing to merge with yet
+             if (oldExchange == null) {
+                 return newExchange;
+             }
+             String body1 = oldExchange.getIn().getBody(String.class);
+             String body2 = newExchange.getIn().getBody(String.class);
+
+             oldExchange.getIn().setBody(body1 + "+" + body2);
+             return oldExchange;
+         }
+     }
+
+     /** Keeps the longer of the two {@code byte[]} bodies; on equal length the newer body wins. */
+     public static class BinaryAggregationStrategy implements AggregationStrategy {
+
+         @Override
+         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+             // first message of a group: nothing to compare with yet
+             if (oldExchange == null) {
+                 return newExchange;
+             }
+             byte[] body1 = oldExchange.getIn().getBody(byte[].class);
+             byte[] body2 = newExchange.getIn().getBody(byte[].class);
+
+             //keeps longer byte[]
+             oldExchange.getIn().setBody(body1.length > body2.length ? body1 : body2);
+             return oldExchange;
+         }
+     }
+ }
diff --git a/extensions-jvm/leveldb/integration-test/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java b/integration-tests/leveldb/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbIT.java
similarity index 71%
rename from extensions-jvm/leveldb/integration-test/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java
rename to integration-tests/leveldb/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbIT.java
index a99a338..e4fc4ba 100644
--- a/extensions-jvm/leveldb/integration-test/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java
+++ b/integration-tests/leveldb/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbIT.java
@@ -16,19 +16,12 @@
*/
package org.apache.camel.quarkus.component.leveldb.it;
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import org.junit.jupiter.api.Test;
+import io.quarkus.test.junit.NativeImageTest;
-@QuarkusTest
-class LeveldbTest {
-
- @Test
- public void loadOtherLeveldb() {
- /* A simple autogenerated test */
- RestAssured.get("/leveldb/load/other/leveldb")
- .then()
- .statusCode(200);
+@NativeImageTest
+class LeveldbIT extends LeveldbTest {
+ @Override
+ boolean doeasBinaryDataWork() {
+ return false;
}
-
}
diff --git a/integration-tests/leveldb/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java b/integration-tests/leveldb/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java
new file mode 100644
index 0000000..1ffa037
--- /dev/null
+++ b/integration-tests/leveldb/src/test/java/org/apache/camel/quarkus/component/leveldb/it/LeveldbTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.leveldb.it;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+import io.restassured.specification.RequestSpecification;
+import org.apache.camel.Exchange;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@QuarkusTest
+class LeveldbTest {
+
+ @Test
+ public void testAggregate() {
+ Map<String, List<Map<String, Object>>> data = testAggregate(LeveldbRouteBuilder.DIRECT_START,
+ Arrays.asList("S", "H", "E", "L", "D", "O", "N"));
+
+ List<Map<String, Object>> resultData = data.get(LeveldbRouteBuilder.MOCK_RESULT);
+
+ assertEquals("direct://start", resultData.get(0).get(LeveldbResource.PARAMETER_FROM_ENDPOINT));
+ }
+
+ @Test
+ public void testAggregateRecovery() {
+ Map<String, List<Map<String, Object>>> data = testAggregate(LeveldbRouteBuilder.DIRECT_START_WITH_FAILURE,
+ Arrays.asList("S", "H", "E", "L", "D", "O", "N"));
+
+ List<Map<String, Object>> resultData = data.get(LeveldbRouteBuilder.MOCK_RESULT);
+
+ assertEquals(Boolean.TRUE, resultData.get(0).get(Exchange.REDELIVERED));
+ assertEquals(2, resultData.get(0).get(Exchange.REDELIVERY_COUNTER));
+ assertEquals("direct://startWithFailure", resultData.get(0).get(LeveldbResource.PARAMETER_FROM_ENDPOINT));
+ }
+
+ @Test
+ public void testDeadLetter() {
+ Map<String, List<Map<String, Object>>> data = testAggregate(LeveldbRouteBuilder.DIRECT_START_DEAD_LETTER,
+ Arrays.asList("S", "H", "E", "L", "D", "O", "N"),
+ LeveldbRouteBuilder.MOCK_DEAD + "," + LeveldbRouteBuilder.MOCK_RESULT + ","
+ + LeveldbRouteBuilder.MOCK_AGGREGATED);
+
+ List<Map<String, Object>> deadData = data.get(LeveldbRouteBuilder.MOCK_DEAD);
+ List<Map<String, Object>> resultData = data.get(LeveldbRouteBuilder.MOCK_RESULT);
+ List<Map<String, Object>> agreggatedData = data.get(LeveldbRouteBuilder.MOCK_AGGREGATED);
+
+ assertTrue(resultData.isEmpty());
+
+ assertFalse(agreggatedData.get(0).containsKey(Exchange.REDELIVERED));
+ assertEquals(Boolean.TRUE, agreggatedData.get(1).containsKey(Exchange.REDELIVERED));
+ assertEquals(1, agreggatedData.get(1).get(Exchange.REDELIVERY_COUNTER));
+ assertEquals(3, agreggatedData.get(1).get(Exchange.REDELIVERY_MAX_COUNTER));
+ assertEquals(Boolean.TRUE, agreggatedData.get(2).containsKey(Exchange.REDELIVERED));
+ assertEquals(2, agreggatedData.get(2).get(Exchange.REDELIVERY_COUNTER));
+ assertEquals(3, agreggatedData.get(2).get(Exchange.REDELIVERY_MAX_COUNTER));
+ assertEquals(Boolean.TRUE, agreggatedData.get(3).containsKey(Exchange.REDELIVERED));
+ assertEquals(3, agreggatedData.get(3).get(Exchange.REDELIVERY_COUNTER));
+ assertEquals(3, agreggatedData.get(3).get(Exchange.REDELIVERY_MAX_COUNTER));
+
+ assertEquals(Boolean.TRUE, deadData.get(0).containsKey(Exchange.REDELIVERED));
+ assertEquals(3, deadData.get(0).get(Exchange.REDELIVERY_COUNTER));
+ assertFalse(deadData.get(0).containsKey(Exchange.REDELIVERY_MAX_COUNTER));
+ }
+
+ @Test
+ public void testBinaryData() throws Exception {
+
+ boolean theSame = RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Arrays.asList("ab", "Sheldon", "cde"))
+ .post("/leveldb/aggregateBinary")
+ .then()
+ .statusCode(201)
+ .extract().as(Boolean.class);
+
+ if (doeasBinaryDataWork()) {
+ assertTrue(theSame);
+ } else {
+ assertFalse(theSame);
+ }
+ }
+
+ /**
+ * Until binary payload is not supported, in native binary test will fail.
+ * Needs https://issues.apache.org/jira/browse/CAMEL-15679
+ */
+ boolean doeasBinaryDataWork() {
+ return true;
+ }
+
+ private Map<String, List<Map<String, Object>>> testAggregate(String path, List<String> messages) {
+ return testAggregate(path, messages, null);
+ }
+
+ private Map<String, List<Map<String, Object>>> testAggregate(String path, List<String> messages, String mocks) {
+ RequestSpecification rs = RestAssured.given()
+ .queryParam("path", path);
+
+ if (mocks != null) {
+ rs = rs.queryParam("mocks", mocks);
+ }
+
+ return (Map<String, List<Map<String, Object>>>) rs.contentType(ContentType.JSON)
+ .body(messages)
+ .post("/leveldb/aggregate")
+ .then()
+ .statusCode(201)
+ .extract().as(Map.class);
+ }
+
+ @AfterAll
+ public static void afterAll() throws Exception {
+ File data = new File(LeveldbRouteBuilder.DATA_FOLDER);
+ FileUtils.deleteDirectory(data);
+ }
+
+ private byte[] readQuarkusFile(String fileName) throws Exception {
+ try (InputStream is = getClass().getClassLoader().getResourceAsStream(fileName)) {
+ return readBytes(is);
+ }
+ }
+
+ static byte[] readBytes(InputStream is) throws Exception {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ byte[] buffer = new byte[4096];
+ int len;
+ while ((len = is.read(buffer)) != -1) {
+ os.write(buffer, 0, len);
+ }
+ return os.toByteArray();
+ }
+
+}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index b135a0c..bb9e5e8 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -119,6 +119,7 @@
<module>kotlin</module>
<module>kubernetes</module>
<module>kudu</module>
+ <module>leveldb</module>
<module>mail</module>
<module>master</module>
<module>messaging</module>
diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml
index f171e5e..966681e 100644
--- a/tooling/scripts/test-categories.yaml
+++ b/tooling/scripts/test-categories.yaml
@@ -49,6 +49,7 @@ database:
- jdbc
- jpa
- kudu
+ - leveldb
- mongodb
- pgevent
- pg-replication-slot