Posted to commits@camel.apache.org by or...@apache.org on 2020/09/08 10:30:20 UTC

[camel-kafka-connector] branch master updated (c26a551 -> 0f26f98)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from c26a551  Fixed comment in the automatic changelog gh action
     new e76a792  Added performance test infrastructure for RabbitMQ
     new 0f26f98  Adjust the CamelSourceTask code to reduce CPU usage and heap allocations

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/kafkaconnector/CamelSourceTask.java      | 80 ++++++++++++----------
 docs/modules/ROOT/pages/testing.adoc               | 24 +++++++
 parent/pom.xml                                     |  7 ++
 .../{itests-aws-v2 => perf-tests-rabbitmq}/pom.xml | 27 ++++----
 .../rabbitmq/services/ConnectionProperties.java}   | 10 +--
 .../services/RabbitMQLocalContainerService.java    | 67 ++++++++++++++++++
 .../rabbitmq/services/RabbitMQRemoteService.java   | 24 ++++---
 .../rabbitmq/services/RabbitMQService.java}        | 22 +++---
 .../rabbitmq/services/RabbitMQServiceFactory.java} | 23 +++----
 .../source/CamelRabbitMQPropertyFactory.java       | 76 ++++++++++++++++++++
 .../source/RabbitMQSourcePerformanceITCase.java    | 72 +++++++++++++++++++
 tests/pom.xml                                      |  1 +
 12 files changed, 348 insertions(+), 85 deletions(-)
 copy tests/{itests-aws-v2 => perf-tests-rabbitmq}/pom.xml (73%)
 copy tests/{itests-azure-common/src/test/java/org/apache/camel/kafkaconnector/azure/common/AzureCredentialsHolder.java => perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java} (81%)
 create mode 100644 tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
 copy core/src/main/java/org/apache/camel/kafkaconnector/VersionUtil.java => tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java (72%)
 copy tests/{itests-aws-common/src/test/java/org/apache/camel/kafkaconnector/aws/common/services/AWSService.java => perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java} (84%)
 copy tests/{itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/services/HDFSServiceFactory.java => perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java} (59%)
 create mode 100644 tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
 create mode 100644 tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java


[camel-kafka-connector] 01/02: Added performance test infrastructure for RabbitMQ

Posted by or...@apache.org.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e76a79244bb2ad0174cdb3ad7a3f39ab246d2a39
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Fri Sep 4 07:32:46 2020 +0200

    Added performance test infrastructure for RabbitMQ
    
    This adds a reference implementation for checking the resource usage of
    the RabbitMQ component while idle. The motivation for this is related
    to GitHub issue #414.
---
 docs/modules/ROOT/pages/testing.adoc               | 24 +++++++
 parent/pom.xml                                     |  7 ++
 tests/perf-tests-rabbitmq/pom.xml                  | 74 +++++++++++++++++++++
 .../rabbitmq/services/ConnectionProperties.java    | 25 +++++++
 .../services/RabbitMQLocalContainerService.java    | 67 +++++++++++++++++++
 .../rabbitmq/services/RabbitMQRemoteService.java   | 37 +++++++++++
 .../rabbitmq/services/RabbitMQService.java         | 54 +++++++++++++++
 .../rabbitmq/services/RabbitMQServiceFactory.java  | 45 +++++++++++++
 .../source/CamelRabbitMQPropertyFactory.java       | 76 ++++++++++++++++++++++
 .../source/RabbitMQSourcePerformanceITCase.java    | 72 ++++++++++++++++++++
 tests/pom.xml                                      |  1 +
 11 files changed, 482 insertions(+)
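
For readers skimming the diff below: RabbitMQServiceFactory picks the service
implementation from the `rabbitmq.instance.type` system property. The following
is a simplified standalone sketch of that selection logic, not the committed
code. The class name `ServiceSelectionSketch`, the `Kind` enum and the `select`
method are hypothetical names introduced for illustration; only the property
name and its two accepted values are taken from the patch.

```java
public class ServiceSelectionSketch {
    // Hypothetical stand-in for the two RabbitMQService implementations in the patch.
    public enum Kind { LOCAL_CONTAINER, REMOTE }

    // Mirrors the branching in the patch: an unset property falls back to the local container.
    public static Kind select(String instanceType) {
        if (instanceType == null || instanceType.equals("local-rabbitmq-container")) {
            return Kind.LOCAL_CONTAINER;
        }
        if (instanceType.equals("remote")) {
            return Kind.REMOTE;
        }
        // Any other value is rejected, as in the patch.
        throw new UnsupportedOperationException(
                String.format("Invalid rabbitmq instance type: %s", instanceType));
    }

    public static void main(String[] args) {
        // Run with -Drabbitmq.instance.type=remote to take the other branch.
        System.out.println(select(System.getProperty("rabbitmq.instance.type")));
    }
}
```

Note that in the patch the remote implementation is still a stub whose
connectionProperties() returns null, so only the local container path is
usable by the test as committed.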

diff --git a/docs/modules/ROOT/pages/testing.adoc b/docs/modules/ROOT/pages/testing.adoc
index 9b59983..ce4de58 100644
--- a/docs/modules/ROOT/pages/testing.adoc
+++ b/docs/modules/ROOT/pages/testing.adoc
@@ -69,6 +69,8 @@ remote instances. At the moment, the following properties can be set for remote
 
 It is possible to use a properties file to set these. To do so use `-Dtest.properties=/path/to/file.properties`.
 
+=== Manual Tests
+
 A few manual tests can be enabled and executed with adequate configuration on the accounts and environments
 used by those services. This is very specific to the nature of each of those services, therefore please consult
 the comments on each of those test cases for the details related to their setup.
@@ -79,6 +81,28 @@ these flags along with others that are specific to that service:
 * it.test.salesforce.enable
 * it.test.slack.enable
 
+=== Performance Tests
+
+There is also a reference test implementation for checking the performance and resource usage in some situations.
+The current implementation, in the perf-tests-rabbitmq module, can be run using:
+
+[source,bash]
+----
+mvn -DskipIntegrationTests=false -Dit.test.perf.enabled=true clean verify
+----
+
+Additional JVM settings, such as those to allow usage of the https://github.com/jvm-profiling-tools/async-profiler[Async Profiler],
+can be passed using the `jvm.user.settings` property. For example:
+
+[source,bash]
+----
+mvn -DskipIntegrationTests=false -Dit.test.perf.enabled=true -Djvm.user.settings="-agentpath:/path/to/asyncProfiler.so=start,file=/path/to/profile.svg" clean verify
+----
+
+The duration of the test can be adjusted with the `rabbitmq.test.duration` property, which takes the test duration
+in minutes (5 by default).
+
+
 [#writing-new-tests]
 == Writing New Tests
 
diff --git a/parent/pom.xml b/parent/pom.xml
index c423459..81ce35a 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -319,6 +319,13 @@
                 <version>${version.postgres}</version>
                 <scope>test</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>rabbitmq</artifactId>
+                <version>${testcontainers-version}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>
diff --git a/tests/perf-tests-rabbitmq/pom.xml b/tests/perf-tests-rabbitmq/pom.xml
new file mode 100644
index 0000000..9379e0a
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.5.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>perf-tests-rabbitmq</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Performance :: RabbitMQ</name>
+
+    <properties>
+        <jmx.port>9010</jmx.port>
+        <rmi.server>localhost</rmi.server>
+        <jvm.user.settings></jvm.user.settings>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-rabbitmq</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>rabbitmq</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <argLine>${common.failsafe.args} ${jvm.user.settings} -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=${rmi.server} -Dcom.sun.management.jmxremote.rmi.port=${jmx.port}</argLine>
+                    <skipTests>${skipIntegrationTests}</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
new file mode 100644
index 0000000..15b7f8d
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+public interface ConnectionProperties {
+    String username();
+    String password();
+    String hostname();
+    int port();
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
new file mode 100644
index 0000000..b6ae6d1
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.RabbitMQContainer;
+
+public class RabbitMQLocalContainerService implements RabbitMQService {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQLocalContainerService.class);
+
+    private final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.8-management");
+
+    public RabbitMQLocalContainerService() {
+        container.start();
+    }
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return new ConnectionProperties() {
+            @Override
+            public String username() {
+                return container.getAdminUsername();
+            }
+
+            @Override
+            public String password() {
+                return container.getAdminPassword();
+            }
+
+            @Override
+            public String hostname() {
+                return container.getHost();
+            }
+
+            @Override
+            public int port() {
+                return container.getAmqpPort();
+            }
+        };
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("RabbitMQ container running on {}", container.getAmqpUrl());
+    }
+
+    @Override
+    public void shutdown() {
+        container.stop();
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
new file mode 100644
index 0000000..b6957be
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+public class RabbitMQRemoteService implements RabbitMQService {
+
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
new file mode 100644
index 0000000..94e12bd
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface RabbitMQService extends BeforeAllCallback, AfterAllCallback {
+
+
+    /**
+     * The connection properties for the service
+     * @return the username, password, hostname and port used to connect to the service
+     */
+    ConnectionProperties connectionProperties();
+
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
new file mode 100644
index 0000000..50013dd
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class RabbitMQServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQServiceFactory.class);
+
+    private RabbitMQServiceFactory() {
+
+    }
+
+    public static RabbitMQService createService() {
+        String instanceType = System.getProperty("rabbitmq.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-rabbitmq-container")) {
+            return new RabbitMQLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new RabbitMQRemoteService();
+        }
+
+        LOG.error("rabbit-mq instance must be one of 'local-rabbitmq-container' or 'remote'");
+        throw new UnsupportedOperationException(String.format("Invalid rabbitmq instance type: %s", instanceType));
+
+    }
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
new file mode 100644
index 0000000..abd8ed6
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+public class CamelRabbitMQPropertyFactory extends SourceConnectorPropertyFactory<CamelRabbitMQPropertyFactory> {
+    public CamelRabbitMQPropertyFactory withHostname(String value) {
+        return setProperty("camel.component.rabbitmq.hostname", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPortNumber(int value) {
+        return setProperty("camel.component.rabbitmq.portNumber", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withUsername(String value) {
+        return setProperty("camel.component.rabbitmq.username", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPassword(String value) {
+        return setProperty("camel.component.rabbitmq.password", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeName(String value) {
+        return setProperty("camel.source.path.exchangeName", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeType(String value) {
+        return setProperty("camel.source.endpoint.exchangeType", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withAutoDelete(boolean value) {
+        return setProperty("camel.source.endpoint.autoDelete", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withQueue(String value) {
+        return setProperty("camel.source.endpoint.queue", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withRoutingKey(String value) {
+        return setProperty("camel.source.endpoint.routingKey", value);
+    }
+
+    public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String hostname, int port, String exchangeName) {
+        String sourceUrl = String.format("rabbitmq://%s:%d/%s", hostname, port, exchangeName);
+
+        return new EndpointUrlBuilder<>(this::withSourceUrl, sourceUrl);
+    }
+
+    public static CamelRabbitMQPropertyFactory basic() {
+        return new CamelRabbitMQPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelRabbitmqSourceConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter");
+    
+    }
+
+}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
new file mode 100644
index 0000000..d3b04ab
--- /dev/null
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+@EnabledIfSystemProperty(named = "it.test.perf.enabled", matches = "true")
+public class RabbitMQSourcePerformanceITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static RabbitMQService service = RabbitMQServiceFactory.createService();
+
+    private static int duration;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-rabbitmq-kafka-connector"};
+    }
+
+    @BeforeAll
+    public static void setUpAll() {
+        duration = Integer.parseInt(System.getProperty("rabbitmq.test.duration", "5"));
+    }
+
+    @Test
+    public void testMemory() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withUrl(service.connectionProperties().hostname(), service.connectionProperties().port(),
+                        "X.test")
+                .append("username", service.connectionProperties().username())
+                .append("password", service.connectionProperties().password())
+                .append("autoDelete", "true")
+                .append("queue", "Q.test.kafka.import")
+                .append("routingKey", "events")
+                .buildUrl();
+
+        factory.log();
+        getKafkaConnectService().initializeConnector(factory);
+
+        Thread.sleep(Duration.ofMinutes(duration).toMillis());
+    }
+
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 52f63e7..f65a488 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -58,6 +58,7 @@
         <module>itests-jdbc</module>
         <module>itests-azure-common</module>
         <module>itests-azure-storage-queue</module>
+        <module>perf-tests-rabbitmq</module>
     </modules>
 
 


[camel-kafka-connector] 02/02: Adjust the CamelSourceTask code to reduce CPU usage and heap allocations

Posted by or...@apache.org.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0f26f989daeae86eda0073a3e33864fdffc81d3d
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Fri Sep 4 20:34:54 2020 +0200

    Adjust the CamelSourceTask code to reduce CPU usage and heap allocations
    
    This modifies the code so that it blocks while waiting for the messages
    to arrive while also respecting the maxPollDuration.
---
 .../camel/kafkaconnector/CamelSourceTask.java      | 80 ++++++++++++----------
 1 file changed, 43 insertions(+), 37 deletions(-)
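
The idea in the diff below is to replace a non-blocking receive with a
blocking receive whose timeout is whatever is left of the poll time budget.
Here is a minimal standalone sketch of that pattern, not the committed code.
It assumes a `java.util.concurrent.BlockingQueue` as a stand-in for the Camel
polling consumer, and the names `BoundedPollSketch`, `poll` and `remaining`
are chosen for illustration. Unlike the patch, which returns null when nothing
was collected, the sketch returns an empty list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedPollSketch {
    // Milliseconds left in the budget that started at startMillis.
    static long remaining(long startMillis, long maxPollDurationMillis) {
        return maxPollDurationMillis - (System.currentTimeMillis() - startMillis);
    }

    // Collects up to maxBatchSize items, blocking for at most maxPollDurationMillis overall.
    public static <T> List<T> poll(BlockingQueue<T> source, int maxBatchSize, long maxPollDurationMillis) {
        final long start = System.currentTimeMillis();
        long remaining = remaining(start, maxPollDurationMillis);
        List<T> batch = new ArrayList<>();

        while (batch.size() < maxBatchSize && remaining > 0) {
            T item;
            try {
                // Block until an item arrives or the rest of the budget runs out.
                item = source.poll(remaining, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and return what was collected so far.
                Thread.currentThread().interrupt();
                break;
            }
            if (item == null) {
                // Timed out: nothing more arrived within the budget.
                break;
            }
            batch.add(item);
            remaining = remaining(start, maxPollDurationMillis);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        System.out.println(poll(queue, 2, 100)); // batch limit reached first
        System.out.println(poll(queue, 2, 100)); // one item, then the time budget expires
    }
}
```

Because the thread sleeps inside the blocking call instead of returning
immediately when no message is available, an idle source no longer causes the
caller to spin through repeated empty polls, which is the CPU and allocation
saving the commit message refers to.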

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index d4e0810..e825e49 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -119,53 +119,59 @@ public class CamelSourceTask extends SourceTask {
         }
     }
 
+    private long remaining(long startPollEpochMilli, long maxPollDuration)  {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
         List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
-
-            if (exchange != null) {
-                LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
-
-                // TODO: see if there is a better way to use sourcePartition
-                // an sourceOffset
-                Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
-                Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
-
-                final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-                final Object messageBodyValue = exchange.getMessage().getBody();
-
-                final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
-                final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
-
-                for (String singleTopic : topics) {
-                    SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
-                    if (exchange.getMessage().hasHeaders()) {
-                        setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
-                    }
-                    if (exchange.hasProperties()) {
-                        setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
-                    }
-
-                    TaskHelper.logRecordContent(LOG, record, config);
-                    records.add(record);
-                }
-                collectedRecords++;
-            } else {
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {
+                // Nothing received, abort and return what we received so far
                 break;
             }
-        }
 
-        if (records.isEmpty()) {
-            return Collections.EMPTY_LIST;
-        } else {
-            return records;
+            LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
+                    exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
+
+            // TODO: see if there is a better way to use sourcePartition
+            // and sourceOffset
+            Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
+            Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
+
+            final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
+            final Object messageBodyValue = exchange.getMessage().getBody();
+
+            final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+            final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+
+            for (String singleTopic : topics) {
+                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema,
+                        messageHeaderKey, messageBodySchema, messageBodyValue);
+
+                if (exchange.getMessage().hasHeaders()) {
+                    setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                }
+                if (exchange.hasProperties()) {
+                    setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                }
+
+                TaskHelper.logRecordContent(LOG, record, config);
+                records.add(record);
+            }
+            collectedRecords++;
+            remaining = remaining(startPollEpochMilli, maxPollDuration);
         }
 
+        return records.isEmpty() ? null : records;
     }
 
     @Override