You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/01/08 12:51:51 UTC
[camel-kafka-connector] branch camel-kafka-connector-0.7.x updated:
Ensure the DLQ configuration from Kafka Connect is correctly handled (issue
#835)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-kafka-connector-0.7.x by this push:
new 2e52089 Ensure the DLQ configuration from Kafka Connect is correctly handled (issue #835)
2e52089 is described below
commit 2e520895550380e06a6e4938e0a359d18daf6609
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 8 12:01:51 2021 +0100
Ensure the DLQ configuration from Kafka Connect is correctly handled (issue #835)
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 14 ++-
.../common/BasicConnectorPropertyFactory.java | 15 +++
.../sjms2/sink/CamelSinkWithDLQJMSITCase.java | 127 +++++++++++++++++++++
3 files changed, 155 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82f2f05..e44676b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -51,6 +52,7 @@ public class CamelSinkTask extends SinkTask {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class);
private static final String LOCAL_URL = "direct:start";
+ private ErrantRecordReporter reporter;
private CamelKafkaConnectMain cms;
@@ -70,6 +72,10 @@ public class CamelSinkTask extends SinkTask {
Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
CamelSinkConnectorConfig config = getCamelSinkConnectorConfig(actualProps);
+ if (context != null) {
+ reporter = context.errantRecordReporter();
+ }
+
try {
String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
@@ -175,7 +181,13 @@ public class CamelSinkTask extends SinkTask {
producer.send(localEndpoint, exchange);
if (exchange.isFailed()) {
- throw new ConnectException("Exchange delivery has failed!", exchange.getException());
+ if (reporter == null) {
+ LOG.warn("A delivery has failed and the error reporting is NOT enabled. Records may be lost or ignored");
+ throw new ConnectException("Exchange delivery has failed!", exchange.getException());
+ }
+
+ LOG.warn("A delivery has failed and the error reporting is enabled. Sending record to the DLQ");
+ reporter.report(record, exchange.getException());
}
}
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index d5dd4f6..cee96b9 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -63,6 +63,21 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp
return (T) this;
}
+ /**
+ * This enables sending failed records to the DLQ. Note: it automatically configures other required/recommended
+ * options!
+ * @param topicName the DLQ topic name
+ * @return this object instance
+ */
+ public T withDeadLetterQueueTopicName(String topicName) {
+ // There's no constant for the DLQ settings
+ connectorProps.put("errors.deadletterqueue.topic.name", topicName);
+ connectorProps.put("errors.deadletterqueue.topic.replication.factor", 1);
+ connectorProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, true);
+
+ return (T) this;
+ }
+
public TransformsConfigBuilder<T> withTransformsConfig(String name) {
return new TransformsConfigBuilder<>((T) this, getProperties(), name);
}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
new file mode 100644
index 0000000..d47e4c8
--- /dev/null
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sjms2.sink;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration tests for the JMS sink with a DLQ configuration. This test forces a failure in the sink connector to
+ * ensure that the failed records are added to the DLQ configured in Kafka.
+ */
+@Testcontainers
+public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkWithDLQJMSITCase.class);
+
+ private int received;
+ private final int expect = 10;
+ private int errors;
+ private final int expectedErrors = 1;
+
+ private Properties connectionProperties() {
+ Properties properties = new Properties();
+
+ properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+ properties.put("camel.component.sjms2.connection-factory.remoteURI", "invalid");
+
+ return properties;
+ }
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-sjms2-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ received = 0;
+ errors = 0;
+ }
+
+ private <T> boolean checkDqlRecord(ConsumerRecord<String, T> record) {
+ LOG.debug("Received: {}", record.value());
+ errors++;
+
+ if (errors >= expectedErrors) {
+ return false;
+ }
+
+ return true;
+ }
+
+ private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+ connectorPropertyFactory.log();
+ getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+ LOG.debug("Creating the consumer ...");
+
+
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+ for (int i = 0; i < expect; i++) {
+ kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+ }
+
+ LOG.debug("Created the consumer ... About to receive messages");
+ }
+
+
+ @Test
+ @Timeout(10)
+ public void testSendReceiveWithError() {
+ try {
+ Properties brokenProp = connectionProperties();
+
+ brokenProp.put("camel.component.sjms2.connection-factory.remoteURI", "invalid");
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+ .basic()
+ .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withConnectionProperties(brokenProp)
+ .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+ .withDeadLetterQueueTopicName("dlq-sink-topic");
+
+ runTest(connectorPropertyFactory);
+
+ KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ kafkaClient.consume("dlq-sink-topic", this::checkDqlRecord);
+
+ assertEquals(expectedErrors, errors, "Didn't process the expected amount of messages");
+
+ } catch (Exception e) {
+ LOG.error("JMS test failed: {}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+ }
+}