You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/08 12:51:58 UTC
[camel-kafka-connector] branch master updated: core: cast from Byte[] to byte[] fails because of incompatible types
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a57564e core: cast from Byte[] to byte[] fails because of incompatible types
a57564e is described below
commit a57564ea589fd345a73d44d5f483d4901d1a09a2
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 13:59:59 2020 +0200
core: cast from Byte[] to byte[] fails because of incompatible types
---
.../camel/kafkaconnector/CamelSourceTask.java | 9 ++++++-
.../camel/kafkaconnector/CamelSourceTaskTest.java | 28 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 96a7ad4..724ddd7 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -229,7 +229,14 @@ public class CamelSourceTask extends SourceTask {
} else if (value instanceof Byte) {
record.headers().addByte(keyCamelHeader, (byte)value);
} else if (value instanceof Byte[]) {
- record.headers().addBytes(keyCamelHeader, (byte[])value);
+ final Byte[] array = (Byte[])value;
+ final byte[] bytes = new byte[array.length];
+
+ for (int i = 0; i < array.length; i++) {
+ bytes[i] = array[i];
+ }
+
+ record.headers().addBytes(keyCamelHeader, bytes);
} else if (value instanceof Time) {
record.headers().addTime(keyCamelHeader, (Time)value);
} else if (value instanceof Timestamp) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 4a7b5b1..25088cf 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -301,6 +301,34 @@ public class CamelSourceTaskTest {
}
@Test
+ public void testSourceByteArrayHeader() {
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
+ CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
+ ));
+
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "test", "byteArray", new Byte[] {
+ 1, 2
+ });
+
+ try {
+ List<SourceRecord> results = sourceTask.poll();
+ assertThat(results).hasSize(1);
+
+ Header header = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + "byteArray").next();
+
+ assertThat(header.schema().type()).isEqualTo(Schema.Type.BYTES);
+ assertThat(header.value()).isInstanceOfSatisfying(byte[].class, b -> {
+ assertThat(b).contains(1, 2);
+ });
+ } finally {
+ sourceTask.stop();
+ }
+ }
+
+ @Test
public void testSourcePollingWithAggregationBySize() {
final int size = 10;
final int chunkSize = 5;