Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/07/03 07:53:26 UTC

[GitHub] [inlong] Oneal65 opened a new pull request, #4849: [INLONG-4659][Sort] Support field routing for ElasticSearch connector

Oneal65 opened a new pull request, #4849:
URL: https://github.com/apache/inlong/pull/4849

   ### Prepare a Pull Request
   
   - Fixes #4659 
   
   ### Motivation
   
   Support field routing for the Elasticsearch connector
   
   ### Modifications
   
   Based on https://github.com/apache/flink/pull/14493/files
   and https://github.com/apache/flink/tree/release-1.13.2-rc2:
   
   - Rework the sort-connector-elasticsearch6 and sort-connector-elasticsearch7 code
   - Add the sort-connector-elasticsearch-base module
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915510690


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java:
##########
@@ -94,6 +97,7 @@ public ElasticsearchLoadNode(@JsonProperty("id") String id,
         this.index = Preconditions.checkNotNull(index, "index is null");
         this.documentType = documentType;
         this.primaryKey = primaryKey;
+        this.routingFieldName = primaryKey;

Review Comment:
   @gong OK





[GitHub] [inlong] dockerzhang merged pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #4849:
URL: https://github.com/apache/inlong/pull/4849




[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915421705


##########
inlong-dashboard/package-lock.json:
##########
@@ -11486,7 +11486,7 @@
     "json2mq": {
       "version": "0.2.0",
       "resolved": "https://registry.npmjs.org/json2mq/-/json2mq-0.2.0.tgz",
-      "integrity": "sha512-SzoRg7ux5DWTII9J2qkrZrqV1gt+rTaoufMxEzXbS26Uid0NwaJd123HcoB80TgubEppxxIGdNxCx50fEoEWQA==",
+      "integrity": "sha1-tje9O6nqvhIsg+lyBIOusQ0skEo=",
       "requires": {
         "string-convert": "^0.2.0"
       }

Review Comment:
   DONE





[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r913707586


##########
.idea/vcs.xml:
##########
@@ -1,36 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--

Review Comment:
   DONE





[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915491588


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java:
##########
@@ -94,6 +97,7 @@ public ElasticsearchLoadNode(@JsonProperty("id") String id,
         this.index = Preconditions.checkNotNull(index, "index is null");
         this.documentType = documentType;
         this.primaryKey = primaryKey;
+        this.routingFieldName = primaryKey;

Review Comment:
   This is the default value for the inlong-es-connector.





[GitHub] [inlong] dockerzhang commented on pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1177264948

   please unify all the dependency versions for the `elasticsearch6` and `elasticsearch7`




[GitHub] [inlong] Oneal65 commented on pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1174970832

   > 1.The problem background needs to be added. 2.Design ideas and schemes are necessary. 3.Add the copied source code to the LICENSE.
   
   @GanfengTan You can now find these under Background, Design ideas, and the other sections.




[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915444349


##########
inlong-sort/sort-connectors/elasticsearch-base/pom.xml:
##########
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>sort-connectors</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-connector-elasticsearch-base</artifactId>
+    <name>Apache InLong - Sort-connector-elasticsearch-base</name>
+    <packaging>jar</packaging>
+
+
+    <properties>
+        <elasticsearch.version>6.8.17</elasticsearch.version>
+    </properties>
+

Review Comment:
   DONE





[GitHub] [inlong] Oneal65 closed pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 closed pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector
URL: https://github.com/apache/inlong/pull/4849




[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915429448


##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An extractor for an Elasticsearch key from a {@link RowData}. */
+public class KeyExtractor implements Function<RowData, String>, Serializable {
+    private final FieldFormatter[] fieldFormatters;
+    private final String keyDelimiter;
+
+    private interface FieldFormatter extends Serializable {
+        String format(RowData rowData);
+    }
+
+    private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+        this.fieldFormatters = fieldFormatters;
+        this.keyDelimiter = keyDelimiter;
+    }
+
+    @Override
+    public String apply(RowData rowData) {
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < fieldFormatters.length; i++) {
+            if (i > 0) {
+                builder.append(keyDelimiter);
+            }
+            final String value = fieldFormatters[i].format(rowData);
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+
+    private static class ColumnWithIndex {
+        public TableColumn column;
+        public int index;
+
+        public ColumnWithIndex(TableColumn column, int index) {
+            this.column = column;
+            this.index = index;
+        }
+
+        public LogicalType getType() {
+            return column.getType().getLogicalType();
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+
+    public static Function<RowData, String> createKeyExtractor(
+            TableSchema schema, String keyDelimiter) {

Review Comment:
   the same reason.
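
The composite-key logic in the `KeyExtractor` diff above can be illustrated with a small standalone sketch. It is not the InLong or Flink code: `CompositeKeySketch` is a hypothetical name, rows are plain `List<Object>` values instead of `RowData`, and key columns are given as positions rather than derived from a `TableSchema`. It only shows how the formatted key fields are joined with the configured delimiter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical stand-in for KeyExtractor that works on plain lists instead of RowData. */
public class CompositeKeySketch {

    /** Builds a function that joins the values at the given positions with the delimiter. */
    public static Function<List<Object>, String> createKeyExtractor(
            int[] keyIndexes, String keyDelimiter) {
        return row -> Arrays.stream(keyIndexes)
                .mapToObj(i -> String.valueOf(row.get(i)))
                .collect(Collectors.joining(keyDelimiter));
    }

    public static void main(String[] args) {
        // Columns 0 and 2 are assumed to form the primary key in this example.
        Function<List<Object>, String> extractor = createKeyExtractor(new int[] {0, 2}, "_");
        List<Object> row = Arrays.asList("user1", 42, "2022-07-03");
        System.out.println(extractor.apply(row)); // prints "user1_2022-07-03"
    }
}
```

The real class additionally has to be `Serializable` and type-aware, since Flink ships it to task managers and formats each field according to its logical type.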





[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915428380


##########
inlong-dashboard/package-lock.json:
##########
@@ -11486,7 +11486,7 @@
     "json2mq": {
       "version": "0.2.0",
       "resolved": "https://registry.npmjs.org/json2mq/-/json2mq-0.2.0.tgz",
-      "integrity": "sha512-SzoRg7ux5DWTII9J2qkrZrqV1gt+rTaoufMxEzXbS26Uid0NwaJd123HcoB80TgubEppxxIGdNxCx50fEoEWQA==",
+      "integrity": "sha1-tje9O6nqvhIsg+lyBIOusQ0skEo=",
       "requires": {
         "string-convert": "^0.2.0"
       }

Review Comment:
   It changed automatically.





[GitHub] [inlong] GanfengTan commented on pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1173269161

   1. The problem background needs to be added.
   2. Design ideas and schemes are necessary.
   3. Add the copied source code to the LICENSE.




[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915418373


##########
.idea/vcs.xml:
##########
@@ -33,4 +33,4 @@
       </list>
     </option>
   </component>
-</project>
+</project>

Review Comment:
   There are no changes in the diff.





[GitHub] [inlong] Oneal65 commented on pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1177279401

   > please unify all the dependency versions for the `elasticsearch6` and `elasticsearch7`
   
   @dockerzhang done




[GitHub] [inlong] dockerzhang commented on pull request #4849: [INLONG-4659][Sort] Support field routing for ElasticSearch connector

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1173099109

   @Oneal65 there are UTs failed, please check again.




[GitHub] [inlong] gong commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915472791


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java:
##########
@@ -94,6 +97,7 @@ public ElasticsearchLoadNode(@JsonProperty("id") String id,
         this.index = Preconditions.checkNotNull(index, "index is null");
         this.documentType = documentType;
         this.primaryKey = primaryKey;
+        this.routingFieldName = primaryKey;

Review Comment:
   `routingFieldName = primaryKey;`?





[GitHub] [inlong] GanfengTan commented on pull request #4849: [INLONG-4659][Sort] Support field routing for ElasticSearch connector

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1173034029

   1. Multiple module titles must correspond.
   2. Commit history error.
   <img width="926" alt="image" src="https://user-images.githubusercontent.com/17596132/177030932-9690123a-11e2-420d-a180-6d891c5dfb05.png">
   




[GitHub] [inlong] Oneal65 commented on pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1174966121

   > 1.The problem background needs to be added. 2.Design ideas and schemes are necessary. 3.Add the copied source code to the LICENSE.




[GitHub] [inlong] Oneal65 commented on pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on PR #4849:
URL: https://github.com/apache/inlong/pull/4849#issuecomment-1174969045

   > @Oneal65 there are UTs failed, please check again.




[GitHub] [inlong] healchow commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for ElasticSearch connector

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r912449186


##########
.idea/vcs.xml:
##########
@@ -1,36 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--

Review Comment:
   Please do not change this file; it is used for quick links to GitHub from IDEA.





[GitHub] [inlong] gong commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915496300


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java:
##########
@@ -94,6 +97,7 @@ public ElasticsearchLoadNode(@JsonProperty("id") String id,
         this.index = Preconditions.checkNotNull(index, "index is null");
         this.documentType = documentType;
         this.primaryKey = primaryKey;
+        this.routingFieldName = primaryKey;

Review Comment:
   @Oneal65 Users cannot set it. Maybe the `tableOptions` method can set a default value for `routing.field-name`.
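
One way to read the suggestion above is sketched below. This is not the actual `ElasticsearchLoadNode`: `RoutingDefaultSketch` and its two-argument constructor are hypothetical, and only the option key `routing.field-name` and the idea of falling back to the primary key come from this thread. The point is that the user-supplied routing field is kept separate from the primary key, and the default is applied only when the options are generated.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for ElasticsearchLoadNode; not the actual InLong class. */
public class RoutingDefaultSketch {
    private final String primaryKey;
    private final String routingFieldName; // null when the user did not set it

    public RoutingDefaultSketch(String primaryKey, String routingFieldName) {
        this.primaryKey = primaryKey;
        this.routingFieldName = routingFieldName;
    }

    /** Emits connector options; falls back to the primary key when no routing field is given. */
    public Map<String, String> tableOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        String routing = routingFieldName != null ? routingFieldName : primaryKey;
        if (routing != null) {
            options.put("routing.field-name", routing);
        }
        return options;
    }

    public static void main(String[] args) {
        System.out.println(new RoutingDefaultSketch("id", null).tableOptions());     // {routing.field-name=id}
        System.out.println(new RoutingDefaultSketch("id", "tenant").tableOptions()); // {routing.field-name=tenant}
    }
}
```

With this shape, assigning `routingFieldName = primaryKey` in the constructor is no longer needed, and an explicit user value is never overwritten.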





[GitHub] [inlong] GanfengTan commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r914464624


##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+    /**
+     * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+     * DISABLED} option.
+     */
+    public enum BackOffType {
+        DISABLED,
+        CONSTANT,
+        EXPONENTIAL
+    }
+
+    public static final ConfigOption<List<String>> HOSTS_OPTION =
+            ConfigOptions.key("hosts")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch hosts to connect to.");
+    public static final ConfigOption<String> INDEX_OPTION =
+            ConfigOptions.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch index for every record.");
+    public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+            ConfigOptions.key("document-type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch document type.");
+    public static final ConfigOption<String> PASSWORD_OPTION =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password used to connect to Elasticsearch instance.");
+    public static final ConfigOption<String> USERNAME_OPTION =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Username used to connect to Elasticsearch instance.");
+    public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+            ConfigOptions.key("document-id.key-delimiter")
+                    .stringType()
+                    .defaultValue("_")
+                    .withDescription(
+                            "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+
+    public static final ConfigOption<String> ROUTING_FIELD_NAME =
+            ConfigOptions.key("routing.field-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch routing field.");
+
+    public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+            ConfigOptions.key("failure-handler")

Review Comment:
   Please add Javadoc for the static method.



##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain string, e.g.
+ * 'myusers', all the records will be consistently written into "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+public final class IndexGeneratorFactory {
+
+    private IndexGeneratorFactory() {
+
+    }
+
+    public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {

Review Comment:
   Please add Javadoc.
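
The static versus dynamic index behaviour described in the class comment above can be sketched as follows. This is a simplified illustration, not the `IndexGeneratorFactory` implementation: `DynamicIndexSketch` and `resolve` are hypothetical names, a record is a plain `Map`, only a single placeholder is handled, and `java.time.format.DateTimeFormatter` is assumed as the formatter (the `yyyy-MM-dd` pattern used here means the same in `SimpleDateFormat`).

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of dynamic index resolution; not the IndexGeneratorFactory code. */
public class DynamicIndexSketch {
    // Matches one '{field}' or '{field|format}' placeholder.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{([^|{}]+)(?:\\|([^{}]+))?\\}");

    /** Resolves the index name for one record, represented here as a simple map. */
    public static String resolve(String index, Map<String, Object> record) {
        Matcher m = PLACEHOLDER.matcher(index);
        if (!m.find()) {
            return index; // static index: used as-is for every record
        }
        Object value = record.get(m.group(1));
        String formatted;
        if (m.group(2) != null && value instanceof LocalDateTime) {
            formatted = ((LocalDateTime) value).format(DateTimeFormatter.ofPattern(m.group(2)));
        } else {
            formatted = String.valueOf(value);
        }
        return index.substring(0, m.start()) + formatted + index.substring(m.end());
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("log_ts", LocalDateTime.of(2020, 3, 27, 12, 25, 55));
        System.out.println(resolve("myusers", record));                     // myusers
        System.out.println(resolve("myusers_{log_ts|yyyy-MM-dd}", record)); // myusers_2020-03-27
    }
}
```

A Javadoc comment on `createIndexGenerator` could state the same contract: a plain string yields a static index, and a `{field_name}` or `{field_name|date_format_string}` placeholder yields a per-record index.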



##########
inlong-sort/sort-connectors/elasticsearch-base/pom.xml:
##########
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>sort-connectors</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-connector-elasticsearch-base</artifactId>
+    <name>Apache InLong - Sort-connector-elasticsearch-base</name>
+    <packaging>jar</packaging>
+
+
+    <properties>
+        <elasticsearch.version>6.8.17</elasticsearch.version>
+    </properties>
+

Review Comment:
   Please move this version property into the project's dependency version management instead of declaring it in this module.



##########
.idea/vcs.xml:
##########
@@ -33,4 +33,4 @@
       </list>
     </option>
   </component>
-</project>
+</project>

Review Comment:
   Your local formatting settings appear to be off: this line shows as changed without any visible difference.



##########
inlong-dashboard/package-lock.json:
##########
@@ -11486,7 +11486,7 @@
     "json2mq": {
       "version": "0.2.0",
       "resolved": "https://registry.npmjs.org/json2mq/-/json2mq-0.2.0.tgz",
-      "integrity": "sha512-SzoRg7ux5DWTII9J2qkrZrqV1gt+rTaoufMxEzXbS26Uid0NwaJd123HcoB80TgubEppxxIGdNxCx50fEoEWQA==",
+      "integrity": "sha1-tje9O6nqvhIsg+lyBIOusQ0skEo=",
       "requires": {
         "string-convert": "^0.2.0"
       }

Review Comment:
   A Sort PR should only contain changes to the Sort module; this dashboard file should not be part of it.



##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An extractor for an Elasticsearch key from a {@link RowData}. */
+public class KeyExtractor implements Function<RowData, String>, Serializable {
+    private final FieldFormatter[] fieldFormatters;
+    private final String keyDelimiter;
+
+    private interface FieldFormatter extends Serializable {
+        String format(RowData rowData);
+    }
+
+    private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+        this.fieldFormatters = fieldFormatters;
+        this.keyDelimiter = keyDelimiter;
+    }
+
+    @Override
+    public String apply(RowData rowData) {
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < fieldFormatters.length; i++) {
+            if (i > 0) {
+                builder.append(keyDelimiter);
+            }
+            final String value = fieldFormatters[i].format(rowData);
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+
+    private static class ColumnWithIndex {
+        public TableColumn column;
+        public int index;
+
+        public ColumnWithIndex(TableColumn column, int index) {
+            this.column = column;
+            this.index = index;
+        }
+
+        public LogicalType getType() {
+            return column.getType().getLogicalType();
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+
+    public static Function<RowData, String> createKeyExtractor(
+            TableSchema schema, String keyDelimiter) {

Review Comment:
   Please add Javadoc for this public factory method.
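
   For reference, the `apply` method above joins the formatted key fields with `keyDelimiter`. A minimal, self-contained sketch of that joining step is below. The class `CompositeKeySketch`, the method `buildKey`, and the `Map`-based record are hypothetical stand-ins for `KeyExtractor` and `RowData`, and null handling is deliberately left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration only; the PR's KeyExtractor formats RowData fields by type.
class CompositeKeySketch {

    // Builds a document ID by joining the values of the key fields with the delimiter.
    static String buildKey(List<String> keyFields, Map<String, Object> record, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (String field : keyFields) {
            // String.valueOf turns a missing field into the literal "null" in this sketch.
            joiner.add(String.valueOf(record.get(field)));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("a", "KEY1");
        record.put("b", "KEY2");
        record.put("c", "KEY3");
        // Prints "KEY1$KEY2$KEY3"
        System.out.println(buildKey(Arrays.asList("a", "b", "c"), record, "$"));
    }
}
```

   The Javadoc on `createKeyExtractor` could say which schema columns are used as key fields and how the delimiter is applied.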





[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915429076


##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+    /**
+     * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+     * DISABLED} option.
+     */
+    public enum BackOffType {
+        DISABLED,
+        CONSTANT,
+        EXPONENTIAL
+    }
+
+    public static final ConfigOption<List<String>> HOSTS_OPTION =
+            ConfigOptions.key("hosts")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch hosts to connect to.");
+    public static final ConfigOption<String> INDEX_OPTION =
+            ConfigOptions.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch index for every record.");
+    public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+            ConfigOptions.key("document-type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch document type.");
+    public static final ConfigOption<String> PASSWORD_OPTION =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password used to connect to Elasticsearch instance.");
+    public static final ConfigOption<String> USERNAME_OPTION =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Username used to connect to Elasticsearch instance.");
+    public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+            ConfigOptions.key("document-id.key-delimiter")
+                    .stringType()
+                    .defaultValue("_")
+                    .withDescription(
+                            "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+
+    public static final ConfigOption<String> ROUTING_FIELD_NAME =
+            ConfigOptions.key("routing.field-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch routing field.");
+
+    public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+            ConfigOptions.key("failure-handler")

Review Comment:
   This is Flink source code. We should not rewrite it, so that it stays easy to diff against upstream in the future.





[GitHub] [inlong] Oneal65 commented on a diff in pull request #4849: [INLONG-4659][Sort] Support field routing for Elasticsearch connector

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #4849:
URL: https://github.com/apache/inlong/pull/4849#discussion_r915429363


##########
inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain string, e.g.
+ * 'myusers', all the records will be consistently written into "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+public final class IndexGeneratorFactory {
+
+    private IndexGeneratorFactory() {
+
+    }
+
+    public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {

Review Comment:
   Same reason as above: this is Flink source code.


