You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/10 13:10:43 UTC

[pulsar] branch master updated: [feat][elasticsearch] Add hashed id support (#15428)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 67224175bfb [feat][elasticsearch] Add hashed id support (#15428)
67224175bfb is described below

commit 67224175bfbdee52fb756e3cebf0ad59b3963321
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Jun 10 15:10:34 2022 +0200

    [feat][elasticsearch] Add hashed id support (#15428)
    
    * [feat][elasticsearch] Add hashed id support
    
    * [feat][elasticsearch] Add hashed id support
    
    * add config default test
    
    * fix compile
---
 .../io/elasticsearch/ElasticSearchConfig.java      |  13 ++
 .../pulsar/io/elasticsearch/ElasticSearchSink.java |  25 ++++
 .../pulsar/io/elasticsearch/client/RestClient.java |   1 +
 .../elastic/ElasticSearchJavaRestClient.java       |  16 ++-
 .../opensearch/OpenSearchHighLevelRestClient.java  |  22 ++-
 .../io/elasticsearch/ElasticSearchConfigTests.java |   1 +
 .../io/elasticsearch/ElasticSearchSinkTests.java   | 159 +++++++++++++++++++--
 site2/docs/io-elasticsearch-sink.md                |   1 +
 8 files changed, 221 insertions(+), 17 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 3ae783ce41f..1dbfa0568ba 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -287,6 +287,13 @@ public class ElasticSearchConfig implements Serializable {
     )
     private boolean stripNonPrintableCharacters = true;
 
+    @FieldDoc(
+            defaultValue = "NONE",
+            help = "Hashing algorithm to use for the document id. This is useful in order to be compliant with "
+                    + "the ElasticSearch _id hard limit of 512 bytes."
+    )
+    private IdHashingAlgorithm idHashingAlgorithm = IdHashingAlgorithm.NONE; // NONE: ids are written unhashed
+
     public enum MalformedDocAction {
         IGNORE,
         WARN,
@@ -306,6 +313,12 @@ public class ElasticSearchConfig implements Serializable {
         OPENSEARCH
     }
 
+    public enum IdHashingAlgorithm {
+        NONE, // keep the computed document id as-is
+        SHA256, // replace the id with the unpadded Base64 SHA-256 digest of its UTF-8 bytes
+        SHA512 // replace the id with the unpadded Base64 SHA-512 digest of its UTF-8 bytes
+    }
+
     public static ElasticSearchConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 810ff6afb92..263dcc4ea6e 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -27,9 +27,12 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +69,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {
     private ObjectMapper sortedObjectMapper;
     private List<String> primaryFields = null;
     private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");
+    private final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding(); // encodes hashed ids
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -218,6 +222,25 @@ public class ElasticSearchSink implements Sink<GenericObject> {
                 }
             }
 
+            // Optionally replace the id by the unpadded Base64 encoding of its digest, so that long
+            // keys still fit the ElasticSearch _id size limit.
+            final ElasticSearchConfig.IdHashingAlgorithm idHashingAlgorithm =
+                    elasticSearchConfig.getIdHashingAlgorithm();
+            if (id != null && idHashingAlgorithm != null
+                    && idHashingAlgorithm != ElasticSearchConfig.IdHashingAlgorithm.NONE) {
+                final Hasher idHasher;
+                if (idHashingAlgorithm == ElasticSearchConfig.IdHashingAlgorithm.SHA256) {
+                    idHasher = Hashing.sha256().newHasher();
+                } else if (idHashingAlgorithm == ElasticSearchConfig.IdHashingAlgorithm.SHA512) {
+                    idHasher = Hashing.sha512().newHasher();
+                } else {
+                    throw new UnsupportedOperationException("Unsupported IdHashingAlgorithm: "
+                            + idHashingAlgorithm);
+                }
+                final byte[] digest = idHasher.putString(id, StandardCharsets.UTF_8).hash().asBytes();
+                id = base64Encoder.encodeToString(digest);
+            }
+
             if (log.isDebugEnabled()) {
                 SchemaType schemaType = null;
                 if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
index 61aa86213eb..561358a3e5c 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
@@ -101,6 +101,7 @@ public abstract class RestClient implements Closeable {
     public abstract boolean deleteDocument(String index, String documentId) throws IOException;
 
     public abstract long totalHits(String index) throws IOException;
+    public abstract long totalHits(String index, String query) throws IOException; // query: "*:*" or "field:value"
 
     public abstract BulkProcessor getBulkProcessor();
 
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index d4c7a5056d3..2166df22731 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -157,19 +157,34 @@ public class ElasticSearchJavaRestClient extends RestClient {
         }
     }
 
-    @VisibleForTesting
     public SearchResponse<Map> search(String indexName) throws IOException {
+        return search(indexName, "*:*");
+    }
+
+    /**
+     * Refreshes {@code indexName} and runs {@code query} as a query-string ({@code q}) search on it.
+     * Every '/' is escaped first: the query-string syntax treats '/' as a regular-expression delimiter,
+     * and Base64-encoded hashed document ids can contain it.
+     */
+    @VisibleForTesting
+    public SearchResponse<Map> search(String indexName, String query) throws IOException {
         final RefreshRequest refreshRequest = new RefreshRequest.Builder().index(indexName).build();
         client.indices().refresh(refreshRequest);
 
+        final String escapedQuery = query.replace("/", "\\/");
         return client.search(new SearchRequest.Builder().index(indexName)
-                .q("*:*")
+                .q(escapedQuery)
                 .build(), Map.class);
     }
 
     @Override
     public long totalHits(String indexName) throws IOException {
-        final SearchResponse<Map> searchResponse = search(indexName);
+        return totalHits(indexName, "*:*");
+    }
+
+    @Override
+    public long totalHits(String indexName, String query) throws IOException {
+        final SearchResponse<Map> searchResponse = search(indexName, query);
         return searchResponse.hits().total().value();
     }
 
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 1a939774c4c..0a4629f24ad 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -56,6 +56,7 @@ import org.opensearch.common.unit.ByteSizeUnit;
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.search.builder.SearchSourceBuilder;
 
@@ -261,13 +262,41 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
         return search(indexName).getHits().getTotalHits().value;
     }
 
+    @Override
+    public long totalHits(String indexName, String query) throws IOException {
+        return search(indexName, query).getHits().getTotalHits().value;
+    }
+
     @VisibleForTesting
     public SearchResponse search(String indexName) throws IOException {
+        return search(indexName, "*:*");
+    }
+
+    /**
+     * Refreshes {@code indexName} and searches it. Supported queries are {@code "*:*"} (match all)
+     * and {@code "<field>:<value>"}, which runs a match query of {@code <value>} against {@code <field>}.
+     *
+     * @throws IllegalArgumentException if {@code query} is in neither supported form
+     */
+    @VisibleForTesting
+    public SearchResponse search(String indexName, String query) throws IOException {
+        final QueryBuilder queryBuilder;
+        if ("*:*".equals(query)) {
+            queryBuilder = QueryBuilders.matchAllQuery();
+        } else {
+            // Only the first ':' separates the field name from the value, which may itself contain ':'.
+            final String[] fieldAndValue = query.split(":", 2);
+            if (fieldAndValue.length != 2) {
+                throw new IllegalArgumentException(
+                        "Unsupported query, expected '*:*' or '<field>:<value>': " + query);
+            }
+            queryBuilder = QueryBuilders.matchQuery(fieldAndValue[0], fieldAndValue[1]);
+        }
         client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
         return client.search(
                 new SearchRequest()
                         .indices(indexName)
-                        .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+                        .source(new SearchSourceBuilder().query(queryBuilder)),
                 RequestOptions.DEFAULT);
     }
     @Override
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
index 8f076d9d8cc..463a712c34c 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
@@ -109,6 +109,7 @@ public class ElasticSearchConfigTests {
         assertEquals(config.getSsl().getProtocols(), "TLSv1.2");
 
         assertEquals(config.getCompatibilityMode(), ElasticSearchConfig.CompatibilityMode.AUTO);
+        assertEquals(config.getIdHashingAlgorithm(), ElasticSearchConfig.IdHashingAlgorithm.NONE); // opt-in
     }
 
     @Test
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 5c0651f3720..b7da0f8c0fd 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -25,6 +25,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -38,12 +40,17 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -63,8 +70,10 @@ import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+
 import static org.testng.Assert.assertNull;
 
 public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
@@ -254,14 +263,19 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
         verify(mockRecord, times(1)).ack();
         assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 1L);
 
+        assertEquals(getHitIdAtIndex(index, 0), "bob");
+    }
+
+    /** Returns the _id of the hit at {@code hitPosition} of a match-all search on {@code indexName}. */
+    private String getHitIdAtIndex(String indexName, int hitPosition) throws IOException {
         if (elasticImageName.equals(ELASTICSEARCH_8)) {
             final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient)
                     sink.getElasticsearchClient().getRestClient();
-            assertEquals(restClient.search(index).hits().hits().get(0).id(), "bob");
+            return restClient.search(indexName).hits().hits().get(hitPosition).id();
         } else {
             final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient)
                     sink.getElasticsearchClient().getRestClient();
-            assertEquals(restClient.search(index).getHits().getHits()[0].getId(), "bob");
+            return restClient.search(indexName).getHits().getHits()[hitPosition].getId();
         }
     }
 
