You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/21 06:24:33 UTC

[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3146: [Improve][Connector-V2][ElasticSearch] Add index_id option to specify the primary keys for index

EricJoy2048 commented on code in PR #3146:
URL: https://github.com/apache/incubator-seatunnel/pull/3146#discussion_r1001405329


##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java:
##########
@@ -19,19 +19,25 @@
 
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
 
+import java.util.List;
+
 /**
  * index config by seatunnel
  */
 public class IndexInfo {
 
     private String index;
     private String type;
+    private List<String> ids;

Review Comment:
   suggest make ids final if it will never been update.



##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java:
##########
@@ -31,41 +38,56 @@
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * use in elasticsearch version >= 7.*
  */
 public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
-    private final SeaTunnelRowType seaTunnelRowType;
     private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
-        .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
 
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final IndexInfo indexInfo;
     private final IndexSerializer indexSerializer;
-
     private final IndexTypeSerializer indexTypeSerializer;
+    private boolean hasKeys;
 
     public ElasticsearchRowSerializer(ElasticsearchVersion elasticsearchVersion, IndexInfo indexInfo, SeaTunnelRowType seaTunnelRowType) {
         this.indexTypeSerializer = IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion, indexInfo.getType());
         this.indexSerializer = IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(), seaTunnelRowType);
         this.seaTunnelRowType = seaTunnelRowType;
+        this.indexInfo = indexInfo;
+        this.hasKeys = indexInfo.getIds() != null && indexInfo.getIds().size() > 0 ? true : false;

Review Comment:
   suggest use `CollectionUtils.isEmpty(indexInfo)`



##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java:
##########
@@ -81,4 +103,28 @@ public String serializeRow(SeaTunnelRow row){
 
         return sb.toString();
     }
+
+    private String convertKey(SeaTunnelDataType<?> dataType, Object value) {
+        if (dataType instanceof BasicType
+                || dataType instanceof DecimalType
+                || dataType instanceof PrimitiveByteArrayType
+                || dataType instanceof LocalTimeType) {
+            return String.valueOf(value);
+        } else if (dataType instanceof MapType) {

Review Comment:
   You already `return` in the if case, So `else if` is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org