You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/08 12:46:51 UTC

[pulsar] branch master updated: [feat][elasticsearch-sink] Option to strip out non printable characters (#15431)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aef61ac53f [feat][elasticsearch-sink] Option to strip out non printable characters (#15431)
3aef61ac53f is described below

commit 3aef61ac53f37effe8a9afb75b4eeed059f66fd5
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 8 14:46:42 2022 +0200

    [feat][elasticsearch-sink] Option to strip out non printable characters (#15431)
    
    * [feat][elasticsearch-sink] Option to strip out non printable characters
    
    * Fix ElasticSearchSinkRawDataTests#testStripNonPrintableCharacters test
    
    (cherry picked from commit e9304bed4ff36330acaa877635afdfd0d1576f00)
    
    * Update site2/docs/io-elasticsearch-sink.md
    
    Co-authored-by: Anonymitaet <50...@users.noreply.github.com>
    
    * Update site2/docs/io-elasticsearch-sink.md
    
    Co-authored-by: momo-jun <60...@users.noreply.github.com>
    
    * fix checkstyle
    
    Co-authored-by: Anonymitaet <50...@users.noreply.github.com>
    Co-authored-by: momo-jun <60...@users.noreply.github.com>
---
 .../io/elasticsearch/ElasticSearchConfig.java      |  6 +++
 .../pulsar/io/elasticsearch/ElasticSearchSink.java | 24 ++++++++---
 .../ElasticSearchSinkRawDataTests.java             | 46 ++++++++++++++++++++++
 site2/docs/io-elasticsearch-sink.md                |  1 +
 4 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 396e892fc12..3ae783ce41f 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -281,6 +281,12 @@ public class ElasticSearchConfig implements Serializable {
     )
     private boolean canonicalKeyFields = false;
 
+    @FieldDoc(
+            defaultValue = "true",
+            help = "If stripNonPrintableCharacters is true, all non-printable characters will be removed from the document."
+    )
+    private boolean stripNonPrintableCharacters = true;
+
     public enum MalformedDocAction {
         IGNORE,
         WARN,
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 9f703a59ecb..810ff6afb92 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -34,6 +34,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Message;
@@ -64,6 +65,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {
     private final ObjectMapper objectMapper = new ObjectMapper();
     private ObjectMapper sortedObjectMapper;
     private List<String> primaryFields = null;
+    private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -227,15 +229,27 @@ public class ElasticSearchSink implements Sink<GenericObject> {
                         id,
                         doc);
             }
+            doc = sanitizeValue(doc);
             return Pair.of(id, doc);
     } else {
-        return Pair.of(null, new String(
-                record.getMessage()
-                        .orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))
-                        .getData(), StandardCharsets.UTF_8));
+            final byte[] data = record
+                    .getMessage()
+                    .orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))
+                    .getData();
+            String doc = new String(data, StandardCharsets.UTF_8);
+            doc = sanitizeValue(doc);
+            return Pair.of(null, doc);
         }
     }
 
+    private String sanitizeValue(String value) {
+        if (value == null || !elasticSearchConfig.isStripNonPrintableCharacters()) {
+            return value;
+        }
+        return nonPrintableCharactersPattern.matcher(value).replaceAll("");
+
+    }
+
     public String stringifyKey(Schema<?> schema, Object val) throws JsonProcessingException {
         switch (schema.getSchemaInfo().getType()) {
             case INT8:
@@ -323,4 +337,4 @@ public class ElasticSearchSink implements Sink<GenericObject> {
         }
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
index 92eb103d818..bff1f7d791a 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -30,6 +32,7 @@ import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.nio.charset.StandardCharsets;
@@ -41,6 +44,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.fail;
 
 public abstract class ElasticSearchSinkRawDataTests extends ElasticSearchTestBase {
 
@@ -136,4 +140,46 @@ public abstract class ElasticSearchSinkRawDataTests extends ElasticSearchTestBas
         }
     }
 
+    @Data
+    @AllArgsConstructor
+    private static class StripNonPrintableCharactersTestConfig {
+        private boolean stripNonPrintableCharacters;
+        private boolean bulkEnabled;
+
+    }
+    @DataProvider(name = "stripNonPrintableCharacters")
+    public Object[] stripNonPrintableCharacters() {
+        return new Object[]{
+                new StripNonPrintableCharactersTestConfig(true, true),
+                new StripNonPrintableCharactersTestConfig(true, false),
+                new StripNonPrintableCharactersTestConfig(false, true),
+                new StripNonPrintableCharactersTestConfig(false, false),
+        };
+    }
+
+
+    @Test(dataProvider = "stripNonPrintableCharacters")
+    public final void testStripNonPrintableCharacters(StripNonPrintableCharactersTestConfig conf) throws Exception {
+        map.put("indexName", "test-index");
+        map.put("bulkEnabled", conf.isBulkEnabled());
+        map.put("bulkActions", 1);
+        map.put("maxRetries", 1);
+        map.put("stripNonPrintableCharacters", conf.isStripNonPrintableCharacters());
+        sink.open(map, mockSinkContext);
+
+        final String data = "\t" + ((char)0) + "{\"a\":\"b" + ((char)31) + "\"}";
+        when(mockMessage.getData()).thenReturn(data.getBytes(StandardCharsets.UTF_8));
+        try {
+            send(1);
+            if (!conf.isStripNonPrintableCharacters()) {
+                fail("with stripNonPrintableCharacters=false it should have raised an exception");
+            }
+            verify(mockRecord, times(1)).ack();
+        } catch (Throwable t) {
+            if (conf.isStripNonPrintableCharacters()) {
+                throw t;
+            }
+        }
+    }
+
 }
diff --git a/site2/docs/io-elasticsearch-sink.md b/site2/docs/io-elasticsearch-sink.md
index 9cd6d0020e1..7decfc96552 100644
--- a/site2/docs/io-elasticsearch-sink.md
+++ b/site2/docs/io-elasticsearch-sink.md
@@ -87,6 +87,7 @@ The configuration of the Elasticsearch sink connector has the following properti
 | `token` | String| false | " " (empty string)|The token used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. |
 | `apiKey` | String| false | " " (empty string)|The apiKey used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. |
 | `canonicalKeyFields` | Boolean | false | false | Whether to sort the key fields for JSON and Avro or not. If it is set to `true` and the record key schema is `JSON` or `AVRO`, the serialized object does not consider the order of properties. |
+| `stripNonPrintableCharacters` | Boolean| false | true| Whether to remove all non-printable characters from the document or not. If it is set to `true`, all non-printable characters are removed from the document. |
 
 ### Definition of ElasticSearchSslConfig structure: