You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/03/05 14:19:04 UTC

[camel-kafka-connector] branch master updated: Added Google pub/sub sink integration test

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 35a0838  Added Google pub/sub sink integration test
35a0838 is described below

commit 35a0838ebacba92b8f3ad900b9b6e0a3c51193bc
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Mar 4 10:00:14 2021 +0100

    Added Google pub/sub sink integration test
---
 .../src/test/resources/log4j2.properties           |   8 +
 tests/itests-google-pubsub/pom.xml                 |  65 ++++++++
 .../google/pubsub/clients/GooglePubEasy.java       | 167 +++++++++++++++++++++
 .../sink/CamelGooglePubSubPropertyFactory.java     |  54 +++++++
 .../pubsub/sink/CamelSinkGooglePubSubITCase.java   | 130 ++++++++++++++++
 tests/pom.xml                                      |   1 +
 6 files changed, 425 insertions(+)

diff --git a/tests/itests-common/src/test/resources/log4j2.properties b/tests/itests-common/src/test/resources/log4j2.properties
index aa48d0a..b7df3d4 100644
--- a/tests/itests-common/src/test/resources/log4j2.properties
+++ b/tests/itests-common/src/test/resources/log4j2.properties
@@ -95,3 +95,11 @@ logger.azure.name = com.azure
 logger.azure.level = trace
 logger.azure.additivity = false
 logger.azure.appenderRef.file.ref = file
