Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/18 00:32:14 UTC

[GitHub] [flink] dianfu commented on a diff in pull request #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null

dianfu commented on code in PR #20002:
URL: https://github.com/apache/flink/pull/20002#discussion_r900607863


##########
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java:
##########
@@ -20,76 +20,87 @@
 
 import org.apache.flink.api.connector.sink2.SinkWriter;
 
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 
-import java.io.Serializable;
+import javax.annotation.Nullable;
+
 import java.util.Map;
-import java.util.function.Function;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** A ElasticsearchEmitter that is currently used Python Flink Connector. */
+/** A simple ElasticsearchEmitter which is currently used in PyFlink ES connector. */
 public class SimpleElasticsearchEmitter implements ElasticsearchEmitter<Map<String, Object>> {
 
     private static final long serialVersionUID = 1L;
-    private Function<Map<String, Object>, UpdateRequest> requestGenerator;
+
+    private final String index;
+    private @Nullable final String documentType;
+    private @Nullable final String idFieldName;
+    private final boolean isDynamicIndex;
+
+    private transient BiConsumer<Map<String, Object>, RequestIndexer> requestGenerator;
 
     public SimpleElasticsearchEmitter(
-            String index, String documentType, String idFieldName, boolean isDynamicIndex) {
-        // If this issue resolve https://issues.apache.org/jira/browse/MSHADE-260
-        // we can replace requestGenerator with lambda.
-        // Other corresponding issues https://issues.apache.org/jira/browse/FLINK-18857 and
-        // https://issues.apache.org/jira/browse/FLINK-18006
+            String index,
+            @Nullable String documentType,
+            @Nullable String idFieldName,
+            boolean isDynamicIndex) {
+        this.index = checkNotNull(index);
+        this.documentType = documentType;
+        this.idFieldName = idFieldName;
+        this.isDynamicIndex = isDynamicIndex;
+    }
+
+    @Override
+    public void open() throws Exception {
         if (isDynamicIndex) {
-            this.requestGenerator =
-                    new DynamicIndexRequestGenerator(index, documentType, idFieldName);
+            final String indexFieldName = index;
+            requestGenerator =

Review Comment:
   The logic could be simplified further by introducing an index provider, so that
   the dynamic and static cases differ only in how the target index is resolved:
   {code}
   Function<Map<String, Object>, String> indexProvider;
   if (isDynamicIndex) {
       indexProvider = doc -> doc.get(index).toString();
   } else {
       indexProvider = doc -> index;
   }
   {code}
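
   For illustration, here is a minimal standalone sketch of that abstraction. The
   class name, method name, and sample field names are hypothetical and not part of
   the PR. The explicit null check is an added assumption: the snippet above would
   throw a NullPointerException if the document lacks the index field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Standalone sketch; class and method names are hypothetical, not taken from the PR. */
public class IndexProviderSketch {

    /** Returns a function that resolves the target index for a given document. */
    public static Function<Map<String, Object>, String> indexProviderFor(
            String index, boolean isDynamicIndex) {
        if (isDynamicIndex) {
            // Dynamic case: `index` names the document field that holds the target index.
            return doc -> {
                Object value = doc.get(index);
                if (value == null) {
                    // Assumption: fail with a clear message rather than an NPE in toString().
                    throw new IllegalArgumentException(
                            "Document has no value for index field '" + index + "'");
                }
                return value.toString();
            };
        }
        // Static case: every document is written to the same index.
        return doc -> index;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("target", "orders-2022");
        doc.put("id", 42);

        // Dynamic: the index is read from the document's "target" field.
        System.out.println(indexProviderFor("target", true).apply(doc));
        // Static: the configured index is used as-is.
        System.out.println(indexProviderFor("fixed-index", false).apply(doc));
    }
}
```

   With a provider like this, the code that builds the request no longer needs to
   branch on isDynamicIndex; it only calls indexProvider.apply(doc).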



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
