You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/17 15:01:15 UTC

[GitHub] [pulsar] vroyer opened a new pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

vroyer opened a new pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613


   ### Motivation
   
   Enhance the Elasticsearch sink connector for production use with the following features:
   * Support KeyValue message with any generic key/value schema
   * Add the NullValueAction configuration to control how null values are handled (the default is to delete the Elasticsearch document)
   * Add the keyIgnore configuration, which allows using the messageId, or extracting the Elasticsearch _id from the message value
   * Use the configured Elasticsearch index name, or convert the record topic name to a valid Elasticsearch index name
   * Add support for SSL
   * Add support for bulk processing (with new bulk settings)
   * Add support for random exponential retry
   * Add compression support
   * Add configuration malformedDocAction to deal with errors
   * Support for both JSON and AVRO encoding with [AVRO logical types](https://avro.apache.org/docs/current/spec.html#Logical+Types).
   
   ### Modifications
   
   Introduce new classes:
   * org.apache.pulsar.io.core.SslConfig to deal with client SSL/TLS configurations
   * org.apache.pulsar.io.elasticsearch.ElasticsearchClient to deal with Elasticsearch
   * org.apache.pulsar.io.elasticsearch.RandomExponentialRetry to deal with retries, as described in this [blog post](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/)
   * org.apache.pulsar.io.elasticsearch.JsonConverter to deal with AVRO to JSON conversion
   
   ### Verifying this change
   
   * Introduce numerous unit tests and Testcontainers-based tests.
   
   ### Documentation
   
   * Settings are documented in the ElasticsearchConfig (with the FieldDoc annotation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635421839



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {

Review comment:
       It catches and re-throws the Exception, since Sink.write() is allowed to throw Exception. What's wrong here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635366547



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",

Review comment:
       Yes, I mean you should add it.
   It is a pattern in the Pulsar code base, and we should keep the code consistent.
   
   There is a never-ending debate about this topic :-)
   
   My take is that if a project uses one style, it is better to follow that code style.
   
   If we want to change it, it is better to open a separate discussion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635329441



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",
+                record.getClass().getName(),
+                schemaType,
+                id,
+                doc);
+        return Pair.of(id, doc);
     }
 
-    private URL getUrl() throws MalformedURLException {
-        if (url == null) {
-            url = new URL(elasticSearchConfig.getElasticSearchUrl());
+    public String stringifyKey(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case INT8:
+                return Byte.toString((Byte) val);
+            case INT16:
+                return Short.toString((Short) val);
+            case INT32:
+                return Integer.toString((Integer) val);
+            case INT64:
+                return Long.toString((Long) val);
+            case STRING:
+                return (String) val;
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;
+                    return stringifyKey(genericJsonRecord.getJsonNode());
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to convert JSON to a JSON string", e);
+                    throw new RuntimeException(e);
+                }
+            case AVRO:
+                try {
+                    GenericAvroRecord genericAvroRecord = (GenericAvroRecord) val;
+                    JsonNode jsonNode = JsonConverter.toJson(genericAvroRecord.getAvroRecord());

Review comment:
       Yes, fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r633824470



##########
File path: pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SslConfig.java
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+@Data
+@Accessors(chain = true)

Review comment:
       Can you move this inside the ES Sink?
   Keeping it here makes it look like a new public API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#issuecomment-851650806


   @merlimat @sijie @codelipenghui please take a look.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635367731



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {

Review comment:
       In theory the exception should bubble up.
   
   If you catch a generic `Exception`, you also have to deal with system exceptions.
   
   At the very least, `InterruptedException` must be handled as a special case by restoring the interrupt flag with `Thread.currentThread().interrupt()`. Calling `Thread.interrupted()` would clear the flag instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r633824470



##########
File path: pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SslConfig.java
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+@Data
+@Accessors(chain = true)

Review comment:
       Can you move this inside the ES Sink?
   Keeping it here makes it look like a new public API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635328832



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",

Review comment:
       Do you mean adding `if (log.isDebugEnabled())` before every `log.debug()` call? `log.debug()` already performs that check internally, so isn't the extra guard redundant?

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",
+                record.getClass().getName(),
+                schemaType,
+                id,
+                doc);
+        return Pair.of(id, doc);
     }
 
-    private URL getUrl() throws MalformedURLException {
-        if (url == null) {
-            url = new URL(elasticSearchConfig.getElasticSearchUrl());
+    public String stringifyKey(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case INT8:
+                return Byte.toString((Byte) val);
+            case INT16:
+                return Short.toString((Short) val);
+            case INT32:
+                return Integer.toString((Integer) val);
+            case INT64:
+                return Long.toString((Long) val);
+            case STRING:
+                return (String) val;
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;

Review comment:
       yes, fixed

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",
+                record.getClass().getName(),
+                schemaType,
+                id,
+                doc);
+        return Pair.of(id, doc);
     }
 
-    private URL getUrl() throws MalformedURLException {
-        if (url == null) {
-            url = new URL(elasticSearchConfig.getElasticSearchUrl());
+    public String stringifyKey(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case INT8:
+                return Byte.toString((Byte) val);
+            case INT16:
+                return Short.toString((Short) val);
+            case INT32:
+                return Integer.toString((Integer) val);
+            case INT64:
+                return Long.toString((Long) val);
+            case STRING:
+                return (String) val;
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;
+                    return stringifyKey(genericJsonRecord.getJsonNode());
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to convert JSON to a JSON string", e);
+                    throw new RuntimeException(e);
+                }
+            case AVRO:
+                try {
+                    GenericAvroRecord genericAvroRecord = (GenericAvroRecord) val;
+                    JsonNode jsonNode = JsonConverter.toJson(genericAvroRecord.getAvroRecord());
+                    return stringifyKey(jsonNode);
+                } catch (Exception e) {
+                    log.error("Failed to convert AVRO to a JSON string", e);
+                    throw new RuntimeException(e);
+                }
+            default:
+                throw new UnsupportedOperationException("Unsupported key schemaType=" + schema.getSchemaInfo().getType());
         }
-        return url;
     }
 
-    private CredentialsProvider getCredentialsProvider() {
+    /**
+     * Convert a JsonNode to an Elasticsearch id.
+     */
+    public String stringifyKey(JsonNode jsonNode) throws JsonProcessingException {
+        List<String> fields = new ArrayList<>();
+        jsonNode.fieldNames().forEachRemaining(fields::add);
+        return stringifyKey(jsonNode, fields);
+    }
 
-        if (StringUtils.isEmpty(elasticSearchConfig.getUsername())
-            || StringUtils.isEmpty(elasticSearchConfig.getPassword())) {
-            return null;
+    public String stringifyKey(JsonNode jsonNode, List<String> fields) throws JsonProcessingException {
+        if (fields.size() == 1) {
+            JsonNode singleNode = jsonNode.get(fields.get(0));
+            String id = objectMapper.writeValueAsString(singleNode);
+            return (id.startsWith("\"") && id.endsWith("\""))
+                    ? id.substring(1, id.length() - 1)  // remove double quotes
+                    : id;
+        } else {
+            return JsonConverter.toJsonArray(jsonNode, fields).toString();
         }
-
-        credentialsProvider = new BasicCredentialsProvider();
-        credentialsProvider.setCredentials(AuthScope.ANY,
-                new UsernamePasswordCredentials(elasticSearchConfig.getUsername(),
-                        elasticSearchConfig.getPassword()));
-        return credentialsProvider;
     }
 
-    private RestHighLevelClient getClient() throws MalformedURLException {
-        if (client == null) {
-          CredentialsProvider cp = getCredentialsProvider();
-          RestClientBuilder builder = RestClient.builder(new HttpHost(getUrl().getHost(),
-                  getUrl().getPort(), getUrl().getProtocol()));
-
-          if (cp != null) {
-              builder.setHttpClientConfigCallback(httpClientBuilder ->
-              httpClientBuilder.setDefaultCredentialsProvider(cp));
-          }
-          client = new RestHighLevelClient(builder);
+    public String stringifyValue(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;

Review comment:
       yes, fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635179419



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {

Review comment:
       Can we catch a more specific set of exceptions here?

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",
+                record.getClass().getName(),
+                schemaType,
+                id,
+                doc);
+        return Pair.of(id, doc);
     }
 
-    private URL getUrl() throws MalformedURLException {
-        if (url == null) {
-            url = new URL(elasticSearchConfig.getElasticSearchUrl());
+    public String stringifyKey(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case INT8:
+                return Byte.toString((Byte) val);
+            case INT16:
+                return Short.toString((Short) val);
+            case INT32:
+                return Integer.toString((Integer) val);
+            case INT64:
+                return Long.toString((Long) val);
+            case STRING:
+                return (String) val;
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;

Review comment:
       There is no need to use non-public API classes like GenericJsonRecord.
   
   I suggest:
   `JsonNode node = (JsonNode) ((GenericRecord) val).getNativeObject();`

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class

Review comment:
       Why are we renaming this class (ElasticSearchConfig -> ElasticsearchConfig)?

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {

Review comment:
       There should not be a distinction between the SEPARATED and INLINE encodings: in both cases the key can be read from the native KeyValue object.
   
   I suggest using:
   `key = ((KeyValue) record.getValue().getNativeObject()).getKey();`

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();

Review comment:
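       `key` may be null here (for example, `record.getKey().orElse(null)` for a record without a key), in which case `key.toString()` throws a NullPointerException.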

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",
+                record.getClass().getName(),
+                schemaType,
+                id,
+                doc);
+        return Pair.of(id, doc);
     }
 
-    private URL getUrl() throws MalformedURLException {
-        if (url == null) {
-            url = new URL(elasticSearchConfig.getElasticSearchUrl());
+    public String stringifyKey(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case INT8:
+                return Byte.toString((Byte) val);
+            case INT16:
+                return Short.toString((Short) val);
+            case INT32:
+                return Integer.toString((Integer) val);
+            case INT64:
+                return Long.toString((Long) val);
+            case STRING:
+                return (String) val;
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;
+                    return stringifyKey(genericJsonRecord.getJsonNode());
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to convert JSON to a JSON string", e);
+                    throw new RuntimeException(e);
+                }
+            case AVRO:
+                try {
+                    GenericAvroRecord genericAvroRecord = (GenericAvroRecord) val;
+                    JsonNode jsonNode = JsonConverter.toJson(genericAvroRecord.getAvroRecord());
+                    return stringifyKey(jsonNode);
+                } catch (Exception e) {
+                    log.error("Failed to convert AVRO to a JSON string", e);
+                    throw new RuntimeException(e);
+                }
+            default:
+                throw new UnsupportedOperationException("Unsupported key schemaType=" + schema.getSchemaInfo().getType());
         }
-        return url;
     }
 
-    private CredentialsProvider getCredentialsProvider() {
+    /**
+     * Convert a JsonNode to an Elasticsearch id.
+     */
+    public String stringifyKey(JsonNode jsonNode) throws JsonProcessingException {
+        List<String> fields = new ArrayList<>();
+        jsonNode.fieldNames().forEachRemaining(fields::add);
+        return stringifyKey(jsonNode, fields);
+    }
 
-        if (StringUtils.isEmpty(elasticSearchConfig.getUsername())
-            || StringUtils.isEmpty(elasticSearchConfig.getPassword())) {
-            return null;
+    public String stringifyKey(JsonNode jsonNode, List<String> fields) throws JsonProcessingException {
+        if (fields.size() == 1) {
+            JsonNode singleNode = jsonNode.get(fields.get(0));
+            String id = objectMapper.writeValueAsString(singleNode);
+            return (id.startsWith("\"") && id.endsWith("\""))
+                    ? id.substring(1, id.length() - 1)  // remove double quotes
+                    : id;
+        } else {
+            return JsonConverter.toJsonArray(jsonNode, fields).toString();
         }
-
-        credentialsProvider = new BasicCredentialsProvider();
-        credentialsProvider.setCredentials(AuthScope.ANY,
-                new UsernamePasswordCredentials(elasticSearchConfig.getUsername(),
-                        elasticSearchConfig.getPassword()));
-        return credentialsProvider;
     }
 
-    private RestHighLevelClient getClient() throws MalformedURLException {
-        if (client == null) {
-          CredentialsProvider cp = getCredentialsProvider();
-          RestClientBuilder builder = RestClient.builder(new HttpHost(getUrl().getHost(),
-                  getUrl().getPort(), getUrl().getProtocol()));
-
-          if (cp != null) {
-              builder.setHttpClientConfigCallback(httpClientBuilder ->
-              httpClientBuilder.setDefaultCredentialsProvider(cp));
-          }
-          client = new RestHighLevelClient(builder);
+    public String stringifyValue(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;

Review comment:
       Same as above: there is no need to cast to the non-public GenericJsonRecord class here.

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",

Review comment:
       Please guard this debug log statement with:
   `if (log.isDebugEnabled()) { ... }`

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);

Review comment:
       Isn't this a hard error?
   If we are not able to extract the key from the document, the exception is only logged and `id` keeps its previous value, so we may write garbage to ES.

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();

Review comment:
       the same here

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",
+                record.getClass().getName(),
+                schemaType,
+                id,
+                doc);
+        return Pair.of(id, doc);
     }
 
-    private URL getUrl() throws MalformedURLException {
-        if (url == null) {
-            url = new URL(elasticSearchConfig.getElasticSearchUrl());
+    public String stringifyKey(Schema<?> schema, Object val) {
+        switch (schema.getSchemaInfo().getType()) {
+            case INT8:
+                return Byte.toString((Byte) val);
+            case INT16:
+                return Short.toString((Short) val);
+            case INT32:
+                return Integer.toString((Integer) val);
+            case INT64:
+                return Long.toString((Long) val);
+            case STRING:
+                return (String) val;
+            case JSON:
+                try {
+                    GenericJsonRecord genericJsonRecord = (GenericJsonRecord) val;
+                    return stringifyKey(genericJsonRecord.getJsonNode());
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to convert JSON to a JSON string", e);
+                    throw new RuntimeException(e);
+                }
+            case AVRO:
+                try {
+                    GenericAvroRecord genericAvroRecord = (GenericAvroRecord) val;
+                    JsonNode jsonNode = JsonConverter.toJson(genericAvroRecord.getAvroRecord());

Review comment:
       `o.a.avro.GenericRecord node = (o.a.avro.GenericRecord) ((GenericRecord) val).getNativeObject();`

##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticsearchClient.java
##########
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Record;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+public class ElasticsearchClient {
+
+    static final String[] malformedErrors = {
+            "mapper_parsing_exception",
+            "action_request_validation_exception",
+            "illegal_argument_exception"
+    };
+
+    private ElasticsearchConfig config;
+    private ConfigCallback configCallback;
+    private RestHighLevelClient client;
+
+    final Set<String> indexCache = new HashSet<>();
+    final Map<String, String> topicToIndexCache = new HashMap<>();
+
+    final RandomExponentialRetry backoffRetry;
+    final BulkProcessor bulkProcessor;
+    final ConcurrentMap<DocWriteRequest<?>, Record> records = new ConcurrentHashMap<>();
+    final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
+    final ScheduledExecutorService executorService;
+
+    ElasticsearchClient(ElasticsearchConfig elasticSearchConfig) throws MalformedURLException {
+        this.config = elasticSearchConfig;
+        this.configCallback = new ConfigCallback();
+        this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
+        if (config.isBulkEnabled() == false) {
+            bulkProcessor = null;
+        } else {
+            BulkProcessor.Builder builder = BulkProcessor.builder(
+                    (bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener),
+                    new BulkProcessor.Listener() {
+                        @Override
+                        public void beforeBulk(long l, BulkRequest bulkRequest) {
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
+                            log.trace("Bulk request id={} size={}:", l, bulkRequest.requests().size());
+                            for (int i = 0; i < bulkResponse.getItems().length; i++) {
+                                DocWriteRequest<?> request = bulkRequest.requests().get(i);
+                                Record record = records.get(request);
+                                BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];
+                                if (bulkItemResponse.isFailed()) {
+                                    record.fail();
+                                    try {
+                                        hasIrrecoverableError(bulkItemResponse);
+                                    } catch(Exception e) {
+                                        log.warn("Unrecoverable error:", e);
+                                    }
+                                } else {
+                                    record.ack();
+                                }
+                                records.remove(request);
+                            }
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
+                            log.warn("Bulk request id={} failed:", l, throwable);
+                            for (DocWriteRequest<?> request : bulkRequest.requests()) {
+                                Record record = records.remove(request);
+                                record.fail();
+                            }
+                        }
+                    }
+            )
+                    .setBulkActions(config.getBulkActions())
+                    .setBulkSize(new ByteSizeValue(config.getBulkSizeInMb(), ByteSizeUnit.MB))
+                    .setConcurrentRequests(config.getBulkConcurrentRequests())
+                    .setBackoffPolicy(new RandomExponentialBackoffPolicy(backoffRetry,
+                            config.getRetryBackoffInMs(),
+                            config.getMaxRetries()
+                    ));
+            if (config.getBulkFlushIntervalInMs() > 0) {
+                builder.setFlushInterval(new TimeValue(config.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS));
+            }
+            this.bulkProcessor = builder.build();
+        }
+
+        // idle+expired connection evictor thread
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorService.scheduleAtFixedRate(new Runnable() {
+                                                     @Override
+                                                     public void run() {
+                                                         configCallback.connectionManager.closeExpiredConnections();
+                                                         configCallback.connectionManager.closeIdleConnections(
+                                                                 config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
+                                                     }
+                                                 },
+                config.getConnectionIdleTimeoutInMs(),
+                config.getConnectionIdleTimeoutInMs(),
+                TimeUnit.MILLISECONDS
+        );
+
+        URL url = new URL(config.getElasticSearchUrl());
+        RestClientBuilder builder = RestClient.builder(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()))
+                .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
+                    @Override
+                    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
+                        return builder
+                                .setContentCompressionEnabled(config.isCompressionEnabled())
+                                .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
+                                .setConnectTimeout(config.getConnectTimeoutInMs())
+                                .setSocketTimeout(config.getSocketTimeoutInMs());
+                    }
+                })
+                .setHttpClientConfigCallback(this.configCallback)
+                .setFailureListener(new RestClient.FailureListener() {
+                    public void onFailure(Node node) {
+                        log.warn("Node host={} failed", node.getHost());
+                    }
+                });
+        this.client = new RestHighLevelClient(builder);
+    }
+
+    void failed(Exception e) throws Exception {
+        if (irrecoverableError.compareAndSet(null, e)) {
+            log.error("Irrecoverable error:", e);
+        }
+    }
+
+    boolean isFailed() {
+        return irrecoverableError.get() != null;
+    }
+
+    void hasIrrecoverableError(BulkItemResponse bulkItemResponse) throws Exception {
+        for (String error : malformedErrors) {
+            if (bulkItemResponse.getFailureMessage().contains(error)) {
+                switch (config.getMalformedDocAction()) {
+                    case IGNORE:
+                        break;
+                    case WARN:
+                        log.warn("Ignoring malformed document index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        break;
+                    case FAIL:
+                        log.error("Failure due to the malformed document index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        failed(bulkItemResponse.getFailure().getCause());
+                        break;
+                }
+            }
+        }
+    }
+
+    public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = Requests.indexRequest(config.getIndexName());
+            indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+
+            records.put(indexRequest, record);
+            bulkProcessor.add(indexRequest);
+        } catch(Exception e) {
+            log.debug("index failed id=" + idAndDoc.getLeft(), e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Index an elasticsearch document and ack the record.
+     * @param record
+     * @param idAndDoc
+     * @return
+     * @throws Exception
+     */
+    public boolean indexDocument(Record<GenericObject> record, Pair<String, String> idAndDoc) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = Requests.indexRequest(config.getIndexName());
+            indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
+            log.debug("index id={} result={}", idAndDoc.getLeft(), indexResponse.getResult());
+            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) ||
+                    indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
+                record.ack();
+                return true;
+            } else {
+                record.fail();
+                return false;
+            }
+        } catch (final Exception ex) {
+            log.debug("index failed id=" + idAndDoc.getLeft(), ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    public void bulkDelete(Record<GenericObject> record, String id) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+
+            records.put(deleteRequest, record);
+            bulkProcessor.add(deleteRequest);
+        } catch(Exception e) {
+            log.debug("delete failed id=" + id, e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Delete an elasticsearch document and ack the record.
+     * @param record
+     * @param id
+     * @return
+     * @throws IOException
+     */
+    public boolean deleteDocument(Record<GenericObject> record, String id) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+            DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
+            log.debug("delete result=" + deleteResponse.getResult());
+            if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) ||
+                    deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
+                record.ack();
+                return true;
+            }
+            record.fail();
+            return false;
+        } catch (final Exception ex) {
+            log.debug("index failed id=" + id, ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    /**
+     * Flushes the bulk processor.
+     */
+    public void flush() {
+        bulkProcessor.flush();
+    }
+
+    public void close() {
+        try {
+            if (bulkProcessor != null) {
+                bulkProcessor.awaitClose(5000L, TimeUnit.MILLISECONDS);
+            }
+        } catch (InterruptedException e) {
+            log.warn("Elasticsearch bulk processor close error:", e);
+        }
+        try {
+            this.executorService.shutdown();
+            if (this.client != null) {
+                this.client.close();
+            }
+        } catch (IOException e) {
+            log.warn("Elasticsearch client close error:", e);
+        }
+    }
+
+    private void checkNotFailed() throws Exception {
+        if (irrecoverableError.get() != null) {
+            throw irrecoverableError.get();
+        }
+    }
+
+    private void checkIndexExists(Optional<String> topicName) throws IOException {
+        String indexName = indexName(topicName);
+        if (!indexCache.contains(indexName)) {
+            synchronized (this) {
+                if (!indexCache.contains(indexName)) {
+                    createIndexIfNeeded(indexName);
+                    indexCache.add(indexName);
+                }
+            }
+        }
+    }
+
+    private String indexName(Optional<String> topicName) throws IOException {
+        if (config.getIndexName() != null) {
+            // Use the configured indexName if provided.
+            return config.getIndexName();
+        }
+        if (!topicName.isPresent()) {
+            throw new IOException("Elasticsearch index name configuration and topic name are empty");
+        }
+        return topicToIndexName(topicName.get());
+    }
+
+    @VisibleForTesting
+    public String topicToIndexName(String topicName) {
+        return topicToIndexCache.computeIfAbsent(topicName, k -> {
+            // see elasticsearch limitations https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
+            String indexName = topicName.toLowerCase(Locale.ROOT);
+
+            // truncate to the max bytes length
+            while (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
+                indexName = indexName.substring(0, indexName.length() - 1);
+            }
+            if (indexName.length() <= 0 || !indexName.matches("[a-zA-Z\\.0-9][a-zA-Z_\\.\\-\\+0-9]*")) {
+                throw new RuntimeException(new IOException("Cannot convert the topic name='" + topicName + "' to a valid elasticsearch index name"));
+            }
+            log.debug("Translate topic={} to index={}", k, indexName);

Review comment:
       please use `if (log.isDebugEnabled())` here and in the other places where debug logging is done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635410530



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",

Review comment:
       Awesome!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635327270



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();

Review comment:
       Yes, fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635329745



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {

Review comment:
       Yes, fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#issuecomment-876316736


   This PR is superseded by #11263.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635414435



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();
+        if (keySchema != null) {
+            id = stringifyKey(keySchema, key);
+        }
 
-            cireq.settings(Settings.builder()
-               .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
-               .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
+        String doc = null;
+        if (value != null) {
+            if (valueSchema != null) {
+                doc = stringifyValue(valueSchema, value);
+            } else {
+                doc = value.toString();
+            }
+        }
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
-            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
-                throw new RuntimeException("Unable to create index.");
+        if (elasticSearchConfig.isKeyIgnore()) {
+            if (doc == null || Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
+                id = Hex.encodeHexString(record.getMessage().get().getMessageId().toByteArray());
+            } else {
+                try {
+                    // extract the PK from the JSON document
+                    JsonNode jsonNode = objectMapper.readTree(doc);
+                    List<String> pkFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
+                    id = stringifyKey(jsonNode, pkFields);
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to read JSON", e);
+                }
             }
         }
+
+        SchemaType schemaType = null;
+        if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
+            schemaType = record.getSchema().getSchemaInfo().getType();
+        }
+        log.debug("recordType={} schemaType={} id={} doc={}",

Review comment:
       It's already inside an `if (log.isDebugEnabled()) {}` block here; isn't that enough?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635331459



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("write error:", e);
+                throw e;
             }
-        } catch (final IOException ex) {
-            record.fail();
         }
     }
 
-    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElse("");
-        return new KeyValue<>(key, record.getValue());
+    @VisibleForTesting
+    ElasticsearchClient getElasticsearchClient() {
+        return this.elasticsearchClient;
     }
 
-    private void createIndexIfNeeded() throws IOException {
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
+    /**
+     * Extract ES _id and _source using the Schema if available.
+     *
+     * @param record
+     * @return A pair for _id and _source
+     */
+    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) {
+        Object key = null;
+        Object value = null;
+        Schema<?> keySchema = null;
+        Schema<?> valueSchema = null;
+
+        if (record.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject,GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+            keySchema = keyValueSchema.getKeySchema();
+            valueSchema = keyValueSchema.getValueSchema();
+            if (KeyValueEncodingType.SEPARATED.equals(keyValueSchema.getKeyValueEncodingType()) && record.getKey().isPresent()) {
+                key = keySchema.decode(record.getKey().get().getBytes(StandardCharsets.UTF_8));
+            }
+            if (record.getValue() != null) {
+                value = ((KeyValue) record.getValue().getNativeObject()).getValue();
+                if (KeyValueEncodingType.INLINE.equals(keyValueSchema.getKeyValueEncodingType())) {
+                    key = ((KeyValue) record.getValue().getNativeObject()).getKey();
+                }
+            }
+        } else {
+            key = record.getKey().orElse(null);
+            valueSchema = record.getSchema();
+            value = record.getValue() == null ? null : record.getValue().getNativeObject();
+        }
 
-        if (!exists) {
-            CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
+        String id = key.toString();

Review comment:
       Yes, fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r643282567



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,243 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
-/**
- * The base abstract class for ElasticSearch sinks.
- * Users need to implement extractKeyValue function to use this sink.
- * This class assumes that the input will be JSON documents
- */
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticSearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {

Review comment:
       @eolivelli Have you tested the upgrade process? Can you add an integration test for it, because it changes the signature?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r643276283



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,243 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
-/**
- * The base abstract class for ElasticSearch sinks.
- * Users need to implement extractKeyValue function to use this sink.
- * This class assumes that the input will be JSON documents
- */
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticSearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {

Review comment:
       This works well, and it is not a breaking change: with Sink<GenericObject> you can handle every schema type.
   It also works with schema-less topics.
   
   In the case of a schema-less topic, you get a GenericObject with a byte[] payload (getNativeObject) and BYTES as the SchemaType.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli closed pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
eolivelli closed pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635330913



##########
File path: pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SslConfig.java
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+@Data
+@Accessors(chain = true)

Review comment:
       Moved to the es-sink




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r633847536



##########
File path: pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SslConfig.java
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+@Data
+@Accessors(chain = true)

Review comment:
       I can put this in the ES sink, but all sink connectors will need an SSL/TLS client configuration, which is why I moved it into the pulsar-io core module.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635414605



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticsearchClient.java
##########
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Record;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+public class ElasticsearchClient {
+
+    static final String[] malformedErrors = {
+            "mapper_parsing_exception",
+            "action_request_validation_exception",
+            "illegal_argument_exception"
+    };
+
+    private ElasticsearchConfig config;
+    private ConfigCallback configCallback;
+    private RestHighLevelClient client;
+
+    final Set<String> indexCache = new HashSet<>();
+    final Map<String, String> topicToIndexCache = new HashMap<>();
+
+    final RandomExponentialRetry backoffRetry;
+    final BulkProcessor bulkProcessor;
+    final ConcurrentMap<DocWriteRequest<?>, Record> records = new ConcurrentHashMap<>();
+    final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
+    final ScheduledExecutorService executorService;
+
+    ElasticsearchClient(ElasticsearchConfig elasticSearchConfig) throws MalformedURLException {
+        this.config = elasticSearchConfig;
+        this.configCallback = new ConfigCallback();
+        this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
+        if (config.isBulkEnabled() == false) {
+            bulkProcessor = null;
+        } else {
+            BulkProcessor.Builder builder = BulkProcessor.builder(
+                    (bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener),
+                    new BulkProcessor.Listener() {
+                        @Override
+                        public void beforeBulk(long l, BulkRequest bulkRequest) {
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
+                            log.trace("Bulk request id={} size={}:", l, bulkRequest.requests().size());
+                            for (int i = 0; i < bulkResponse.getItems().length; i++) {
+                                DocWriteRequest<?> request = bulkRequest.requests().get(i);
+                                Record record = records.get(request);
+                                BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];
+                                if (bulkItemResponse.isFailed()) {
+                                    record.fail();
+                                    try {
+                                        hasIrrecoverableError(bulkItemResponse);
+                                    } catch(Exception e) {
+                                        log.warn("Unrecoverable error:", e);
+                                    }
+                                } else {
+                                    record.ack();
+                                }
+                                records.remove(request);
+                            }
+                        }
+
+                        @Override
+                        public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
+                            log.warn("Bulk request id={} failed:", l, throwable);
+                            for (DocWriteRequest<?> request : bulkRequest.requests()) {
+                                Record record = records.remove(request);
+                                record.fail();
+                            }
+                        }
+                    }
+            )
+                    .setBulkActions(config.getBulkActions())
+                    .setBulkSize(new ByteSizeValue(config.getBulkSizeInMb(), ByteSizeUnit.MB))
+                    .setConcurrentRequests(config.getBulkConcurrentRequests())
+                    .setBackoffPolicy(new RandomExponentialBackoffPolicy(backoffRetry,
+                            config.getRetryBackoffInMs(),
+                            config.getMaxRetries()
+                    ));
+            if (config.getBulkFlushIntervalInMs() > 0) {
+                builder.setFlushInterval(new TimeValue(config.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS));
+            }
+            this.bulkProcessor = builder.build();
+        }
+
+        // idle+expired connection evictor thread
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorService.scheduleAtFixedRate(new Runnable() {
+                                                     @Override
+                                                     public void run() {
+                                                         configCallback.connectionManager.closeExpiredConnections();
+                                                         configCallback.connectionManager.closeIdleConnections(
+                                                                 config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
+                                                     }
+                                                 },
+                config.getConnectionIdleTimeoutInMs(),
+                config.getConnectionIdleTimeoutInMs(),
+                TimeUnit.MILLISECONDS
+        );
+
+        URL url = new URL(config.getElasticSearchUrl());
+        RestClientBuilder builder = RestClient.builder(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()))
+                .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
+                    @Override
+                    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
+                        return builder
+                                .setContentCompressionEnabled(config.isCompressionEnabled())
+                                .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
+                                .setConnectTimeout(config.getConnectTimeoutInMs())
+                                .setSocketTimeout(config.getSocketTimeoutInMs());
+                    }
+                })
+                .setHttpClientConfigCallback(this.configCallback)
+                .setFailureListener(new RestClient.FailureListener() {
+                    public void onFailure(Node node) {
+                        log.warn("Node host={} failed", node.getHost());
+                    }
+                });
+        this.client = new RestHighLevelClient(builder);
+    }
+
+    void failed(Exception e) throws Exception {
+        if (irrecoverableError.compareAndSet(null, e)) {
+            log.error("Irrecoverable error:", e);
+        }
+    }
+
+    boolean isFailed() {
+        return irrecoverableError.get() != null;
+    }
+
+    void hasIrrecoverableError(BulkItemResponse bulkItemResponse) throws Exception {
+        for (String error : malformedErrors) {
+            if (bulkItemResponse.getFailureMessage().contains(error)) {
+                switch (config.getMalformedDocAction()) {
+                    case IGNORE:
+                        break;
+                    case WARN:
+                        log.warn("Ignoring malformed document index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        break;
+                    case FAIL:
+                        log.error("Failure due to the malformed document index={} id={}",
+                                bulkItemResponse.getIndex(),
+                                bulkItemResponse.getId(),
+                                bulkItemResponse.getFailure().getCause());
+                        failed(bulkItemResponse.getFailure().getCause());
+                        break;
+                }
+            }
+        }
+    }
+
+    public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = Requests.indexRequest(config.getIndexName());
+            indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+
+            records.put(indexRequest, record);
+            bulkProcessor.add(indexRequest);
+        } catch(Exception e) {
+            log.debug("index failed id=" + idAndDoc.getLeft(), e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Index an elasticsearch document and ack the record.
+     * @param record
+     * @param idAndDoc
+     * @return
+     * @throws Exception
+     */
+    public boolean indexDocument(Record<GenericObject> record, Pair<String, String> idAndDoc) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            IndexRequest indexRequest = Requests.indexRequest(config.getIndexName());
+            indexRequest.id(idAndDoc.getLeft());
+            indexRequest.type(config.getTypeName());
+            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
+            log.debug("index id={} result={}", idAndDoc.getLeft(), indexResponse.getResult());
+            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) ||
+                    indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
+                record.ack();
+                return true;
+            } else {
+                record.fail();
+                return false;
+            }
+        } catch (final Exception ex) {
+            log.debug("index failed id=" + idAndDoc.getLeft(), ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    public void bulkDelete(Record<GenericObject> record, String id) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+
+            records.put(deleteRequest, record);
+            bulkProcessor.add(deleteRequest);
+        } catch(Exception e) {
+            log.debug("delete failed id=" + id, e);
+            record.fail();
+            throw e;
+        }
+    }
+
+    /**
+     * Delete an elasticsearch document and ack the record.
+     * @param record
+     * @param id
+     * @return
+     * @throws IOException
+     */
+    public boolean deleteDocument(Record<GenericObject> record, String id) throws Exception {
+        try {
+            checkNotFailed();
+            checkIndexExists(record.getTopicName());
+            DeleteRequest deleteRequest = Requests.deleteRequest(config.getIndexName());
+            deleteRequest.id(id);
+            deleteRequest.type(config.getTypeName());
+            DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
+            log.debug("delete result=" + deleteResponse.getResult());
+            if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) ||
+                    deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
+                record.ack();
+                return true;
+            }
+            record.fail();
+            return false;
+        } catch (final Exception ex) {
+            log.debug("index failed id=" + id, ex);
+            record.fail();
+            throw ex;
+        }
+    }
+
+    /**
+     * Flushes the bulk processor.
+     */
+    public void flush() {
+        bulkProcessor.flush();
+    }
+
+    public void close() {
+        try {
+            if (bulkProcessor != null) {
+                bulkProcessor.awaitClose(5000L, TimeUnit.MILLISECONDS);
+            }
+        } catch (InterruptedException e) {
+            log.warn("Elasticsearch bulk processor close error:", e);
+        }
+        try {
+            this.executorService.shutdown();
+            if (this.client != null) {
+                this.client.close();
+            }
+        } catch (IOException e) {
+            log.warn("Elasticsearch client close error:", e);
+        }
+    }
+
+    private void checkNotFailed() throws Exception {
+        if (irrecoverableError.get() != null) {
+            throw irrecoverableError.get();
+        }
+    }
+
+    private void checkIndexExists(Optional<String> topicName) throws IOException {
+        String indexName = indexName(topicName);
+        if (!indexCache.contains(indexName)) {
+            synchronized (this) {
+                if (!indexCache.contains(indexName)) {
+                    createIndexIfNeeded(indexName);
+                    indexCache.add(indexName);
+                }
+            }
+        }
+    }
+
+    private String indexName(Optional<String> topicName) throws IOException {
+        if (config.getIndexName() != null) {
+            // Use the configured indexName if provided.
+            return config.getIndexName();
+        }
+        if (!topicName.isPresent()) {
+            throw new IOException("Elasticsearch index name configuration and topic name are empty");
+        }
+        return topicToIndexName(topicName.get());
+    }
+
+    @VisibleForTesting
+    public String topicToIndexName(String topicName) {
+        return topicToIndexCache.computeIfAbsent(topicName, k -> {
+            // see elasticsearch limitations https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
+            String indexName = topicName.toLowerCase(Locale.ROOT);
+
+            // truncate to the max bytes length
+            while (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
+                indexName = indexName.substring(0, indexName.length() - 1);
+            }
+            if (indexName.length() <= 0 || !indexName.matches("[a-zA-Z\\.0-9][a-zA-Z_\\.\\-\\+0-9]*")) {
+                throw new RuntimeException(new IOException("Cannot convert the topic name='" + topicName + "' to a valid elasticsearch index name"));
+            }
+            log.debug("Translate topic={} to index={}", k, indexName);

Review comment:
       Ok, fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r643265710



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,243 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
-/**
- * The base abstract class for ElasticSearch sinks.
- * Users need to implement extractKeyValue function to use this sink.
- * This class assumes that the input will be JSON documents
- */
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticSearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {

Review comment:
       This is a breaking change for existing ElasticSearch users. Can you please provide a separate class for receiving `GenericObject`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635362061



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class

Review comment:
       The correct spelling is Elasticsearch (and the module should be named elasticsearch, not elastic-search), but to keep the old naming style, I have renamed all classes to ElasticSearchXxxx.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] vroyer commented on a change in pull request #10613: [Elasticsearch-sink] Enhance the Elasticsearch sink connector

Posted by GitBox <gi...@apache.org>.
vroyer commented on a change in pull request #10613:
URL: https://github.com/apache/pulsar/pull/10613#discussion_r635330632



##########
File path: pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
##########
@@ -18,139 +18,254 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 /**
  * The base abstract class for ElasticSearch sinks.
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
 @Connector(
-    name = "elastic_search",
-    type = IOType.SINK,
-    help = "A sink connector that sends pulsar messages to elastic search",
-    configClass = ElasticSearchConfig.class
+        name = "elastic_search",
+        type = IOType.SINK,
+        help = "A sink connector that sends pulsar messages to elastic search",
+        configClass = ElasticsearchConfig.class
 )
-public class ElasticSearchSink implements Sink<byte[]> {
+@Slf4j
+public class ElasticSearchSink implements Sink<GenericObject> {
 
-    private URL url;
-    private RestHighLevelClient client;
-    private CredentialsProvider credentialsProvider;
-    private ElasticSearchConfig elasticSearchConfig;
+    private ElasticsearchConfig elasticSearchConfig;
+    private ElasticsearchClient elasticsearchClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        elasticSearchConfig = ElasticSearchConfig.load(config);
+        elasticSearchConfig = ElasticsearchConfig.load(config);
         elasticSearchConfig.validate();
-        createIndexIfNeeded();
+        elasticsearchClient = new ElasticsearchClient(elasticSearchConfig);
     }
 
     @Override
-    public void close() throws Exception {
-        client.close();
+    public void close() {
+        if (elasticsearchClient != null) {
+            elasticsearchClient.close();
+            elasticsearchClient = null;
+        }
     }
 
     @Override
-    public void write(Record<byte[]> record) {
-        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
-        IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
-        indexRequest.type(elasticSearchConfig.getTypeName());
-        indexRequest.source(keyValue.getValue(), XContentType.JSON);
-
-        try {
-        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
-            if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
-                record.ack();
-            } else {
-                record.fail();
+    public void write(Record<GenericObject> record) throws Exception {
+        if (!elasticsearchClient.isFailed()) {
+            try {
+                Pair<String, String> idAndDoc = extractIdAndDocument(record);
+                if (idAndDoc.getRight() == null) {
+                    switch (elasticSearchConfig.getNullValueAction()) {
+                        case DELETE:
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                            }
+                            break;
+                        case IGNORE:
+                            break;
+                        case FAIL:
+                            elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                            throw elasticsearchClient.irrecoverableError.get();
+                    }
+                } else {
+                    if (elasticSearchConfig.isBulkEnabled()) {
+                        elasticsearchClient.bulkIndex(record, idAndDoc);
+                    } else {
+                        elasticsearchClient.indexDocument(record, idAndDoc);
+                    }
+                }
+            } catch (Exception e) {

Review comment:
       This is to trace all unexpected exceptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org