You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/01/08 12:51:51 UTC

[camel-kafka-connector] branch camel-kafka-connector-0.7.x updated: Ensure the DLQ configuration from Kafka Connect is correctly handled (issue #835)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/camel-kafka-connector-0.7.x by this push:
     new 2e52089  Ensure the DLQ configuration from Kafka Connect is correctly handled (issue #835)
2e52089 is described below

commit 2e520895550380e06a6e4938e0a359d18daf6609
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 8 12:01:51 2021 +0100

    Ensure the DLQ configuration from Kafka Connect is correctly handled (issue #835)
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  14 ++-
 .../common/BasicConnectorPropertyFactory.java      |  15 +++
 .../sjms2/sink/CamelSinkWithDLQJMSITCase.java      | 127 +++++++++++++++++++++
 3 files changed, 155 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82f2f05..e44676b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -51,6 +52,7 @@ public class CamelSinkTask extends SinkTask {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class);
 
     private static final String LOCAL_URL = "direct:start";
+    private ErrantRecordReporter reporter;
 
 
     private CamelKafkaConnectMain cms;
@@ -70,6 +72,10 @@ public class CamelSinkTask extends SinkTask {
             Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
             CamelSinkConnectorConfig config = getCamelSinkConnectorConfig(actualProps);
 
+            if (context != null) {
+                reporter = context.errantRecordReporter();
+            }
+
             try {
                 String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
                 loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
@@ -175,7 +181,13 @@ public class CamelSinkTask extends SinkTask {
             producer.send(localEndpoint, exchange);
 
             if (exchange.isFailed()) {
-                throw new ConnectException("Exchange delivery has failed!", exchange.getException());
+                if (reporter == null) {
+                    LOG.warn("A delivery has failed and the error reporting is NOT enabled. Records may be lost or ignored");
+                    throw new ConnectException("Exchange delivery has failed!", exchange.getException());
+                }
+
+                LOG.warn("A delivery has failed and the error reporting is enabled. Sending record to the DLQ");
+                reporter.report(record, exchange.getException());
             }
         }
     }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index d5dd4f6..cee96b9 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -63,6 +63,21 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp
         return (T) this;
     }
 
+    /**
+     * This enables sending failed records to the DLQ. Note: it automatically configures other required/recommended
+     * options!
+     * @param topicName the DLQ topic name
+     * @return this object instance
+     */
+    public T withDeadLetterQueueTopicName(String topicName) {
+        // There's no constant for the DLQ settings
+        connectorProps.put("errors.deadletterqueue.topic.name", topicName);
+        connectorProps.put("errors.deadletterqueue.topic.replication.factor", 1);
+        connectorProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, true);
+
+        return (T) this;
+    }
+
     public TransformsConfigBuilder<T> withTransformsConfig(String name) {
         return new TransformsConfigBuilder<>((T) this, getProperties(), name);
     }
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
new file mode 100644
index 0000000..d47e4c8
--- /dev/null
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sjms2.sink;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration tests for the JMS sink with a DLQ configuration. This test forces a failure in the sink connector to
+ * ensure that the failed records are added to the DLQ configured in Kafka.
+ */
+@Testcontainers
+public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkWithDLQJMSITCase.class);
+
+    private int received;
+    private final int expect = 10;
+    private int errors;
+    private final int expectedErrors = 1;
+
+    private Properties connectionProperties() {
+        Properties properties = new Properties();
+
+        properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+        properties.put("camel.component.sjms2.connection-factory.remoteURI", "invalid");
+
+        return properties;
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sjms2-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        errors = 0;
+    }
+
+    private <T> boolean checkDqlRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+        errors++;
+
+        if (errors >= expectedErrors) {
+            return false;
+        }
+
+        return true;
+    }
+
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Creating the consumer ...");
+
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+        }
+
+        LOG.debug("Created the consumer ... About to receive messages");
+    }
+
+
+    @Test
+    @Timeout(10)
+    public void testSendReceiveWithError() {
+        try {
+            Properties brokenProp = connectionProperties();
+
+            brokenProp.put("camel.component.sjms2.connection-factory.remoteURI", "invalid");
+
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withConnectionProperties(brokenProp)
+                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .withDeadLetterQueueTopicName("dlq-sink-topic");
+
+            runTest(connectorPropertyFactory);
+
+            KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+            kafkaClient.consume("dlq-sink-topic", this::checkDqlRecord);
+
+            assertEquals(expectedErrors, errors, "Didn't process the expected amount of messages");
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}