You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/11 04:25:57 UTC

[camel] branch main updated: CAMEL-19444: fixed a few grammar errors on documentation and code comments in camel-kafka (#10647)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fa01bb716d CAMEL-19444: fixed a few grammar errors on documentation and code comments in camel-kafka (#10647)
9fa01bb716d is described below

commit 9fa01bb716d2391500c2b74a36c39ee51e08c18e
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Tue Jul 11 06:25:51 2023 +0200

    CAMEL-19444: fixed a few grammar errors on documentation and code comments in camel-kafka (#10647)
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 41 +++++++++++-----------
 .../camel/component/kafka/KafkaComponent.java      | 10 +++---
 .../camel/component/kafka/KafkaConfiguration.java  | 24 ++++++-------
 .../consumer/errorhandler/BridgeErrorStrategy.java |  2 +-
 .../errorhandler/DiscardErrorStrategy.java         |  2 +-
 .../KafkaConsumerAsyncManualCommitIT.java          |  2 +-
 .../integration/KafkaConsumerBatchSizeIT.java      |  2 +-
 7 files changed, 41 insertions(+), 42 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 8405f58461a..0f49a0264ff 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -60,26 +60,25 @@ http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.o
 include::partial$component-endpoint-headers.adoc[]
 // component headers: END
 
-If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as it is used as a one-time header
-that is not send along the message, as it is removed in the producer.
+If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as it is used as a one-time header that is not sent along the message, and actually is removed in the producer.
 
 == Consumer error handling
 
 While kafka consumer is polling messages from the kafka broker, then errors can happen. This section describes what happens and what
 you can configure.
 
-The consumer may throw exception when invoking the Kafka `poll` API. For example, if the message cannot be de-serialized due invalid data,
-and many other kind of errors. Those errors are in the form of `KafkaException` which are either _retryable_ or not. The exceptions
-which can be retried (`RetriableException`) will be retried again (with a poll timeout in between). All other kind of exceptions are
+The consumer may throw exception when invoking the Kafka `poll` API. For example, if the message cannot be deserialized due to invalid data,
+and many others kinds of errors. Those errors are in the form of `KafkaException` which are either _retryable_ or not. The exceptions
+which can be retried (`RetriableException`) will be retried again (with a poll timeout in between). All others kinds of exceptions are
 handled according to the _pollOnError_ configuration. This configuration has the following values:
 
-* DISCARD will discard the message and continue to poll next message.
-* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message.
-* RECONNECT will re-connect the consumer and try poll the message again.
+* DISCARD will discard the message and continue to poll the next message.
+* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message.
+* RECONNECT will re-connect the consumer and try to poll the message again.
 * RETRY will let the consumer retry polling the same message again
-* STOP will stop the consumer (have to be manually started/restarted if the consumer should be able to consume messages again).
+* STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).
 
-The default is *ERROR_HANDLER* which will let Camel's error handler (if any configured) process the caused exception.
+The default is *ERROR_HANDLER*, which will let Camel's error handler (if any configured) process the caused exception.
 Afterwards continue to poll the next message. This behavior is similar to the _bridgeErrorHandler_ option that
 Camel components have.
 
@@ -102,7 +101,7 @@ from("kafka:test?brokers=localhost:9092")
     .log("    with the key ${headers[kafka.KEY]}")
 ----
 
-If you need to consume messages from multiple topics you can use a comma separated list of topic names.
+If you need to consume messages from multiple topics, you can use a comma separated list of topic names.
 
 [source,java]
 ----
@@ -126,8 +125,8 @@ from("kafka:test*?brokers=localhost:9092&topicIsPattern=true")
     .log("    with the key ${headers[kafka.KEY]}")
 ----
 
-When consuming messages from Kafka you can use your own offset management and not delegate this management to Kafka.
-In order to keep the offsets the component needs a `StateRepository` implementation such as `FileStateRepository`.
+When consuming messages from Kafka, you can use your own offset management and not delegate this management to Kafka.
+In order to keep the offsets, the component needs a `StateRepository` implementation such as `FileStateRepository`.
 This bean should be available in the registry.
 Here how to use it :
 
@@ -228,7 +227,7 @@ camelContext.addRoutes(new RouteBuilder() {
 
 The `camel-kafka` library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.
 The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions; as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic.
-Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset.
+Each repository instance that uses the topic (e.g., typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic, each will control its own offset.
 On startup, the instance subscribes to the topic, rewinds the offset to the beginning and rebuilds the cache to the latest state. The cache will not be considered warmed up until one poll of `pollDurationMs` in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.
 Be mindful of the format of the header used for the uniqueness check. By default, it uses Strings as the data types. When using primitive numeric formats, the header must be deserialized accordingly. Check the samples below for examples.
 
@@ -243,7 +242,7 @@ A `KafkaIdempotentRepository` has the following properties:
 | maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000).
 | pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. +
 If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache ca [...]
-| groupId | The groupId to assign to the idempotent consumer. If not specified it will be randomize.
+| groupId | The groupId to assign to the idempotent consumer. If not specified it will be randomized.
 |===
 
 The repository can be instantiated by defining the `topic` and `bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets can be explicitly defined to enable features such as SSL/SASL.
@@ -346,7 +345,7 @@ from(from).routeId("foo")
 By default, the Kafka consumer will use auto commit, where the offset will be committed automatically in the background using a given interval.
 
 In case you want to force manual commits, you can use `KafkaManualCommit` API from the Camel Exchange, stored on the message header.
-This requires to turn on manual commits by either setting the option `allowManualCommit` to `true` on the `KafkaComponent`
+This requires turning on manual commits by either setting the option `allowManualCommit` to `true` on the `KafkaComponent`
 or on the endpoint, for example:
 
 [source,java]
@@ -404,9 +403,9 @@ Producing flow backed by same behaviour - camel headers of particular exchange w
 
 Since kafka headers allows only `byte[]` values, in order camel exchange header to be propagated its value should be serialized to `bytes[]`,
 otherwise header will be skipped.
-Following header value types are supported: `String`, `Integer`, `Long`, `Double`, `Boolean`, `byte[]`.
+The following header value types are supported: `String`, `Integer`, `Long`, `Double`, `Boolean`, `byte[]`.
 Note: all headers propagated *from* kafka *to* camel exchange will contain `byte[]` value by default.
-In order to override default functionality uri parameters can be set: `headerDeserializer` for `from` route and `headerSerializer` for `to` route. Example:
+In order to override default functionality, these uri parameters can be set: `headerDeserializer` for `from` route and `headerSerializer` for `to` route. For example:
 
 [source,java]
 ----
@@ -426,7 +425,7 @@ from("kafka:my_topic?headerFilterStrategy=#myStrategy")
 .to("kafka:my_topic?headerFilterStrategy=#myStrategy")
 ----
 
-`myStrategy` object should be subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
+`myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
 
 == Kafka Transaction
 
@@ -438,7 +437,7 @@ from("direct:transaction")
 ----
 At the end of exchange routing, the kafka producer would commit the transaction or abort it if there is an Exception throwing or the exchange is `RollbackOnly`. Since Kafka does not support transactions in multi threads, it will throw `ProducerFencedException` if there is another producer with the same `transaction.id` to make the transactional request.
 
-It would work with JTA `camel-jta` by using `transacted()` and if it involves some resources (SQL or JMS) which supports XA, then they would work in tandem, where they both will either commit or rollback at the end of the exchange routing. In some cases, if the JTA transaction manager fails to commit (during the 2PC processing), but kafka transaction has been committed before and there is no chance to rollback the changes since the kafka transaction does not support JTA/XA spec. There is [...]
+It would work with JTA `camel-jta` by using `transacted()` and if it involves some resources (SQL or JMS) which supports XA, then they would work in tandem, where they both will either commit or rollback at the end of the exchange routing. In some cases, if the JTA transaction manager fails to commit (during the 2PC processing), but kafka transaction has been committed before and there is no chance to roll back the changes since the kafka transaction does not support JTA/XA spec. There i [...]
 
 == Setting Kerberos config file
 
@@ -449,4 +448,4 @@ static {
     KafkaComponent.setKerberosConfigLocation("path/to/config/file");
 }
 ----
-include::spring-boot:partial$starter.adoc[]
\ No newline at end of file
+include::spring-boot:partial$starter.adoc[]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 72a2ef51c17..2f2618939f7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -171,12 +171,12 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
      *
      * Error during creating the consumer may be fatal due to invalid configuration and as such recovery is not
      * possible. However, one part of the validation is DNS resolution of the bootstrap broker hostnames. This may be a
-     * temporary networking problem, and could potentially be recoverable. While other errors are fatal such as some
-     * invalid kafka configurations. Unfortunately kafka-client does not separate this kind of errors.
+     * temporary networking problem, and could potentially be recoverable. While other errors are fatal, such as some
+     * invalid kafka configurations. Unfortunately, kafka-client does not separate this kind of errors.
      *
      * Camel will by default retry forever, and therefore never give up. If you want to give up after many attempts then
-     * set this option and Camel will then when giving up terminate the consumer. You can manually restart the consumer
-     * by stopping and starting the route, to try again.
+     * set this option and Camel will then when giving up terminate the consumer. To try again, you can manually restart the consumer
+     * by stopping, and starting the route.
      */
     public void setCreateConsumerBackoffMaxAttempts(int createConsumerBackoffMaxAttempts) {
         this.createConsumerBackoffMaxAttempts = createConsumerBackoffMaxAttempts;
@@ -204,7 +204,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
      * Error during subscribing the consumer to the kafka topic could be temporary errors due to network issues, and
      * could potentially be recoverable.
      *
-     * Camel will by default retry forever, and therefore never give up. If you want to give up after many attempts then
+     * Camel will by default retry forever, and therefore never give up. If you want to give up after many attempts, then
      * set this option and Camel will then when giving up terminate the consumer. You can manually restart the consumer
      * by stopping and starting the route, to try again.
      */
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 20686f13463..a695f2028fb 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -824,7 +824,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
      * <tt>false</tt> then the consumer continues to the next message and processes it. If the option is <tt>true</tt>
      * then the consumer breaks out, and will seek back to offset of the message that caused a failure, and then
      * re-attempt to process this message. However this can lead to endless processing of the same message if its bound
-     * to fail every time, eg a poison message. Therefore its recommended to deal with that for example by using Camel's
+     * to fail every time, eg a poison message. Therefore it is recommended to deal with that for example by using Camel's
      * error handler.
      */
     public void setBreakOnFirstError(boolean breakOnFirstError) {
@@ -889,7 +889,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
 
     /**
      * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been
-     * elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer
+     * elected. Since the leader election takes a bit of time, this property specifies the amount of time that the producer
      * waits before refreshing the metadata.
      */
     public void setRetryBackoffMs(Integer retryBackoffMs) {
@@ -1220,8 +1220,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     }
 
     /**
-     * The location of the key store file. This is optional for client and can be used for two-way authentication for
-     * client.
+     * The location of the key store file. This is optional for the client and can be used for two-way authentication for
+     * the client.
      */
     public void setSslKeystoreLocation(String sslKeystoreLocation) {
         this.sslKeystoreLocation = sslKeystoreLocation;
@@ -1232,7 +1232,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     }
 
     /**
-     * The store password for the key store file. This is optional for client and only needed if sslKeystoreLocation' is
+     * The store password for the key store file. This is optional for the client and only needed if sslKeystoreLocation' is
      * configured. Key store password is not supported for PEM format.
      */
     public void setSslKeystorePassword(String sslKeystorePassword) {
@@ -1355,8 +1355,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     /**
      * The producer will attempt to batch records together into fewer requests whenever multiple records are being sent
      * to the same partition. This helps performance on both the client and the server. This configuration controls the
-     * default batch size in bytes. No attempt will be made to batch records larger than this size.Requests sent to
-     * brokers will contain multiple batches, one for each partition with data available to be sent.A small batch size
+     * default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to
+     * brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size
      * will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A
      * very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified
      * batch size in anticipation of additional records.
@@ -1399,7 +1399,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
      * request. Normally this occurs only under load when records arrive faster than they can be sent out. However in
      * some circumstances the client may want to reduce the number of requests even under moderate load. This setting
      * accomplishes this by adding a small amount of artificial delay that is, rather than immediately sending out a
-     * record the producer will wait for up to the given delay to allow other records to be sent so that the sends can
+     * record the producer will wait for up to the given delay to allow other records to be sent so that they can
      * be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the
      * upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent
      * immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this
@@ -1421,7 +1421,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
      * timeout bounds the total time waiting for both metadata fetch and buffer allocation (blocking in the
      * user-supplied serializers or partitioner is not counted against this timeout). For partitionsFor() this timeout
      * bounds the time spent waiting for metadata if it is unavailable. The transaction-related methods always block,
-     * but may timeout if the transaction coordinator could not be discovered or did not respond within the timeout.
+     * but may time out if the transaction coordinator could not be discovered or did not respond within the timeout.
      */
     public void setMaxBlockMs(Integer maxBlockMs) {
         this.maxBlockMs = maxBlockMs;
@@ -1641,7 +1641,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     }
 
     /**
-     * Deserializer class for key that implements the Deserializer interface.
+     * Deserializer class for the key that implements the Deserializer interface.
      */
     public void setKeyDeserializer(String keyDeserializer) {
         this.keyDeserializer = keyDeserializer;
@@ -1663,8 +1663,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     }
 
     /**
-     * Set if KafkaConsumer will read from beginning or end on startup: SeekPolicy.BEGINNING: read from beginning.
-     * SeekPolicy.END: read from end.
+     * Set if KafkaConsumer will read from the beginning or the end on startup: SeekPolicy.BEGINNING: read from the beginning.
+     * SeekPolicy.END: read from the end.
      */
     public void setSeekTo(SeekPolicy seekTo) {
         this.seekTo = seekTo;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
index f8b00c92923..098d8d10a36 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
@@ -47,7 +47,7 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
 
         // use bridge error handler to route with exception
         recordFetcher.getBridge().handleException(exception);
-        // skip this poison message and seek to next message
+        // skip this poison message and seek to the next message
         SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
 
         if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
index d5424003f00..46c61b92d9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
@@ -39,7 +39,7 @@ public class DiscardErrorStrategy implements PollExceptionStrategy {
     public void handle(long partitionLastOffset, Exception exception) {
         LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
 
-        // skip this poison message and seek to next message
+        // skip this poison message and seek to the next message
         SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index 06e6b5ac1e2..a9b966bb961 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -159,7 +159,7 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
 
         // Fourth step: We start again our route, since we have been committing the offsets from the first step,
-        // we will expect to consume from the latest committed offset i.e. from offset 5
+        // we will expect to consume from the latest committed offset (i.e., from offset 5)
         context.getRouteController().startRoute("foo");
 
         to.expectedMessageCount(3);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
index 388a8b22d17..6be326622ce 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
@@ -64,7 +64,7 @@ public class KafkaConsumerBatchSizeIT extends BaseEmbeddedKafkaTestSupport {
     public void kafkaMessagesIsConsumedByCamel() throws Exception {
         MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
 
-        // First 2 must not be committed since batch size is 3
+        // The first 2 must not be committed since batch size is 3
         to.expectedBodiesReceivedInAnyOrder("m1", "m2");
         for (int k = 1; k <= 2; k++) {
             String msg = "m" + k;