You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/26 01:58:08 UTC

[incubator-seatunnel] branch dev updated: [Improve][Console] improve console to printf schema and deepToString fields (#2517)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 963387d37 [Improve][Console] improve console to printf schema and deepToString fields (#2517)
963387d37 is described below

commit 963387d3750f9baa40815d5f7b1b3360f07641ca
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Fri Aug 26 09:58:02 2022 +0800

    [Improve][Console] improve console to printf schema and deepToString fields (#2517)
    
    * deep toString
    
    * checkstyle
    
    * fix some mistakes
    
    * delete judging
    
    * do not print arrays, and print them after fixing the bug for arrays
---
 .../seatunnel/console/sink/ConsoleSinkWriter.java  | 33 ++++++++++++++++++----
 1 file changed, 28 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index e226f1623..921aef06c 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -17,33 +17,56 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class);
-
     private final SeaTunnelRowType seaTunnelRowType;
+    public static final AtomicLong CNT = new AtomicLong(0);
 
     public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType) {
         this.seaTunnelRowType = seaTunnelRowType;
+        System.out.printf("fields : %s%n", StringUtils.join(seaTunnelRowType.getFieldNames(), ", "));
+        System.out.printf("types : %s%n", StringUtils.join(seaTunnelRowType.getFieldTypes(), ", "));
     }
 
     @Override
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public void write(SeaTunnelRow element) {
-        System.out.println(Arrays.toString(element.getFields()));
+        String[] arr = new String[seaTunnelRowType.getTotalFields()];
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        Object[] fields = element.getFields();
+        for (int i = 0; i < fieldTypes.length; i++) {
+            arr[i] = fieldToString(fieldTypes[i], fields[i]);
+        }
+        System.out.format("row=%s : %s%n", CNT.incrementAndGet(), StringUtils.join(arr, ", "));
     }
 
     @Override
     public void close() {
         // nothing
     }
+
+    private String fieldToString(SeaTunnelDataType<?> type, Object value) {
+        switch (type.getSqlType()) {
+            case ARRAY:
+            case BYTES:
+                return Arrays.toString((Object[]) value);
+            case MAP:
+                return JsonUtils.toJsonString(value);
+            case ROW:
+                return fieldToString(type, value);
+            default:
+                return String.valueOf(value);
+        }
+    }
 }