+
+# Google
+logger.google.name = com.google
+logger.google.level = INFO
+logger.google.additivity = false
+logger.google.appenderRef.file.ref = file
+
+
diff --git a/tests/itests-google-pubsub/pom.xml b/tests/itests-google-pubsub/pom.xml
new file mode 100644
index 0000000..1dc469a
--- /dev/null
+++ b/tests/itests-google-pubsub/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-google-pubsub</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Google Pub/Sub</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- test infra -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-common</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-google-pubsub</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-google-pubsub</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java
new file mode 100644
index 0000000..02420fe
--- /dev/null
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.google.pubsub.clients;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.TopicName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Small helper client used by the integration tests to talk to a Google Pub/Sub
+ * service over a plaintext gRPC channel without credentials. It can create topics
+ * and subscriptions and collect the payloads of the messages received on the
+ * subscription created via {@link #createSubscription(String, String)}.
+ */
+public class GooglePubEasy {
+    private static final Logger LOG = LoggerFactory.getLogger(GooglePubEasy.class);
+
+    /** Number of collected messages after which {@link #receive()} stops the subscriber. */
+    private static final int MAX_MESSAGES = 10;
+    /** Ack deadline, in seconds, passed when creating the subscription. */
+    private static final int ACK_DEADLINE_SECONDS = 10;
+    /** Maximum time, in seconds, {@link #receive()} waits for the subscriber to terminate. */
+    private static final int RECEIVE_TIMEOUT_SECONDS = 25;
+
+    // Written from the subscriber's message callback and read by the test code, so it
+    // must tolerate access from more than one thread.
+    private final List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
+
+    private final String serviceAddress;
+    private final String project;
+
+    private final ManagedChannel channel;
+    private final FixedTransportChannelProvider channelProvider;
+
+    private ProjectSubscriptionName projectSubscriptionName;
+    private Subscriber subscriber;
+
+    /**
+     * Creates the client and opens a plaintext channel to the service.
+     *
+     * @param serviceAddress the gRPC target of the Pub/Sub service
+     * @param project the project ID used for topics and subscriptions
+     */
+    public GooglePubEasy(String serviceAddress, String project) {
+        this.serviceAddress = serviceAddress;
+        this.project = project;
+
+        // Use the address verbatim: routing it through String.format would treat it as
+        // a format string and fail on any '%' character.
+        channel = ManagedChannelBuilder
+                .forTarget(serviceAddress)
+                .usePlaintext()
+                .build();
+
+        channelProvider =
+                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+    }
+
+    /**
+     * Creates a topic in the configured project.
+     *
+     * @param topicName the short topic name (without the project prefix)
+     * @throws IOException if the admin client cannot be created
+     * @throws InterruptedException declared for backward compatibility with existing callers
+     */
+    public void createTopic(String topicName) throws IOException, InterruptedException {
+        doCreateTopic(topicName);
+    }
+
+    /**
+     * Creates a pull subscription on the given topic and remembers it as the
+     * subscription that {@link #receive()} reads from.
+     *
+     * @param subscriptionName the short subscription name
+     * @param topicName the short name of the topic to subscribe to
+     * @throws IOException if the admin client cannot be created
+     */
+    public void createSubscription(String subscriptionName, String topicName) throws IOException {
+        TopicName googleTopic = TopicName.of(project, topicName);
+
+        projectSubscriptionName = ProjectSubscriptionName.of(project, subscriptionName);
+
+        SubscriptionAdminSettings adminSettings = SubscriptionAdminSettings
+                .newBuilder()
+                .setCredentialsProvider(NoCredentialsProvider.create())
+                .setTransportChannelProvider(channelProvider)
+                .build();
+
+        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(adminSettings)) {
+            subscriptionAdminClient.createSubscription(
+                    projectSubscriptionName, googleTopic, PushConfig.getDefaultInstance(), ACK_DEADLINE_SECONDS);
+        }
+    }
+
+    private void doCreateTopic(String topicName) throws IOException, InterruptedException {
+        TopicName googleTopic = TopicName.of(project, topicName);
+
+        TopicAdminSettings topicAdminSettings = TopicAdminSettings
+                .newBuilder()
+                .setCredentialsProvider(NoCredentialsProvider.create())
+                .setTransportChannelProvider(channelProvider)
+                .build();
+
+        // try-with-resources closes the admin client. No explicit awaitTermination is
+        // needed: waiting on a client that has not been shut down only blocks until
+        // the timeout expires.
+        try (TopicAdminClient client = TopicAdminClient.create(topicAdminSettings)) {
+            LOG.info("Creating topic {} (original {})", googleTopic.toString(), googleTopic.getTopic());
+
+            client.createTopic(googleTopic);
+        }
+    }
+
+    /**
+     * Subscribes to the subscription created by {@link #createSubscription(String, String)}
+     * and collects message payloads until {@value #MAX_MESSAGES} messages were received or
+     * {@value #RECEIVE_TIMEOUT_SECONDS} seconds elapsed, whichever happens first. The
+     * subscriber is always asked to stop before this method returns.
+     *
+     * @throws IllegalStateException if no subscription has been created yet
+     */
+    public void receive() {
+        if (projectSubscriptionName == null) {
+            throw new IllegalStateException("createSubscription must be called before receive");
+        }
+
+        try {
+            MessageReceiver receiver = (pubsubMessage, ackReplyConsumer) -> {
+                // toStringUtf8 yields the payload; ByteString.toString() is only a debug representation
+                String data = pubsubMessage.getData().toStringUtf8();
+                LOG.info("Received: {}", data);
+                receivedMessages.add(data);
+
+                // Acknowledge before possibly stopping the subscriber
+                ackReplyConsumer.ack();
+
+                if (receivedMessages.size() >= MAX_MESSAGES) {
+                    subscriber.stopAsync();
+                }
+            };
+
+            subscriber = Subscriber
+                    .newBuilder(projectSubscriptionName, receiver)
+                    .setCredentialsProvider(NoCredentialsProvider.create())
+                    .setChannelProvider(channelProvider)
+                    .build();
+
+            LOG.info("Adding listener ...");
+            subscriber.addListener(
+                    new Subscriber.Listener() {
+                        @Override
+                        public void failed(Subscriber.State from, Throwable failure) {
+                            LOG.error(failure.getMessage(), failure);
+                        }
+                    },
+                    MoreExecutors.directExecutor());
+
+            LOG.info("Starting async ...");
+            subscriber.startAsync().awaitRunning();
+            LOG.info("Waiting for messages ...");
+            subscriber.awaitTerminated(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            // Not fatal here: the caller decides based on the number of collected messages
+            LOG.warn("Timed out after {} seconds waiting for the subscriber to terminate",
+                    RECEIVE_TIMEOUT_SECONDS);
+        } finally {
+            if (subscriber != null) {
+                subscriber.stopAsync();
+            }
+        }
+    }
+
+    /**
+     * Shuts down the underlying gRPC channel.
+     */
+    public void shutdown() {
+        if (channel != null) {
+            channel.shutdown();
+        }
+    }
+
+    /**
+     * Returns a read-only, live view of the payloads received so far. The view reflects
+     * messages added after this method returns, so callers may obtain it before
+     * {@link #receive()} has finished.
+     *
+     * @return an unmodifiable view of the received message payloads
+     */
+    public List<String> getReceivedMessages() {
+        return Collections.unmodifiableList(receivedMessages);
+    }
+}
diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java
new file mode 100644
index 0000000..e7a1c01
--- /dev/null
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.google.pubsub.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/**
+ * Property factory that assembles the Kafka Connect configuration for the
+ * Camel Google Pub/Sub sink connector used in the integration tests.
+ */
+public class CamelGooglePubSubPropertyFactory extends SinkConnectorPropertyFactory<CamelGooglePubSubPropertyFactory> {
+    private static final String PROJECT_ID_KEY = "camel.sink.path.projectId";
+    private static final String DESTINATION_NAME_KEY = "camel.sink.path.destinationName";
+    private static final String ENDPOINT_KEY = "camel.component.google-pubsub.endpoint";
+
+    private static final String CONNECTOR_NAME = "CamelGooglePubSub";
+    private static final String CONNECTOR_CLASS =
+            "org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector";
+    private static final String STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
+
+    /**
+     * Creates a factory pre-populated with a single task, the connector name and
+     * class, and string converters for both keys and values.
+     *
+     * @return the pre-configured factory
+     */
+    public static CamelGooglePubSubPropertyFactory basic() {
+        CamelGooglePubSubPropertyFactory factory = new CamelGooglePubSubPropertyFactory();
+
+        return factory
+                .withTasksMax(1)
+                .withName(CONNECTOR_NAME)
+                .withConnectorClass(CONNECTOR_CLASS)
+                .withKeyConverterClass(STRING_CONVERTER)
+                .withValueConverterClass(STRING_CONVERTER);
+    }
+
+    /**
+     * Sets the {@code camel.sink.path.projectId} property.
+     *
+     * @param value the project ID
+     * @return the result of setting the property, for call chaining
+     */
+    public CamelGooglePubSubPropertyFactory withProjectId(String value) {
+        return setProperty(PROJECT_ID_KEY, value);
+    }
+
+    /**
+     * Sets the {@code camel.sink.path.destinationName} property.
+     *
+     * @param value the destination name
+     * @return the result of setting the property, for call chaining
+     */
+    public CamelGooglePubSubPropertyFactory withDestinationName(String value) {
+        return setProperty(DESTINATION_NAME_KEY, value);
+    }
+
+    /**
+     * Sets the {@code camel.component.google-pubsub.endpoint} property.
+     *
+     * @param value the service endpoint address
+     * @return the result of setting the property, for call chaining
+     */
+    public CamelGooglePubSubPropertyFactory withEndpoint(String value) {
+        return setProperty(ENDPOINT_KEY, value);
+    }
+
+    /**
+     * Starts building a {@code google-pubsub:<projectId>:<destinationName>} sink URL.
+     *
+     * @param projectId the project ID placed in the URL
+     * @param destinationName the destination name placed in the URL
+     * @return a URL builder that hands the finished URL to {@code withSinkUrl}
+     */
+    public EndpointUrlBuilder<CamelGooglePubSubPropertyFactory> withUrl(String projectId, String destinationName) {
+        final String sinkUrl = String.format("google-pubsub:%s:%s", projectId, destinationName);
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+    }
+}
diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
new file mode 100644
index 0000000..ff1adf7
--- /dev/null
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.google.pubsub.sink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.google.pubsub.clients.GooglePubEasy;
+import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubService;
+import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkGooglePubSubITCase extends CamelSinkTestSupport {
+    @RegisterExtension
+    public static GooglePubSubService service = GooglePubSubServiceFactory.createService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkGooglePubSubITCase.class);
+    private String project = "ckc";
+    private GooglePubEasy easyClient;
+
+    private String googlePubSubTopic;
+
+    private final int expected = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[]{"camel-google-pubsub-kafka-connector"};
+    }
+
+
+    @BeforeEach
+    public void setUp() {
+        googlePubSubTopic = "ckctopic" + TestUtils.randomWithRange(0, 100);
+        LOG.info("Requesting topic {} for the pub/sub client", googlePubSubTopic);
+
+        easyClient = new GooglePubEasy(service.getServiceAddress(), project);
+
+        try {
+            easyClient.createTopic(googlePubSubTopic);
+            easyClient.createSubscription("test-subscription", googlePubSubTopic);
+        } catch (InterruptedException | IOException e) {
+            fail(e.getMessage());
+        }
+
+    }
+
+    @AfterEach
+    public void tearDown() {
+        easyClient.shutdown();
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            easyClient.receive();
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        List<String> receivedMessages = easyClient.getReceivedMessages();
+
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(expected, receivedMessages.size(), "Did not receive as many messages as was sent");
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    @Test
+    public void testBasicSendReceive() throws Exception {
+        String topicName = getTopicForTest(this);
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelGooglePubSubPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withProjectId(project)
+                .withDestinationName(googlePubSubTopic)
+                .withEndpoint(service.getServiceAddress());
+
+        runTest(connectorPropertyFactory, topicName, expected);
+    }
+
+    @Disabled("Disabled due to #1086")
+    @Test
+    public void testBasicSendReceiveUrl() throws Exception {
+        String topicName = getTopicForTest(this);
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelGooglePubSubPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withUrl(project, googlePubSubTopic)
+                .append("endpoint", service.getServiceAddress())
+                .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expected);
+    }
+
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index fadfc35..0c3ff80 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -65,6 +65,7 @@
         <module>itests-cxf</module>
         <module>itests-netty</module>
         <module>itests-netty-http</module>
+        <module>itests-google-pubsub</module>
     </modules>