You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/01 10:27:02 UTC

[camel] branch master updated: CAMEL-13878: Regen and add a log when the consumer thread is terminating.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new dafff18  CAMEL-13878: Regen and add a log when the consumer thread is terminating.
dafff18 is described below

commit dafff18a787233f6a4ae8e64aab487f1e12ab465
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Oct 1 12:26:44 2019 +0200

    CAMEL-13878: Regen and add a log when the consumer thread is terminating.
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  6 +-
 .../camel/component/kafka/KafkaConsumer.java       |  2 +
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  | 88 +++-------------------
 .../springboot/KafkaComponentConfiguration.java    | 32 --------
 4 files changed, 16 insertions(+), 112 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c459720..e4b83450 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (97 parameters):
+=== Query Parameters (95 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -113,9 +113,7 @@ with the following path and query parameters:
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
-| *bridgeEndpoint* (producer) | If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. | false | boolean
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additio [...]
-| *circularTopicDetection* (producer) | If the option is true, then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic, then the header setting is ignored, and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. | none | String
 | *connectionMaxIdleMs* (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
 | *enableIdempotence* (producer) | If set to 'true' the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries may write duplicates of the retried message in the stream. If set to true this option will require max.in.flight.requests.per.connection to be set to 1 and retries cannot be zero and additionally acks must be set to 'all'. | false | boolean
@@ -171,9 +169,9 @@ with the following path and query parameters:
 | *sslProtocol* (security) | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | TLS | String
 | *sslProvider* (security) | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. |  | String
 | *sslTrustmanagerAlgorithm* (security) | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | PKIX | String
+| *sslTruststoreLocation* (security) | The location of the trust store file. |  | String
 | *sslTruststoreType* (security) | The file format of the trust store file. Default value is JKS. | JKS | String
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) |  | String
-| *sslTruststoreLocation* (security) | The location of the trust store file. |  | String
 | *sslTruststorePassword* (security) | The password for the trust store file. |  | String
 |===
 // endpoint options: END
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 7dcb4da..49f31a7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -216,6 +216,8 @@ public class KafkaConsumer extends DefaultConsumer {
                 // re-connect
                 reConnect = doRun();
             }
+
+            log.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
         }
 
         void preInit() {
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 6886165..db93d5c 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1456,32 +1456,6 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * If the option is true, then KafkaProducer will ignore the
-         * KafkaConstants.TOPIC header setting of the inbound message.
-         * 
-         * The option is a: <code>boolean</code> type.
-         * 
-         * Group: producer
-         */
-        default KafkaEndpointProducerBuilder bridgeEndpoint(
-                boolean bridgeEndpoint) {
-            doSetProperty("bridgeEndpoint", bridgeEndpoint);
-            return this;
-        }
-        /**
-         * If the option is true, then KafkaProducer will ignore the
-         * KafkaConstants.TOPIC header setting of the inbound message.
-         * 
-         * The option will be converted to a <code>boolean</code> type.
-         * 
-         * Group: producer
-         */
-        default KafkaEndpointProducerBuilder bridgeEndpoint(
-                String bridgeEndpoint) {
-            doSetProperty("bridgeEndpoint", bridgeEndpoint);
-            return this;
-        }
-        /**
          * The total bytes of memory the producer can use to buffer records
          * waiting to be sent to the server. If records are sent faster than
          * they can be delivered to the server the producer will either block or
@@ -1523,44 +1497,6 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * If the option is true, then KafkaProducer will detect if the message
-         * is attempted to be sent back to the same topic it may come from, if
-         * the message was original from a kafka consumer. If the
-         * KafkaConstants.TOPIC header is the same as the original kafka
-         * consumer topic, then the header setting is ignored, and the topic of
-         * the producer endpoint is used. In other words this avoids sending the
-         * same message back to where it came from. This option is not in use if
-         * the option bridgeEndpoint is set to true.
-         * 
-         * The option is a: <code>boolean</code> type.
-         * 
-         * Group: producer
-         */
-        default KafkaEndpointProducerBuilder circularTopicDetection(
-                boolean circularTopicDetection) {
-            doSetProperty("circularTopicDetection", circularTopicDetection);
-            return this;
-        }
-        /**
-         * If the option is true, then KafkaProducer will detect if the message
-         * is attempted to be sent back to the same topic it may come from, if
-         * the message was original from a kafka consumer. If the
-         * KafkaConstants.TOPIC header is the same as the original kafka
-         * consumer topic, then the header setting is ignored, and the topic of
-         * the producer endpoint is used. In other words this avoids sending the
-         * same message back to where it came from. This option is not in use if
-         * the option bridgeEndpoint is set to true.
-         * 
-         * The option will be converted to a <code>boolean</code> type.
-         * 
-         * Group: producer
-         */
-        default KafkaEndpointProducerBuilder circularTopicDetection(
-                String circularTopicDetection) {
-            doSetProperty("circularTopicDetection", circularTopicDetection);
-            return this;
-        }
-        /**
          * This parameter allows you to specify the compression codec for all
          * data generated by this producer. Valid values are none, gzip and
          * snappy.
@@ -2785,6 +2721,18 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * The location of the trust store file.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: security
+         */
+        default KafkaEndpointProducerBuilder sslTruststoreLocation(
+                String sslTruststoreLocation) {
+            doSetProperty("sslTruststoreLocation", sslTruststoreLocation);
+            return this;
+        }
+        /**
          * The file format of the trust store file. Default value is JKS.
          * 
          * The option is a: <code>java.lang.String</code> type.
@@ -2813,18 +2761,6 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * The location of the trust store file.
-         * 
-         * The option is a: <code>java.lang.String</code> type.
-         * 
-         * Group: security
-         */
-        default KafkaEndpointProducerBuilder sslTruststoreLocation(
-                String sslTruststoreLocation) {
-            doSetProperty("sslTruststoreLocation", sslTruststoreLocation);
-            return this;
-        }
-        /**
          * The password for the trust store file.
          * 
          * The option is a: <code>java.lang.String</code> type.
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index a662965..de958fb 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -181,22 +181,6 @@ public class KafkaComponentConfiguration
          */
         private String groupId;
         /**
-         * If the option is true, then KafkaProducer will ignore the
-         * KafkaConstants.TOPIC header setting of the inbound message.
-         */
-        private Boolean bridgeEndpoint = false;
-        /**
-         * If the option is true, then KafkaProducer will detect if the message
-         * is attempted to be sent back to the same topic it may come from, if
-         * the message was original from a kafka consumer. If the
-         * KafkaConstants.TOPIC header is the same as the original kafka
-         * consumer topic, then the header setting is ignored, and the topic of
-         * the producer endpoint is used. In other words this avoids sending the
-         * same message back to where it came from. This option is not in use if
-         * the option bridgeEndpoint is set to true.
-         */
-        private Boolean circularTopicDetection = true;
-        /**
          * The partitioner class for partitioning messages amongst sub-topics.
          * The default partitioner is based on the hash of the key.
          */
@@ -805,22 +789,6 @@ public class KafkaComponentConfiguration
             this.groupId = groupId;
         }
 
-        public Boolean getBridgeEndpoint() {
-            return bridgeEndpoint;
-        }
-
-        public void setBridgeEndpoint(Boolean bridgeEndpoint) {
-            this.bridgeEndpoint = bridgeEndpoint;
-        }
-
-        public Boolean getCircularTopicDetection() {
-            return circularTopicDetection;
-        }
-
-        public void setCircularTopicDetection(Boolean circularTopicDetection) {
-            this.circularTopicDetection = circularTopicDetection;
-        }
-
         public String getPartitioner() {
             return partitioner;
         }