You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/12/02 11:32:59 UTC

[camel] branch main updated (a53b483 -> 5a7ddc9)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from a53b483  CAMEL-17274: camel-core - Properties component should allow reloading properties
     new 5ba7f79  Polished
     new 5a7ddc9  CAMEL-16521: camel-kafka - The OVERRIDE_TIMESTAMP header can now be string or other types that are automatic converted to long type.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/camel/component/kafka/KafkaProducer.java |  4 ++--
 .../kafka/consumer/support/KafkaConsumerResumeStrategy.java  |  1 -
 .../kafka/consumer/support/KafkaRecordProcessor.java         |  1 -
 .../consumer/support/OffsetKafkaConsumerResumeStrategy.java  |  1 -
 .../kafka/consumer/support/PartitionAssignmentListener.java  |  2 +-
 .../kafka/consumer/support/ResumeStrategyFactory.java        |  3 +--
 .../support/SeekPolicyKafkaConsumerResumeStrategy.java       |  1 -
 .../component/kafka/producer/support/DelegatingCallback.java |  1 -
 .../kafka/producer/support/KafkaProducerCallBack.java        |  1 -
 .../producer/support/KafkaProducerMetadataCallBack.java      |  1 -
 .../kafka/producer/support/KeyValueHolderIterator.java       | 12 +++++-------
 .../camel/component/kafka/producer/support/ProducerUtil.java |  2 --
 .../apache/camel/component/kafka/serde/KafkaSerdeHelper.java |  5 +----
 .../org/apache/camel/component/kafka/KafkaProducerTest.java  |  6 ++++--
 14 files changed, 14 insertions(+), 27 deletions(-)

[camel] 01/02: Polished

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5ba7f79b5fc1f317b33b254707f2d6bded501a41
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Dec 2 12:18:03 2021 +0100

    Polished
---
 .../kafka/consumer/support/KafkaConsumerResumeStrategy.java          | 1 -
 .../camel/component/kafka/consumer/support/KafkaRecordProcessor.java | 1 -
 .../kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java    | 1 -
 .../kafka/consumer/support/PartitionAssignmentListener.java          | 2 +-
 .../component/kafka/consumer/support/ResumeStrategyFactory.java      | 3 +--
 .../consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java      | 1 -
 .../camel/component/kafka/producer/support/DelegatingCallback.java   | 1 -
 .../component/kafka/producer/support/KafkaProducerCallBack.java      | 1 -
 .../kafka/producer/support/KafkaProducerMetadataCallBack.java        | 1 -
 .../component/kafka/producer/support/KeyValueHolderIterator.java     | 1 -
 .../apache/camel/component/kafka/producer/support/ProducerUtil.java  | 2 --
 .../org/apache/camel/component/kafka/serde/KafkaSerdeHelper.java     | 5 +----
 12 files changed, 3 insertions(+), 17 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
index 0deba22..b104bbf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.ResumeStrategy;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 320cbfe..d1f7f76 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import java.time.Duration;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
index 5e690fa..cfe855c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import java.util.Set;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 1ffbd2f..07d914a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -51,7 +51,7 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
         this.lastProcessedOffset = lastProcessedOffset;
         this.stopStateSupplier = stopStateSupplier;
 
-        resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration);
+        this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration);
     }
 
     @Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 09f304b..bd94ccc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
@@ -24,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class ResumeStrategyFactory {
+
     /**
      * A NO-OP resume strategy that does nothing (i.e.: no resume)
      */
@@ -42,7 +42,6 @@ public final class ResumeStrategyFactory {
     }
 
     public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConfiguration configuration) {
-
         if (configuration.getResumeStrategy() != null) {
             return configuration.getResumeStrategy();
         }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index 1eba302..573d0b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.kafka.clients.consumer.Consumer;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
index cc71667..f1a830e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.producer.support;
 
 import org.apache.kafka.clients.producer.Callback;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
index 32d0602..09b6a3d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.producer.support;
 
 import java.util.ArrayList;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
index a199881..37db490 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerMetadataCallBack.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.producer.support;
 
 import org.apache.kafka.clients.producer.Callback;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
index b3ee5e4..b46983a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.producer.support;
 
 import java.util.Iterator;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
index 5b0499b..ba15dc3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.producer.support;
 
 import java.nio.ByteBuffer;
@@ -30,7 +29,6 @@ import org.apache.kafka.common.utils.Bytes;
 public final class ProducerUtil {
 
     private ProducerUtil() {
-
     }
 
     public static Object tryConvertToSerializedType(Exchange exchange, Object object, String valueSerializer) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaSerdeHelper.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaSerdeHelper.java
index 645da01..55b026a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaSerdeHelper.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaSerdeHelper.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.serde;
 
 import java.math.BigInteger;
@@ -24,8 +23,8 @@ import org.apache.camel.support.ExpressionAdapter;
 import org.apache.camel.support.builder.ValueBuilder;
 
 public final class KafkaSerdeHelper {
-    private KafkaSerdeHelper() {
 
+    private KafkaSerdeHelper() {
     }
 
     public static ValueBuilder numericHeader(String name) {
@@ -33,9 +32,7 @@ public final class KafkaSerdeHelper {
             @Override
             public Object evaluate(Exchange exchange) {
                 byte[] id = exchange.getIn().getHeader(name, byte[].class);
-
                 BigInteger bi = new BigInteger(id);
-
                 return String.valueOf(bi.longValue());
             }
         });

[camel] 02/02: CAMEL-16521: camel-kafka - The OVERRIDE_TIMESTAMP header can now be string or other types that are automatic converted to long type.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5a7ddc9563afe86a0e3e1e18e9295f1a632cbcb7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Dec 2 12:32:19 2021 +0100

    CAMEL-16521: camel-kafka - The OVERRIDE_TIMESTAMP header can now be string or other types that are automatic converted to long type.
---
 .../java/org/apache/camel/component/kafka/KafkaProducer.java  |  4 ++--
 .../kafka/producer/support/KeyValueHolderIterator.java        | 11 +++++------
 .../org/apache/camel/component/kafka/KafkaProducerTest.java   |  6 ++++--
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 97a6e6d..017645e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -176,9 +176,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         Long timeStamp = null;
         Object overrideTimeStamp = message.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
-        if (overrideTimeStamp instanceof Long) {
+        if (overrideTimeStamp != null) {
+            timeStamp = exchange.getContext().getTypeConverter().convertTo(Long.class, exchange, overrideTimeStamp);
             LOG.debug("Using override TimeStamp: {}", overrideTimeStamp);
-            timeStamp = (Long) overrideTimeStamp;
         }
 
         // extracting headers which need to be propagated
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
index b46983a..8e37321 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java
@@ -107,13 +107,12 @@ public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, P
     }
 
     private Long getOverrideTimestamp(Message innerMessage) {
-        Object objTimestamp = innerMessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
-
-        if (hasValidTimestampHeader(objTimestamp)) {
-            return (Long) innerMessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+        Long timeStamp = null;
+        Object overrideTimeStamp = innerMessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+        if (overrideTimeStamp != null) {
+            timeStamp = exchange.getContext().getTypeConverter().convertTo(Long.class, exchange, overrideTimeStamp);
         }
-
-        return null;
+        return timeStamp;
     }
 
     private String getInnerTopic(Message innerMessage) {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 9495020..ec2750d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -222,8 +222,10 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
-        in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP,
-                LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+
+        // test using a string value instead of long
+        String time = "" + LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP, time);
 
         producer.process(exchange);