You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/05/25 11:58:27 UTC

[pulsar] branch master updated: [fix][elasticsearch-sink] Handle Avro collections native types (GenericData.Array and Utf8 map keys) (#15430)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b7e15103789 [fix][elasticsearch-sink] Handle Avro collections native types (GenericData.Array and Utf8 map keys) (#15430)
b7e15103789 is described below

commit b7e15103789acd36699bc4ab551677d3e204dfcb
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed May 25 13:58:21 2022 +0200

    [fix][elasticsearch-sink] Handle Avro collections native types (GenericData.Array and Utf8 map keys) (#15430)
---
 .../pulsar/io/elasticsearch/JsonConverter.java     | 18 ++++++++++++-----
 .../io/elasticsearch/JsonConverterTests.java       | 12 +++++++++++
 .../io/sinks/ElasticSearchSinkTester.java          | 23 ++++++++++++++++++++--
 3 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java
index a7d15914044..d15f5ccb32f 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/JsonConverter.java
@@ -35,9 +35,9 @@ import org.apache.avro.Conversion;
 import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
-
 /**
  * Convert an AVRO GenericRecord to a JsonNode.
  */
@@ -87,18 +87,26 @@ public class JsonConverter {
             case ARRAY: {
                 Schema elementSchema = schema.getElementType();
                 ArrayNode arrayNode = jsonNodeFactory.arrayNode();
-                for (Object elem : (Object[]) value) {
+                Object[] iterable;
+                if (value instanceof GenericData.Array) {
+                    iterable = ((GenericData.Array) value).toArray();
+                } else {
+                    iterable = (Object[]) value;
+                }
+                for (Object elem : iterable) {
                     JsonNode fieldValue = toJson(elementSchema, elem);
                     arrayNode.add(fieldValue);
                 }
                 return arrayNode;
             }
             case MAP: {
-                Map<String, Object> map = (Map<String, Object>) value;
+                Map<Object, Object> map = (Map<Object, Object>) value;
                 ObjectNode objectNode = jsonNodeFactory.objectNode();
-                for (Map.Entry<String, Object> entry : map.entrySet()) {
+                for (Map.Entry<Object, Object> entry : map.entrySet()) {
                     JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
-                    objectNode.set(entry.getKey(), jsonNode);
+                    // can be a String or org.apache.avro.util.Utf8
+                    final String entryKey = entry.getKey() == null ? null : entry.getKey().toString();
+                    objectNode.set(entryKey, jsonNode);
                 }
                 return objectNode;
             }
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java
index fb2740716a0..322891c7be0 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/JsonConverterTests.java
@@ -36,6 +36,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
 import java.util.TimeZone;
@@ -48,6 +49,8 @@ public class JsonConverterTests {
 
     @Test
     public void testAvroToJson() throws IOException {
+        Schema avroArraySchema = SchemaBuilder.array().items(SchemaBuilder.builder().stringType());
+        Schema mapUtf8Schema = SchemaBuilder.map().values(SchemaBuilder.builder().intType());
         Schema schema = SchemaBuilder.record("record").fields()
                 .name("n").type().longType().longDefault(10)
                 .name("l").type().longType().longDefault(10)
@@ -60,7 +63,9 @@ public class JsonConverterTests {
                 .name("fi").type().fixed("fi").size(3).fixedDefault(new byte[]{1,2,3})
                 .name("en").type().enumeration("en").symbols("a","b","c").enumDefault("b")
                 .name("array").type().optional().array().items(SchemaBuilder.builder().stringType())
+                .name("arrayavro").type().optional().array().items(SchemaBuilder.builder().stringType())
                 .name("map").type().optional().map().values(SchemaBuilder.builder().intType())
+                .name("maputf8").type().optional().map().values(SchemaBuilder.builder().intType())
                 .endRecord();
         GenericRecord genericRecord = new GenericData.Record(schema);
         genericRecord.put("n", null);
@@ -74,7 +79,9 @@ public class JsonConverterTests {
         genericRecord.put("fi", GenericData.get().createFixed(null, new byte[]{'a','b','c'}, schema.getField("fi").schema()));
         genericRecord.put("en", GenericData.get().createEnum("b", schema.getField("en").schema()));
         genericRecord.put("array", new String[] {"toto"});
+        genericRecord.put("arrayavro", new GenericData.Array<>(avroArraySchema, Arrays.asList("toto")));
         genericRecord.put("map", ImmutableMap.of("a",10));
+        genericRecord.put("maputf8", ImmutableMap.of(new org.apache.avro.util.Utf8("a"),10));
         JsonNode jsonNode = JsonConverter.toJson(genericRecord);
         assertEquals(jsonNode.get("n"), NullNode.getInstance());
         assertEquals(jsonNode.get("l").asLong(), 1L);
@@ -88,9 +95,14 @@ public class JsonConverterTests {
         assertEquals(jsonNode.get("s").asText(), "toto");
         assertTrue(jsonNode.get("array").isArray());
         assertEquals(jsonNode.get("array").iterator().next().asText(), "toto");
+        assertTrue(jsonNode.get("arrayavro").isArray());
+        assertEquals(jsonNode.get("arrayavro").iterator().next().asText(), "toto");
         assertTrue(jsonNode.get("map").isObject());
         assertEquals(jsonNode.get("map").elements().next().asText(), "10");
         assertEquals(jsonNode.get("map").get("a").numberValue(), 10);
+        assertTrue(jsonNode.get("maputf8").isObject());
+        assertEquals(jsonNode.get("maputf8").elements().next().asText(), "10");
+        assertEquals(jsonNode.get("maputf8").get("a").numberValue(), 10);
     }
 
     @Test
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 9b37a1989e3..07c5c6e9b12 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -20,9 +20,13 @@ package org.apache.pulsar.tests.integration.io.sinks;
 
 import static org.testng.Assert.assertTrue;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch.core.SearchRequest;
@@ -30,6 +34,7 @@ import co.elastic.clients.elasticsearch.core.SearchResponse;
 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
 import co.elastic.clients.transport.ElasticsearchTransport;
 import co.elastic.clients.transport.rest_client.RestClientTransport;
+import com.google.common.collect.ImmutableMap;
 import lombok.AllArgsConstructor;
 import lombok.Cleanup;
 import lombok.Data;
@@ -60,6 +65,9 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
     public static final class SimplePojo {
         private String field1;
         private String field2;
+        private List<Integer> list1;
+        private Set<Long> set1;
+        private Map<String, String> map1;
     }
 
     /**
@@ -128,9 +136,20 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
             for (int i = 0; i < numMessages; i++) {
                 String key = "key-" + i;
                 kvs.put(key, key);
+                final SimplePojo keyPojo = new SimplePojo(
+                        "f1_" + i,
+                        "f2_" + i,
+                        Arrays.asList(i, i +1),
+                        new HashSet<>(Arrays.asList((long) i)),
+                        ImmutableMap.of("map1_k_" + i, "map1_kv_" + i));
+                final SimplePojo valuePojo = new SimplePojo(
+                        "f1_" + i,
+                        "f2_" + i,
+                        Arrays.asList(i, i +1),
+                        new HashSet<>(Arrays.asList((long) i)),
+                        ImmutableMap.of("map1_v_" + i, "map1_vv_" + i));
                 producer.newMessage()
-                        .value(new KeyValue<>(new SimplePojo("f1_" + i, "f2_" + i),
-                                new SimplePojo("v1_" + i, "v2_" + i)))
+                        .value(new KeyValue<>(keyPojo, valuePojo))
                         .send();
             }