You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/06 15:15:45 UTC

[incubator-seatunnel] branch dev updated: [Improve][connector][console] print subtask index (#3000)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new de345783d [Improve][connector][console] print subtask index (#3000)
de345783d is described below

commit de345783d910ea59fb4bd3b0103f7d8bd287ff95
Author: liugddx <80...@qq.com>
AuthorDate: Thu Oct 6 23:15:38 2022 +0800

    [Improve][connector][console] print subtask index (#3000)
---
 .../seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java   | 2 +-
 .../connectors/seatunnel/console/sink/ConsoleSinkWriter.java       | 7 +++++--
 .../connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java     | 2 +-
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index fee2a1fb5..9613b40c8 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -47,7 +47,7 @@ public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
-        return new ConsoleSinkWriter(seaTunnelRowType);
+        return new ConsoleSinkWriter(seaTunnelRowType, context);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index c3ccbae72..88d0584e7 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
+import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -34,9 +35,11 @@ public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private final SeaTunnelRowType seaTunnelRowType;
     public static final AtomicLong CNT = new AtomicLong(0);
+    public SinkWriter.Context context;
 
-    public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType) {
+    public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType, SinkWriter.Context context) {
         this.seaTunnelRowType = seaTunnelRowType;
+        this.context = context;
         System.out.printf("fields : %s%n", StringUtils.join(seaTunnelRowType.getFieldNames(), ", "));
         System.out.printf("types : %s%n", StringUtils.join(seaTunnelRowType.getFieldTypes(), ", "));
     }
@@ -50,7 +53,7 @@ public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
         for (int i = 0; i < fieldTypes.length; i++) {
             arr[i] = fieldToString(fieldTypes[i], fields[i]);
         }
-        System.out.format("row=%s : %s%n", CNT.incrementAndGet(), StringUtils.join(arr, ", "));
+        System.out.format("subtaskIndex=%s: row=%s : %s%n", context.getIndexOfSubtask(), CNT.incrementAndGet(), StringUtils.join(arr, ", "));
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
index bfd2c38f7..71f8a5f69 100644
--- a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
+++ b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
@@ -47,7 +47,7 @@ public class ConsoleSinkWriterIT {
         String[] fieldNames = {};
         SeaTunnelDataType<?>[] fieldTypes = {};
         SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
-        consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType);
+        consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null);
     }
 
     private Object fieldToStringTest(SeaTunnelDataType<?> dataType, Object value) {