@@ -275,15 +289,7 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
         send(1);
         verify(mockRecord, times(1)).ack();
         assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 1L);
-        if (elasticImageName.equals(ELASTICSEARCH_8)) {
-            final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient)
-                    sink.getElasticsearchClient().getRestClient();
-            assertEquals(restClient.search(index).hits().hits().get(0).id(), "[\"bob\",\"boby\"]");
-        } else {
-            final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient)
-                    sink.getElasticsearchClient().getRestClient();
-            assertEquals(restClient.search(index).getHits().getHits()[0].getId(), "[\"bob\",\"boby\"]");
-        }
+        assertEquals(getHitIdAtIndex(index, 0), "[\"bob\",\"boby\"]");
     }
 
     protected final void send(int numRecords) throws Exception {
@@ -450,4 +456,147 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
             sink.close();
         }
     }
+
+    /**
+     * Supplies every {@link ElasticSearchConfig.IdHashingAlgorithm} value that actually hashes the id.
+     */
+    @DataProvider(name = "IdHashingAlgorithm")
+    public Object[] idHashingAlgorithms() {
+        return new Object[]{
+                ElasticSearchConfig.IdHashingAlgorithm.SHA256,
+                ElasticSearchConfig.IdHashingAlgorithm.SHA512
+        };
+    }
+
+    /**
+     * Writes ten records sharing the key "record-key" and checks that exactly one document is stored
+     * under the unpadded Base64 digest of that key.
+     */
+    @Test(dataProvider = "IdHashingAlgorithm")
+    public final void testHashKey(ElasticSearchConfig.IdHashingAlgorithm algorithm) throws Exception {
+        when(mockRecord.getKey()).thenAnswer((Answer<Optional<String>>) invocation -> Optional.of("record-key"));
+        final String indexName = "test-index" + UUID.randomUUID();
+        map.put("indexName", indexName);
+        map.put("keyIgnore", "false");
+        map.put("idHashingAlgorithm", algorithm.toString());
+        sink.open(map, mockSinkContext);
+        send(10);
+        verify(mockRecord, times(10)).ack();
+        final String expectedHashedValue = algorithm == ElasticSearchConfig.IdHashingAlgorithm.SHA256
+                ? "gbY32PzSxtpjWeaWMROhFw3nleS3JbhNHgtM/Z7FjOk"
+                : "BBaia6VUM0KGsZVJGOyte6bDNXW0nfkV/zNntc737Nk7HwtDZjZmeyezYwEVQ5cfHIHDFR1e9yczUBwf8zw0rw";
+        final long count = sink.getElasticsearchClient().getRestClient()
+                .totalHits(indexName, "_id:" + expectedHashedValue);
+        assertEquals(count, 1L);
+    }
+
+    /**
+     * Writes, ten times each, two records whose generic keys were built by setting the same fields in
+     * different orders. With canonical key fields and SHA-512 id hashing enabled, both must map to a single
+     * document id; a following record with a null value must delete that document.
+     */
+    @Test
+    public final void testKeyValueHashAndCanonicalOutput() throws Exception {
+        RecordSchemaBuilder keySchemaBuilder = SchemaBuilder.record("key");
+        keySchemaBuilder.field("keyFieldB").type(SchemaType.STRING).optional().defaultValue(null);
+        keySchemaBuilder.field("keyFieldA").type(SchemaType.STRING).optional().defaultValue(null);
+        GenericSchema<GenericRecord> keySchema = Schema.generic(keySchemaBuilder.build(SchemaType.JSON));
+
+        // more than 512 bytes to break the _id size limitation
+        final String keyFieldBValue = Stream.generate(() -> "keyB").limit(1000).collect(Collectors.joining());
+        GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
+                .set("keyFieldB", keyFieldBValue)
+                .set("keyFieldA", "keyA")
+                .build();
+
+        GenericRecord keyGenericRecord2 = keySchema.newRecordBuilder()
+                .set("keyFieldA", "keyA")
+                .set("keyFieldB", keyFieldBValue)
+                .build();
+        Record<GenericObject> genericObjectRecord = createKeyValueGenericRecordWithGenericKeySchema(
+                keySchema, keyGenericRecord);
+        Record<GenericObject> genericObjectRecord2 = createKeyValueGenericRecordWithGenericKeySchema(
+                keySchema, keyGenericRecord2);
+        final String indexName = "test-index" + UUID.randomUUID();
+        map.put("indexName", indexName);
+        map.put("keyIgnore", "false");
+        map.put("nullValueAction", ElasticSearchConfig.NullValueAction.DELETE.toString());
+        map.put("canonicalKeyFields", "true");
+        map.put("idHashingAlgorithm", ElasticSearchConfig.IdHashingAlgorithm.SHA512.toString());
+        sink.open(map, mockSinkContext);
+        for (int idx = 0; idx < 10; idx++) {
+            sink.write(genericObjectRecord);
+        }
+        for (int idx = 0; idx < 10; idx++) {
+            sink.write(genericObjectRecord2);
+        }
+        final String expectedHashedValue = "7BmM3pkYIbhm8cPN5ePd/BeZ7lYZnKhzmiJ62k0PsGNNAQdk"
+                + "S+/te9+NKpdy31lEN0jT1MVrBjYIj4O08QsU1g";
+        long count = sink.getElasticsearchClient().getRestClient()
+                .totalHits(indexName, "_id:" + expectedHashedValue);
+        assertEquals(count, 1L);
+
+        Record<GenericObject> genericObjectRecordDelete = createKeyValueGenericRecordWithGenericKeySchema(
+                keySchema, keyGenericRecord, true);
+        sink.write(genericObjectRecordDelete);
+        count = sink.getElasticsearchClient().getRestClient()
+                .totalHits(indexName, "_id:" + expectedHashedValue);
+        assertEquals(count, 0L);
+    }
+
+    /**
+     * Same as {@link #createKeyValueGenericRecordWithGenericKeySchema(GenericSchema, GenericRecord, boolean)}
+     * with a non-null value.
+     */
+    private Record<GenericObject> createKeyValueGenericRecordWithGenericKeySchema(
+            GenericSchema<GenericRecord> keySchema,
+            GenericRecord keyGenericRecord) {
+        return createKeyValueGenericRecordWithGenericKeySchema(
+                keySchema,
+                keyGenericRecord,
+                false
+        );
+    }
+
+    /**
+     * Builds a KEY_VALUE record (INLINE encoding) on topic "topic-name" whose key is
+     * {@code keyGenericRecord} and whose value is {@code userProfile}, or {@code null} when
+     * {@code emptyValue} is true.
+     */
+    private Record<GenericObject> createKeyValueGenericRecordWithGenericKeySchema(
+            GenericSchema<GenericRecord> keySchema,
+            GenericRecord keyGenericRecord, boolean emptyValue) {
+        Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
+                Schema.KeyValue(keySchema, genericSchema, KeyValueEncodingType.INLINE);
+        KeyValue<GenericRecord, GenericRecord> keyValue = new KeyValue<>(keyGenericRecord,
+                emptyValue ? null : userProfile);
+        GenericObject genericObject = new GenericObject() {
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return keyValue;
+            }
+        };
+        Record<GenericObject> genericObjectRecord = new Record<GenericObject>() {
+            @Override
+            public Optional<String> getTopicName() {
+                return Optional.of("topic-name");
+            }
+
+            @Override
+            public Schema getSchema() {
+                return keyValueSchema;
+            }
+
+            @Override
+            public GenericObject getValue() {
+                return genericObject;
+            }
+        };
+        return genericObjectRecord;
+    }
 }
diff --git a/site2/docs/io-elasticsearch-sink.md b/site2/docs/io-elasticsearch-sink.md
index 7decfc96552..1b4449daef7 100644
--- a/site2/docs/io-elasticsearch-sink.md
+++ b/site2/docs/io-elasticsearch-sink.md
@@ -88,6 +88,7 @@ The configuration of the Elasticsearch sink connector has the following properti
 | `apiKey` | String| false | " " (empty string)|The apiKey used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. |
 | `canonicalKeyFields` | Boolean | false | false | Whether to sort the key fields for JSON and Avro or not. If it is set to `true` and the record key schema is `JSON` or `AVRO`, the serialized object does not consider the order of properties. |
 | `stripNonPrintableCharacters` | Boolean| false | true| Whether to remove all non-printable characters from the document or not. If it is set to true, all non-printable characters are removed from the document. |
+| `idHashingAlgorithm` | enum(NONE,SHA256,SHA512)| false | NONE|Hashing algorithm to use for the document id. When set, the id is replaced by the unpadded Base64 encoding of its digest, which keeps it within the ElasticSearch `_id` hard limit of 512 bytes. |
 
 ### Definition of ElasticSearchSslConfig structure: