You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/05/25 11:58:27 UTC
[pulsar] branch master updated: [fix][elasticsearch-sink] Handle Avro collections native types (GenericData.Array and Utf8 map keys) (#15430)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b7e15103789 [fix][elasticsearch-sink] Handle Avro collections native types (GenericData.Array and Utf8 map keys) (#15430)
b7e15103789 is described below
commit b7e15103789acd36699bc4ab551677d3e204dfcb
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed May 25 13:58:21 2022 +0200
[fix][elasticsearch-sink] Handle Avro collections native types (GenericData.Array and Utf8 map keys) (#15430)
---
.../pulsar/io/elasticsearch/JsonConverter.java | 18 ++++++++++++-----
.../io/elasticsearch/JsonConverterTests.java | 12 +++++++++++
.../io/sinks/ElasticSearchSinkTester.java | 23 ++++++++++++++++++++--
3 files changed, 46 insertions(+), 7 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java
index a7d15914044..d15f5ccb32f 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java
@@ -35,9 +35,9 @@ import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
-
/**
* Convert an AVRO GenericRecord to a JsonNode.
*/
@@ -87,18 +87,26 @@ public class JsonConverter {
case ARRAY: {
Schema elementSchema = schema.getElementType();
ArrayNode arrayNode = jsonNodeFactory.arrayNode();
- for (Object elem : (Object[]) value) {
+ Object[] iterable;
+ if (value instanceof GenericData.Array) {
+ iterable = ((GenericData.Array) value).toArray();
+ } else {
+ iterable = (Object[]) value;
+ }
+ for (Object elem : iterable) {
JsonNode fieldValue = toJson(elementSchema, elem);
arrayNode.add(fieldValue);
}
return arrayNode;
}
case MAP: {
- Map<String, Object> map = (Map<String, Object>) value;
+ Map<Object, Object> map = (Map<Object, Object>) value;
ObjectNode objectNode = jsonNodeFactory.objectNode();
- for (Map.Entry<String, Object> entry : map.entrySet()) {
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
- objectNode.set(entry.getKey(), jsonNode);
+ // can be a String or org.apache.avro.util.Utf8
+ final String entryKey = entry.getKey() == null ? null : entry.getKey().toString();
+ objectNode.set(entryKey, jsonNode);
}
return objectNode;
}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java
index fb2740716a0..322891c7be0 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java
@@ -36,6 +36,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;
@@ -48,6 +49,8 @@ public class JsonConverterTests {
@Test
public void testAvroToJson() throws IOException {
+ Schema avroArraySchema = SchemaBuilder.array().items(SchemaBuilder.builder().stringType());
+ Schema mapUtf8Schema = SchemaBuilder.map().values(SchemaBuilder.builder().intType());
Schema schema = SchemaBuilder.record("record").fields()
.name("n").type().longType().longDefault(10)
.name("l").type().longType().longDefault(10)
@@ -60,7 +63,9 @@ public class JsonConverterTests {
.name("fi").type().fixed("fi").size(3).fixedDefault(new byte[]{1,2,3})
.name("en").type().enumeration("en").symbols("a","b","c").enumDefault("b")
.name("array").type().optional().array().items(SchemaBuilder.builder().stringType())
+ .name("arrayavro").type().optional().array().items(SchemaBuilder.builder().stringType())
.name("map").type().optional().map().values(SchemaBuilder.builder().intType())
+ .name("maputf8").type().optional().map().values(SchemaBuilder.builder().intType())
.endRecord();
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("n", null);
@@ -74,7 +79,9 @@ public class JsonConverterTests {
genericRecord.put("fi", GenericData.get().createFixed(null, new byte[]{'a','b','c'}, schema.getField("fi").schema()));
genericRecord.put("en", GenericData.get().createEnum("b", schema.getField("en").schema()));
genericRecord.put("array", new String[] {"toto"});
+ genericRecord.put("arrayavro", new GenericData.Array<>(avroArraySchema, Arrays.asList("toto")));
genericRecord.put("map", ImmutableMap.of("a",10));
+ genericRecord.put("maputf8", ImmutableMap.of(new org.apache.avro.util.Utf8("a"),10));
JsonNode jsonNode = JsonConverter.toJson(genericRecord);
assertEquals(jsonNode.get("n"), NullNode.getInstance());
assertEquals(jsonNode.get("l").asLong(), 1L);
@@ -88,9 +95,14 @@ public class JsonConverterTests {
assertEquals(jsonNode.get("s").asText(), "toto");
assertTrue(jsonNode.get("array").isArray());
assertEquals(jsonNode.get("array").iterator().next().asText(), "toto");
+ assertTrue(jsonNode.get("arrayavro").isArray());
+ assertEquals(jsonNode.get("arrayavro").iterator().next().asText(), "toto");
assertTrue(jsonNode.get("map").isObject());
assertEquals(jsonNode.get("map").elements().next().asText(), "10");
assertEquals(jsonNode.get("map").get("a").numberValue(), 10);
+ assertTrue(jsonNode.get("maputf8").isObject());
+ assertEquals(jsonNode.get("maputf8").elements().next().asText(), "10");
+ assertEquals(jsonNode.get("maputf8").get("a").numberValue(), 10);
}
@Test
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 9b37a1989e3..07c5c6e9b12 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -20,9 +20,13 @@ package org.apache.pulsar.tests.integration.io.sinks;
import static org.testng.Assert.assertTrue;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
@@ -30,6 +34,7 @@ import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
+import com.google.common.collect.ImmutableMap;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
@@ -60,6 +65,9 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
public static final class SimplePojo {
private String field1;
private String field2;
+ private List<Integer> list1;
+ private Set<Long> set1;
+ private Map<String, String> map1;
}
/**
@@ -128,9 +136,20 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
kvs.put(key, key);
+ final SimplePojo keyPojo = new SimplePojo(
+ "f1_" + i,
+ "f2_" + i,
+ Arrays.asList(i, i +1),
+ new HashSet<>(Arrays.asList((long) i)),
+ ImmutableMap.of("map1_k_" + i, "map1_kv_" + i));
+ final SimplePojo valuePojo = new SimplePojo(
+ "f1_" + i,
+ "f2_" + i,
+ Arrays.asList(i, i +1),
+ new HashSet<>(Arrays.asList((long) i)),
+ ImmutableMap.of("map1_v_" + i, "map1_vv_" + i));
producer.newMessage()
- .value(new KeyValue<>(new SimplePojo("f1_" + i, "f2_" + i),
- new SimplePojo("v1_" + i, "v2_" + i)))
+ .value(new KeyValue<>(keyPojo, valuePojo))
.send();
}