You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/10 13:10:43 UTC
[pulsar] branch master updated: [feat][elasticsearch] Add hashed id support (#15428)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 67224175bfb [feat][elasticsearch] Add hashed id support (#15428)
67224175bfb is described below
commit 67224175bfbdee52fb756e3cebf0ad59b3963321
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Jun 10 15:10:34 2022 +0200
[feat][elasticsearch] Add hashed id support (#15428)
* [feat][elasticsearch] Add hashed id support
* [feat][elasticsearch] Add hashed id support
* add config default test
* fix compile
---
.../io/elasticsearch/ElasticSearchConfig.java | 13 ++
.../pulsar/io/elasticsearch/ElasticSearchSink.java | 25 ++++
.../pulsar/io/elasticsearch/client/RestClient.java | 1 +
.../elastic/ElasticSearchJavaRestClient.java | 16 ++-
.../opensearch/OpenSearchHighLevelRestClient.java | 22 ++-
.../io/elasticsearch/ElasticSearchConfigTests.java | 1 +
.../io/elasticsearch/ElasticSearchSinkTests.java | 159 +++++++++++++++++++--
site2/docs/io-elasticsearch-sink.md | 1 +
8 files changed, 221 insertions(+), 17 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 3ae783ce41f..1dbfa0568ba 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -287,6 +287,13 @@ public class ElasticSearchConfig implements Serializable {
)
private boolean stripNonPrintableCharacters = true;
+ @FieldDoc(
+ defaultValue = "NONE",
+ help = "Hashing algorithm to use for the document id. This is useful in order to be compliant with "
+ + "the ElasticSearch _id hard limit of 512 bytes."
+ )
+ private IdHashingAlgorithm idHashingAlgorithm = IdHashingAlgorithm.NONE;
+
public enum MalformedDocAction {
IGNORE,
WARN,
@@ -306,6 +313,12 @@ public class ElasticSearchConfig implements Serializable {
OPENSEARCH
}
+ public enum IdHashingAlgorithm { // selects how the sink transforms the document id before indexing
+ NONE, // the id is left unhashed (the sink skips the hashing step)
+ SHA256, // Guava Hashing.sha256() over the UTF-8 id, then Base64-encoded without padding
+ SHA512 // Guava Hashing.sha512() over the UTF-8 id, then Base64-encoded without padding
+ }
+
public static ElasticSearchConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 810ff6afb92..263dcc4ea6e 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -27,9 +27,12 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -66,6 +69,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {
private ObjectMapper sortedObjectMapper;
private List<String> primaryFields = null;
private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");
+ private final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -218,6 +222,27 @@ public class ElasticSearchSink implements Sink<GenericObject> {
}
}
+ final ElasticSearchConfig.IdHashingAlgorithm idHashingAlgorithm =
+ elasticSearchConfig.getIdHashingAlgorithm();
+ if (id != null
+ && idHashingAlgorithm != null
+ && idHashingAlgorithm != ElasticSearchConfig.IdHashingAlgorithm.NONE) {
+ Hasher hasher;
+ switch (idHashingAlgorithm) {
+ case SHA256:
+ hasher = Hashing.sha256().newHasher();
+ break;
+ case SHA512:
+ hasher = Hashing.sha512().newHasher();
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported IdHashingAlgorithm: "
+ + idHashingAlgorithm);
+ }
+ hasher.putString(id, StandardCharsets.UTF_8);
+ id = base64Encoder.encodeToString(hasher.hash().asBytes());
+ }
+
if (log.isDebugEnabled()) {
SchemaType schemaType = null;
if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
index 61aa86213eb..561358a3e5c 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
@@ -101,6 +101,7 @@ public abstract class RestClient implements Closeable {
public abstract boolean deleteDocument(String index, String documentId) throws IOException;
public abstract long totalHits(String index) throws IOException;
+ public abstract long totalHits(String index, String query) throws IOException;
public abstract BulkProcessor getBulkProcessor();
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index d4c7a5056d3..2166df22731 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -157,19 +157,29 @@ public class ElasticSearchJavaRestClient extends RestClient {
}
}
- @VisibleForTesting
public SearchResponse<Map> search(String indexName) throws IOException {
+ return search(indexName, "*:*");
+ }
+
+ @VisibleForTesting
+ public SearchResponse<Map> search(String indexName, String query) throws IOException {
final RefreshRequest refreshRequest = new RefreshRequest.Builder().index(indexName).build();
client.indices().refresh(refreshRequest);
+ query = query.replace("/", "\\/");
return client.search(new SearchRequest.Builder().index(indexName)
- .q("*:*")
+ .q(query)
.build(), Map.class);
}
@Override
public long totalHits(String indexName) throws IOException {
- final SearchResponse<Map> searchResponse = search(indexName);
+ return totalHits(indexName, "*:*");
+ }
+
+ @Override
+ public long totalHits(String indexName, String query) throws IOException {
+ final SearchResponse<Map> searchResponse = search(indexName, query);
return searchResponse.hits().total().value();
}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 1a939774c4c..0a4629f24ad 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -56,6 +56,7 @@ import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -261,13 +262,32 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
return search(indexName).getHits().getTotalHits().value;
}
+ @Override
+ public long totalHits(String indexName, String query) throws IOException {
+ return search(indexName, query).getHits().getTotalHits().value;
+ }
+
@VisibleForTesting
public SearchResponse search(String indexName) throws IOException {
+ return search(indexName, "*:*");
+ }
+
+ @VisibleForTesting
+ public SearchResponse search(String indexName, String query) throws IOException { // refreshes the index, then runs "*:*" (match all) or "<field>:<text>" (match query)
client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+ QueryBuilder queryBuilder;
+ if ("*:*".equals(query)) {
+ queryBuilder = QueryBuilders.matchAllQuery();
+ } else {
+ final String[] split = query.split(":", 2); // limit 2: split on the first ':' only so a ':' inside the text is not truncated
+ final String name = split[0];
+ final String text = split[1];
+ queryBuilder = QueryBuilders.matchQuery(name, text);
+ }
return client.search(
new SearchRequest()
.indices(indexName)
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+ .source(new SearchSourceBuilder().query(queryBuilder)),
RequestOptions.DEFAULT);
}
@Override
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
index 8f076d9d8cc..463a712c34c 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
@@ -109,6 +109,7 @@ public class ElasticSearchConfigTests {
assertEquals(config.getSsl().getProtocols(), "TLSv1.2");
assertEquals(config.getCompatibilityMode(), ElasticSearchConfig.CompatibilityMode.AUTO);
+ assertEquals(config.getIdHashingAlgorithm(), ElasticSearchConfig.IdHashingAlgorithm.NONE);
}
@Test
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 5c0651f3720..b7da0f8c0fd 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -25,6 +25,8 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -38,12 +40,17 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -63,8 +70,10 @@ import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+
import static org.testng.Assert.assertNull;
public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
@@ -254,14 +263,19 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
verify(mockRecord, times(1)).ack();
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 1L);
+ String value = getHitIdAtIndex(index, 0);
+ assertEquals(value, "bob");
+ }
+
+ private String getHitIdAtIndex(String indexName, int index) throws IOException { // returns the _id of the hit at position `index` from a match-all search on indexName
if (elasticImageName.equals(ELASTICSEARCH_8)) {
final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient)
sink.getElasticsearchClient().getRestClient();
- assertEquals(restClient.search(index).hits().hits().get(0).id(), "bob");
+ return restClient.search(indexName).hits().hits().get(index).id();
} else {
final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient)
sink.getElasticsearchClient().getRestClient();
- assertEquals(restClient.search(index).getHits().getHits()[0].getId(), "bob");
+ return restClient.search(indexName).getHits().getHits()[index].getId(); // honor `index` like the Elasticsearch branch instead of always reading hit 0
}
}
@@ -275,15 +289,7 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
send(1);
verify(mockRecord, times(1)).ack();
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 1L);
- if (elasticImageName.equals(ELASTICSEARCH_8)) {
- final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient)
- sink.getElasticsearchClient().getRestClient();
- assertEquals(restClient.search(index).hits().hits().get(0).id(), "[\"bob\",\"boby\"]");
- } else {
- final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient)
- sink.getElasticsearchClient().getRestClient();
- assertEquals(restClient.search(index).getHits().getHits()[0].getId(), "[\"bob\",\"boby\"]");
- }
+ assertEquals("[\"bob\",\"boby\"]", getHitIdAtIndex(index, 0));
}
protected final void send(int numRecords) throws Exception {
@@ -450,4 +456,131 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
sink.close();
}
}
+
+ @DataProvider(name = "IdHashingAlgorithm")
+ public Object[] schemaType() {
+ return new Object[]{
+ ElasticSearchConfig.IdHashingAlgorithm.SHA256,
+ ElasticSearchConfig.IdHashingAlgorithm.SHA512
+ };
+ }
+
+ @Test(dataProvider = "IdHashingAlgorithm")
+ public final void testHashKey(ElasticSearchConfig.IdHashingAlgorithm algorithm) throws Exception {
+ when(mockRecord.getKey()).thenAnswer((Answer<Optional<String>>) invocation -> Optional.of( "record-key"));
+ final String indexName = "test-index" + UUID.randomUUID();
+ map.put("indexName", indexName);
+ map.put("keyIgnore", "false");
+ map.put("idHashingAlgorithm", algorithm.toString());
+ sink.open(map, mockSinkContext);
+ send(10);
+ verify(mockRecord, times(10)).ack();
+ final String expectedHashedValue = algorithm == ElasticSearchConfig.IdHashingAlgorithm.SHA256 ?
+ "gbY32PzSxtpjWeaWMROhFw3nleS3JbhNHgtM/Z7FjOk" :
+ "BBaia6VUM0KGsZVJGOyte6bDNXW0nfkV/zNntc737Nk7HwtDZjZmeyezYwEVQ5cfHIHDFR1e9yczUBwf8zw0rw";
+ final long count = sink.getElasticsearchClient().getRestClient()
+ .totalHits(indexName, "_id:" + expectedHashedValue);
+ assertEquals(count, 1);
+ }
+
+ @Test
+ public final void testKeyValueHashAndCanonicalOutput() throws Exception {
+ RecordSchemaBuilder keySchemaBuilder = SchemaBuilder.record("key");
+ keySchemaBuilder.field("keyFieldB").type(SchemaType.STRING).optional().defaultValue(null);
+ keySchemaBuilder.field("keyFieldA").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> keySchema = Schema.generic(keySchemaBuilder.build(SchemaType.JSON));
+
+ // more than 512 bytes to break the _id size limitation
+ final String keyFieldBValue = Stream.generate(() -> "keyB").limit(1000).collect(Collectors.joining());
+ GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
+ .set("keyFieldB", keyFieldBValue)
+ .set("keyFieldA", "keyA")
+ .build();
+
+ GenericRecord keyGenericRecord2 = keySchema.newRecordBuilder()
+ .set("keyFieldA", "keyA")
+ .set("keyFieldB", keyFieldBValue)
+ .build();
+ Record<GenericObject> genericObjectRecord = createKeyValueGenericRecordWithGenericKeySchema(
+ keySchema, keyGenericRecord);
+ Record<GenericObject> genericObjectRecord2 = createKeyValueGenericRecordWithGenericKeySchema(
+ keySchema, keyGenericRecord2);
+ final String indexName = "test-index" + UUID.randomUUID();
+ map.put("indexName", indexName);
+ map.put("keyIgnore", "false");
+ map.put("nullValueAction", ElasticSearchConfig.NullValueAction.DELETE.toString());
+ map.put("canonicalKeyFields", "true");
+ map.put("idHashingAlgorithm", ElasticSearchConfig.IdHashingAlgorithm.SHA512);
+ sink.open(map, mockSinkContext);
+ for (int idx = 0; idx < 10; idx++) {
+ sink.write(genericObjectRecord);
+ }
+ for (int idx = 0; idx < 10; idx++) {
+ sink.write(genericObjectRecord2);
+ }
+ final String expectedHashedValue = "7BmM3pkYIbhm8cPN5ePd/BeZ7lYZnKhzmiJ62k0PsGNNAQdk" +
+ "S+/te9+NKpdy31lEN0jT1MVrBjYIj4O08QsU1g";
+ long count = sink.getElasticsearchClient().getRestClient()
+ .totalHits(indexName, "_id:" + expectedHashedValue);
+ assertEquals(count, 1);
+
+
+ Record<GenericObject> genericObjectRecordDelete = createKeyValueGenericRecordWithGenericKeySchema(
+ keySchema, keyGenericRecord, true);
+ sink.write(genericObjectRecordDelete);
+ count = sink.getElasticsearchClient().getRestClient()
+ .totalHits(indexName, "_id:" + expectedHashedValue);
+ assertEquals(count, 0);
+
+ }
+
+ private Record<GenericObject> createKeyValueGenericRecordWithGenericKeySchema(
+ GenericSchema<GenericRecord> keySchema,
+ GenericRecord keyGenericRecord) {
+ return createKeyValueGenericRecordWithGenericKeySchema(
+ keySchema,
+ keyGenericRecord,
+ false
+ );
+ }
+
+ private Record<GenericObject> createKeyValueGenericRecordWithGenericKeySchema(
+ GenericSchema<GenericRecord> keySchema,
+ GenericRecord keyGenericRecord, boolean emptyValue) {
+
+ Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
+ Schema.KeyValue(keySchema, genericSchema, KeyValueEncodingType.INLINE);
+ KeyValue<GenericRecord, GenericRecord> keyValue = new KeyValue<>(keyGenericRecord,
+ emptyValue ? null: userProfile);
+ GenericObject genericObject = new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return keyValue;
+ }
+ };
+ Record<GenericObject> genericObjectRecord = new Record<GenericObject>() {
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.of("topic-name");
+ }
+
+ @Override
+ public Schema getSchema() {
+ return keyValueSchema;
+ }
+
+ @Override
+ public GenericObject getValue() {
+ return genericObject;
+ }
+ };
+ return genericObjectRecord;
+ }
+
+
}
diff --git a/site2/docs/io-elasticsearch-sink.md b/site2/docs/io-elasticsearch-sink.md
index 7decfc96552..1b4449daef7 100644
--- a/site2/docs/io-elasticsearch-sink.md
+++ b/site2/docs/io-elasticsearch-sink.md
@@ -88,6 +88,7 @@ The configuration of the Elasticsearch sink connector has the following properti
| `apiKey` | String| false | " " (empty string)|The apiKey used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. |
| `canonicalKeyFields` | Boolean | false | false | Whether to sort the key fields for JSON and Avro or not. If it is set to `true` and the record key schema is `JSON` or `AVRO`, the serialized object does not consider the order of properties. |
| `stripNonPrintableCharacters` | Boolean| false | true| Whether to remove all non-printable characters from the document or not. If it is set to true, all non-printable characters are removed from the document. |
+| `idHashingAlgorithm` | enum(NONE,SHA256,SHA512)| false | NONE|Hashing algorithm to use for the document id. This is useful in order to be compliant with the ElasticSearch _id hard limit of 512 bytes. |
### Definition of ElasticSearchSslConfig structure: