You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/08 12:46:51 UTC
[pulsar] branch master updated: [feat][elasticsearch-sink] Option to strip out non printable characters (#15431)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3aef61ac53f [feat][elasticsearch-sink] Option to strip out non printable characters (#15431)
3aef61ac53f is described below
commit 3aef61ac53f37effe8a9afb75b4eeed059f66fd5
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 8 14:46:42 2022 +0200
[feat][elasticsearch-sink] Option to strip out non printable characters (#15431)
* [feat][elasticsearch-sink] Option to strip out non printable characters
* Fix ElasticSearchSinkRawDataTests#testStripNonPrintableCharacters test
(cherry picked from commit e9304bed4ff36330acaa877635afdfd0d1576f00)
* Update site2/docs/io-elasticsearch-sink.md
Co-authored-by: Anonymitaet <50...@users.noreply.github.com>
* Update site2/docs/io-elasticsearch-sink.md
Co-authored-by: momo-jun <60...@users.noreply.github.com>
* fix checkstyle
Co-authored-by: Anonymitaet <50...@users.noreply.github.com>
Co-authored-by: momo-jun <60...@users.noreply.github.com>
---
.../io/elasticsearch/ElasticSearchConfig.java | 6 +++
.../pulsar/io/elasticsearch/ElasticSearchSink.java | 24 ++++++++---
.../ElasticSearchSinkRawDataTests.java | 46 ++++++++++++++++++++++
site2/docs/io-elasticsearch-sink.md | 1 +
4 files changed, 72 insertions(+), 5 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 396e892fc12..3ae783ce41f 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -281,6 +281,12 @@ public class ElasticSearchConfig implements Serializable {
)
private boolean canonicalKeyFields = false;
+ @FieldDoc(
+ defaultValue = "true",
+ help = "If stripNonPrintableCharacters is true, all non-printable characters will be removed from the document."
+ )
+ private boolean stripNonPrintableCharacters = true;
+
public enum MalformedDocAction {
IGNORE,
WARN,
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 9f703a59ecb..810ff6afb92 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Message;
@@ -64,6 +65,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {
private final ObjectMapper objectMapper = new ObjectMapper();
private ObjectMapper sortedObjectMapper;
private List<String> primaryFields = null;
+ private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -227,15 +229,27 @@ public class ElasticSearchSink implements Sink<GenericObject> {
id,
doc);
}
+ doc = sanitizeValue(doc);
return Pair.of(id, doc);
} else {
- return Pair.of(null, new String(
- record.getMessage()
- .orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))
- .getData(), StandardCharsets.UTF_8));
+ final byte[] data = record
+ .getMessage()
+ .orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))
+ .getData();
+ String doc = new String(data, StandardCharsets.UTF_8);
+ doc = sanitizeValue(doc);
+ return Pair.of(null, doc);
}
}
+    /**
+     * Applies the optional non-printable-character stripping to a document.
+     *
+     * @param value the document text; may be {@code null}
+     * @return {@code value} untouched when it is {@code null} or when stripping is disabled in the
+     *         sink configuration; otherwise {@code value} with every match of
+     *         {@code nonPrintableCharactersPattern} removed
+     */
+    private String sanitizeValue(String value) {
+        if (value != null && elasticSearchConfig.isStripNonPrintableCharacters()) {
+            return nonPrintableCharactersPattern.matcher(value).replaceAll("");
+        }
+        return value;
+    }
+
public String stringifyKey(Schema<?> schema, Object val) throws JsonProcessingException {
switch (schema.getSchemaInfo().getType()) {
case INT8:
@@ -323,4 +337,4 @@ public class ElasticSearchSink implements Sink<GenericObject> {
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
index 92eb103d818..bff1f7d791a 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.elasticsearch;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -30,6 +32,7 @@ import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
@@ -41,6 +44,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.fail;
public abstract class ElasticSearchSinkRawDataTests extends ElasticSearchTestBase {
@@ -136,4 +140,46 @@ public abstract class ElasticSearchSinkRawDataTests extends ElasticSearchTestBas
}
}
+ /** Parameter bundle describing one run of {@code testStripNonPrintableCharacters}. */
+ @Data
+ @AllArgsConstructor
+ private static class StripNonPrintableCharactersTestConfig {
+ // Value the test stores under the "stripNonPrintableCharacters" sink config key.
+ private boolean stripNonPrintableCharacters;
+ // Value the test stores under the "bulkEnabled" sink config key.
+ private boolean bulkEnabled;
+
+ }
+ @DataProvider(name = "stripNonPrintableCharacters")
+ public Object[] stripNonPrintableCharacters() {
+ return new Object[]{
+ new StripNonPrintableCharactersTestConfig(true, true),
+ new StripNonPrintableCharactersTestConfig(true, false),
+ new StripNonPrintableCharactersTestConfig(false, true),
+ new StripNonPrintableCharactersTestConfig(false, false),
+ };
+ }
+
+
+ @Test(dataProvider = "stripNonPrintableCharacters")
+ public final void testStripNonPrintableCharacters(StripNonPrintableCharactersTestConfig conf) throws Exception {
+ map.put("indexName", "test-index");
+ map.put("bulkEnabled", conf.isBulkEnabled());
+ map.put("bulkActions", 1);
+ map.put("maxRetries", 1);
+ map.put("stripNonPrintableCharacters", conf.isStripNonPrintableCharacters());
+ sink.open(map, mockSinkContext);
+
+ final String data = "\t" + ((char)0) + "{\"a\":\"b" + ((char)31) + "\"}";
+ when(mockMessage.getData()).thenReturn(data.getBytes(StandardCharsets.UTF_8));
+ try {
+ send(1);
+ if (!conf.isStripNonPrintableCharacters()) {
+ fail("with stripNonPrintableCharacters=false it should have raised an exception");
+ }
+ verify(mockRecord, times(1)).ack();
+ } catch (Throwable t) {
+ if (conf.isStripNonPrintableCharacters()) {
+ throw t;
+ }
+ }
+ }
+
}
diff --git a/site2/docs/io-elasticsearch-sink.md b/site2/docs/io-elasticsearch-sink.md
index 9cd6d0020e1..7decfc96552 100644
--- a/site2/docs/io-elasticsearch-sink.md
+++ b/site2/docs/io-elasticsearch-sink.md
@@ -87,6 +87,7 @@ The configuration of the Elasticsearch sink connector has the following properti
| `token` | String| false | " " (empty string)|The token used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. |
| `apiKey` | String| false | " " (empty string)|The apiKey used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. |
| `canonicalKeyFields` | Boolean | false | false | Whether to sort the key fields for JSON and Avro or not. If it is set to `true` and the record key schema is `JSON` or `AVRO`, the serialized object does not consider the order of properties. |
+| `stripNonPrintableCharacters` | Boolean| false | true| Whether to remove all non-printable characters from the document. If it is set to `true`, every character in the Unicode "Other" general category (`\p{C}`: control, format, surrogate, private-use, and unassigned code points) is removed from the document. |
### Definition of ElasticSearchSslConfig structure: