You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/03/05 14:19:04 UTC
[camel-kafka-connector] branch master updated: Added Google pub/sub
sink integration test
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 35a0838 Added Google pub/sub sink integration test
35a0838 is described below
commit 35a0838ebacba92b8f3ad900b9b6e0a3c51193bc
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Mar 4 10:00:14 2021 +0100
Added Google pub/sub sink integration test
---
.../src/test/resources/log4j2.properties | 8 +
tests/itests-google-pubsub/pom.xml | 65 ++++++++
.../google/pubsub/clients/GooglePubEasy.java | 167 +++++++++++++++++++++
.../sink/CamelGooglePubSubPropertyFactory.java | 54 +++++++
.../pubsub/sink/CamelSinkGooglePubSubITCase.java | 130 ++++++++++++++++
tests/pom.xml | 1 +
6 files changed, 425 insertions(+)
diff --git a/tests/itests-common/src/test/resources/log4j2.properties b/tests/itests-common/src/test/resources/log4j2.properties
index aa48d0a..b7df3d4 100644
--- a/tests/itests-common/src/test/resources/log4j2.properties
+++ b/tests/itests-common/src/test/resources/log4j2.properties
@@ -95,3 +95,11 @@ logger.azure.name = com.azure
logger.azure.level = trace
logger.azure.additivity = false
logger.azure.appenderRef.file.ref = file
+
+# Google
+logger.google.name = com.google
+logger.google.level = INFO
+logger.google.additivity = false
+logger.google.appenderRef.file.ref = file
+
+
diff --git a/tests/itests-google-pubsub/pom.xml b/tests/itests-google-pubsub/pom.xml
new file mode 100644
index 0000000..1dc469a
--- /dev/null
+++ b/tests/itests-google-pubsub/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-google-pubsub</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: Google Pub/Sub</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- test infra -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-common</artifactId>
+ <version>${camel.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-google-pubsub</artifactId>
+ <version>${camel.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-google-pubsub</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java
new file mode 100644
index 0000000..02420fe
--- /dev/null
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.google.pubsub.clients;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.TopicName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GooglePubEasy {
+ private static final Logger LOG = LoggerFactory.getLogger(GooglePubEasy.class);
+ private final List<String> receivedMessages = new ArrayList<>();
+
+ private final String serviceAddress;
+ private final String project;
+
+
+ private final ManagedChannel channel;
+ private final FixedTransportChannelProvider channelProvider;
+
+ private ProjectSubscriptionName projectSubscriptionName;
+ private Subscriber subscriber;
+
+ public GooglePubEasy(String serviceAddress, String project) {
+ this.serviceAddress = serviceAddress;
+ this.project = project;
+
+ channel = ManagedChannelBuilder
+ .forTarget(String.format(serviceAddress))
+ .usePlaintext()
+ .build();
+
+ channelProvider =
+ FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ }
+
+ public void createTopic(String topicName) throws IOException, InterruptedException {
+ doCreateTopic(topicName);
+ }
+
+ public void createSubscription(String subscriptionName, String topicName) throws IOException {
+ TopicName googleTopic = TopicName.of(project, topicName);
+
+ projectSubscriptionName = ProjectSubscriptionName.of(project, subscriptionName);
+
+ SubscriptionAdminSettings adminSettings = SubscriptionAdminSettings
+ .newBuilder()
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .setTransportChannelProvider(channelProvider)
+ .build();
+
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(adminSettings)) {
+ Subscription subscription = subscriptionAdminClient.createSubscription(
+ projectSubscriptionName, googleTopic, PushConfig.getDefaultInstance(), 10);
+ }
+ }
+
+ private void doCreateTopic(String topicName) throws IOException, InterruptedException {
+ TopicName googleTopic = TopicName.of(project, topicName);
+
+ TopicAdminSettings topicAdminSettings = TopicAdminSettings
+ .newBuilder()
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .setTransportChannelProvider(channelProvider)
+ .build();
+
+ try (TopicAdminClient client = TopicAdminClient.create(topicAdminSettings)) {
+ LOG.info("Creating topic {} (original {})", googleTopic.toString(), googleTopic.getTopic());
+
+ client.createTopic(googleTopic);
+
+ if (client.awaitTermination(10, TimeUnit.SECONDS)) {
+ client.shutdownNow();
+ }
+ }
+ }
+
+ public void receive() {
+ try {
+ MessageReceiver receiver = (pubsubMessage, ackReplyConsumer) -> {
+ String data = pubsubMessage.getData().toString();
+ LOG.info("Received: {}", data);
+ receivedMessages.add(data);
+
+ if (receivedMessages.size() >= 10) {
+ subscriber.stopAsync();
+ }
+
+ ackReplyConsumer.ack();
+ };
+
+ subscriber = Subscriber
+ .newBuilder(projectSubscriptionName, receiver)
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .setChannelProvider(channelProvider)
+ .build();
+
+
+ LOG.info("Adding listener ...");
+ subscriber.addListener(
+ new Subscriber.Listener() {
+ @Override
+ public void failed(Subscriber.State from, Throwable failure) {
+ LOG.error(failure.getMessage(), failure);
+ }
+ },
+ MoreExecutors.directExecutor());
+
+ LOG.info("Starting async ...");
+ subscriber.startAsync().awaitRunning();
+ LOG.info("Waiting for messages ...");
+ subscriber.awaitTerminated(25, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ subscriber.stopAsync();
+ } finally {
+ if (subscriber != null) {
+ subscriber.stopAsync();
+ }
+ }
+ }
+
+ public void shutdown() {
+ if (channel != null) {
+ channel.shutdown();
+ }
+ }
+
+ public List<String> getReceivedMessages() {
+ return Collections.unmodifiableList(receivedMessages);
+ }
+}
diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java
new file mode 100644
index 0000000..e7a1c01
--- /dev/null
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.google.pubsub.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public class CamelGooglePubSubPropertyFactory extends SinkConnectorPropertyFactory<CamelGooglePubSubPropertyFactory> {
+
+ public CamelGooglePubSubPropertyFactory withProjectId(String value) {
+ return setProperty("camel.sink.path.projectId", value);
+ }
+
+ public CamelGooglePubSubPropertyFactory withDestinationName(String value) {
+ return setProperty("camel.sink.path.destinationName", value);
+ }
+
+ public CamelGooglePubSubPropertyFactory withEndpoint(String value) {
+ return setProperty("camel.component.google-pubsub.endpoint", value);
+ }
+
+
+ public EndpointUrlBuilder<CamelGooglePubSubPropertyFactory> withUrl(String projectId, String destinationName) {
+ String queueUrl = String.format("google-pubsub:%s:%s", projectId, destinationName);
+
+ return new EndpointUrlBuilder<>(this::withSinkUrl, queueUrl);
+ }
+
+ public static CamelGooglePubSubPropertyFactory basic() {
+ return new CamelGooglePubSubPropertyFactory()
+ .withTasksMax(1)
+ .withName("CamelGooglePubSub")
+ .withConnectorClass("org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+ }
+
+}
diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
new file mode 100644
index 0000000..ff1adf7
--- /dev/null
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.google.pubsub.sink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.google.pubsub.clients.GooglePubEasy;
+import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubService;
+import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkGooglePubSubITCase extends CamelSinkTestSupport {
+ @RegisterExtension
+ public static GooglePubSubService service = GooglePubSubServiceFactory.createService();
+
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkGooglePubSubITCase.class);
+ private String project = "ckc";
+ private GooglePubEasy easyClient;
+
+ private String googlePubSubTopic;
+
+ private final int expected = 10;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[]{"camel-google-pubsub-kafka-connector"};
+ }
+
+
+ @BeforeEach
+ public void setUp() {
+ googlePubSubTopic = "ckctopic" + TestUtils.randomWithRange(0, 100);
+ LOG.info("Requesting topic {} for the pub/sub client", googlePubSubTopic);
+
+ easyClient = new GooglePubEasy(service.getServiceAddress(), project);
+
+ try {
+ easyClient.createTopic(googlePubSubTopic);
+ easyClient.createSubscription("test-subscription", googlePubSubTopic);
+ } catch (InterruptedException | IOException e) {
+ fail(e.getMessage());
+ }
+
+ }
+
+ @AfterEach
+ public void tearDown() {
+ easyClient.shutdown();
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ easyClient.receive();
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ List<String> receivedMessages = easyClient.getReceivedMessages();
+
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ assertEquals(expected, receivedMessages.size(), "Did not receive as many messages as was sent");
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+
+ @Test
+ public void testBasicSendReceive() throws Exception {
+ String topicName = getTopicForTest(this);
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelGooglePubSubPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withProjectId(project)
+ .withDestinationName(googlePubSubTopic)
+ .withEndpoint(service.getServiceAddress());
+
+ runTest(connectorPropertyFactory, topicName, expected);
+ }
+
+ @Disabled("Disabled due to #1086")
+ @Test
+ public void testBasicSendReceiveUrl() throws Exception {
+ String topicName = getTopicForTest(this);
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelGooglePubSubPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withUrl(project, googlePubSubTopic)
+ .append("endpoint", service.getServiceAddress())
+ .buildUrl();
+
+ runTest(connectorPropertyFactory, topicName, expected);
+ }
+
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index fadfc35..0c3ff80 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -65,6 +65,7 @@
<module>itests-cxf</module>
<module>itests-netty</module>
<module>itests-netty-http</module>
+ <module>itests-google-pubsub</module>
</modules>