You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/08 12:51:58 UTC

[camel-kafka-connector] branch master updated: core: cast from Byte[] to byte[] fails because of incompatible types

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a57564e  core: cast from Byte[] to byte[] fails because of incompatible types
a57564e is described below

commit a57564ea589fd345a73d44d5f483d4901d1a09a2
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 13:59:59 2020 +0200

    core: cast from Byte[] to byte[] fails because of incompatible types
---
 .../camel/kafkaconnector/CamelSourceTask.java      |  9 ++++++-
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 28 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 96a7ad4..724ddd7 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -229,7 +229,14 @@ public class CamelSourceTask extends SourceTask {
             } else if (value instanceof Byte) {
                 record.headers().addByte(keyCamelHeader, (byte)value);
             } else if (value instanceof Byte[]) {
-                record.headers().addBytes(keyCamelHeader, (byte[])value);
+                final Byte[] array = (Byte[])value;
+                final byte[] bytes = new byte[array.length];
+
+                for (int i = 0; i < array.length; i++) {
+                    bytes[i] = array[i];
+                }
+
+                record.headers().addBytes(keyCamelHeader, bytes);
             } else if (value instanceof Time) {
                 record.headers().addTime(keyCamelHeader, (Time)value);
             } else if (value instanceof Timestamp) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 4a7b5b1..25088cf 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -301,6 +301,34 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourceByteArrayHeader() {
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
+            CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
+        ));
+
+        sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "test", "byteArray", new Byte[] {
+            1, 2
+        });
+
+        try {
+            List<SourceRecord> results = sourceTask.poll();
+            assertThat(results).hasSize(1);
+
+            Header header = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + "byteArray").next();
+
+            assertThat(header.schema().type()).isEqualTo(Schema.Type.BYTES);
+            assertThat(header.value()).isInstanceOfSatisfying(byte[].class, b -> {
+                assertThat(b).contains(1, 2);
+            });
+        } finally {
+            sourceTask.stop();
+        }
+    }
+
+    @Test
     public void testSourcePollingWithAggregationBySize() {
         final int size = 10;
         final int chunkSize = 5;