Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/17 08:55:15 UTC

[GitHub] [flink] deadwind4 opened a new pull request, #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null

deadwind4 opened a new pull request, #20002:
URL: https://github.com/apache/flink/pull/20002

   ## What is the purpose of the change
   
   Support documents whose id is null.
   
   ## Brief change log
   
     - *Add IndexRequest in SimpleElasticsearchEmitter*
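A minimal sketch of the decision this change presumably introduces. It assumes, from the now-nullable `idFieldName` in the diff, that a document is sent as an index request (with Elasticsearch generating the id) when no id field is configured, and as an update request otherwise. The update request is the previous behaviour, per the removed `Function<Map<String, Object>, UpdateRequest>` field, and the Elasticsearch update API requires an explicit id. The class `RequestChoiceSketch` and its `describeRequest` method are hypothetical. Plain strings stand in for `IndexRequest` and `UpdateRequest` so the example runs without the Elasticsearch client.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not the PR's code: strings stand in for the real
// Elasticsearch IndexRequest/UpdateRequest objects.
final class RequestChoiceSketch {

    // Describes the request that would be emitted for one document.
    static String describeRequest(String index, String idFieldName, Map<String, Object> doc) {
        if (idFieldName == null) {
            // No id field configured: index request without an id,
            // so Elasticsearch generates the document id itself.
            return "IndexRequest(index=" + index + ", id=<auto>)";
        }
        // An id field is configured: an update request needs an explicit id.
        Object id = Objects.requireNonNull(
                doc.get(idFieldName), "document has no value for id field " + idFieldName);
        return "UpdateRequest(index=" + index + ", id=" + id + ")";
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("name", "ada");
        doc.put("uid", 42);
        System.out.println(describeRequest("users", null, doc));  // IndexRequest(index=users, id=<auto>)
        System.out.println(describeRequest("users", "uid", doc)); // UpdateRequest(index=users, id=42)
    }
}
```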
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] dianfu commented on a diff in pull request #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null

dianfu commented on code in PR #20002:
URL: https://github.com/apache/flink/pull/20002#discussion_r900607863


##########
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java:
##########
@@ -20,76 +20,87 @@
 
 import org.apache.flink.api.connector.sink2.SinkWriter;
 
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 
-import java.io.Serializable;
+import javax.annotation.Nullable;
+
 import java.util.Map;
-import java.util.function.Function;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** A ElasticsearchEmitter that is currently used Python Flink Connector. */
+/** A simple ElasticsearchEmitter which is currently used in PyFlink ES connector. */
 public class SimpleElasticsearchEmitter implements ElasticsearchEmitter<Map<String, Object>> {
 
     private static final long serialVersionUID = 1L;
-    private Function<Map<String, Object>, UpdateRequest> requestGenerator;
+
+    private final String index;
+    private @Nullable final String documentType;
+    private @Nullable final String idFieldName;
+    private final boolean isDynamicIndex;
+
+    private transient BiConsumer<Map<String, Object>, RequestIndexer> requestGenerator;
 
     public SimpleElasticsearchEmitter(
-            String index, String documentType, String idFieldName, boolean isDynamicIndex) {
-        // If this issue resolve https://issues.apache.org/jira/browse/MSHADE-260
-        // we can replace requestGenerator with lambda.
-        // Other corresponding issues https://issues.apache.org/jira/browse/FLINK-18857 and
-        // https://issues.apache.org/jira/browse/FLINK-18006
+            String index,
+            @Nullable String documentType,
+            @Nullable String idFieldName,
+            boolean isDynamicIndex) {
+        this.index = checkNotNull(index);
+        this.documentType = documentType;
+        this.idFieldName = idFieldName;
+        this.isDynamicIndex = isDynamicIndex;
+    }
+
+    @Override
+    public void open() throws Exception {
         if (isDynamicIndex) {
-            this.requestGenerator =
-                    new DynamicIndexRequestGenerator(index, documentType, idFieldName);
+            final String indexFieldName = index;
+            requestGenerator =

Review Comment:
   The logic could be further simplified by making the following abstraction:
   ```java
           Function<Map<String, Object>, String> indexProvider;
           if (isDynamicIndex) {
               indexProvider = doc -> doc.get(index).toString();
           } else {
               indexProvider = doc -> index;
           }
   ```
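The reviewer's suggestion can be made concrete with a small self-contained sketch. The class `IndexProviderSketch` and its `indexFor` method are hypothetical. The fields mirror those in the diff (`index`, `isDynamicIndex`). As in the diff, the function lives in a `transient` field and is built in `open()`, so it is presumably never serialized with the emitter. With a dynamic index, `index` is treated as the name of the document field that holds the target index, following the `doc.get(index)` call in the comment.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the suggested indexProvider abstraction.
final class IndexProviderSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String index;           // static index name, or the field holding it
    private final boolean isDynamicIndex;

    // transient: the lambda is rebuilt in open() instead of being serialized.
    private transient Function<Map<String, Object>, String> indexProvider;

    IndexProviderSketch(String index, boolean isDynamicIndex) {
        this.index = index;
        this.isDynamicIndex = isDynamicIndex;
    }

    void open() {
        if (isDynamicIndex) {
            // Dynamic: read the target index from the document field named by 'index'.
            indexProvider = doc -> doc.get(index).toString();
        } else {
            // Static: every document goes to the same index.
            indexProvider = doc -> index;
        }
    }

    String indexFor(Map<String, Object> doc) {
        return indexProvider.apply(doc);
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("region", "eu-logs", "msg", "hi");
        IndexProviderSketch fixed = new IndexProviderSketch("all-logs", false);
        fixed.open();
        IndexProviderSketch dynamic = new IndexProviderSketch("region", true);
        dynamic.open();
        System.out.println(fixed.indexFor(doc));   // all-logs
        System.out.println(dynamic.indexFor(doc)); // eu-logs
    }
}
```

With the index resolved separately, the request-building code no longer needs one branch per index mode. It only has to choose between an index request and an update request.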



[GitHub] [flink] dianfu closed pull request #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null

dianfu closed pull request #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null
URL: https://github.com/apache/flink/pull/20002


[GitHub] [flink] flinkbot commented on pull request #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null

flinkbot commented on PR #20002:
URL: https://github.com/apache/flink/pull/20002#issuecomment-1158658645

   ## CI report:
   
   * 8b75c0fc11b7ac3e83a73c317d0a12f5cd1116bf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dianfu commented on pull request #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null

dianfu commented on PR #20002:
URL: https://github.com/apache/flink/pull/20002#issuecomment-1159753671

   @deadwind4 Thanks for the update! LGTM overall. I'm wondering if we should rename SimpleElasticsearchEmitter to MapElasticsearchEmitter to indicate that it actually accepts Map as input. What's your thought?


[GitHub] [flink] deadwind4 commented on pull request #20002: [FLINK-28107][python][connector/elasticsearch] Support id of document is null

deadwind4 commented on PR #20002:
URL: https://github.com/apache/flink/pull/20002#issuecomment-1159838998

   @dianfu Makes sense to me.

