You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/06 15:15:45 UTC
[incubator-seatunnel] branch dev updated: [Improve][connector][console] print subtask index (#3000)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new de345783d [Improve][connector][console] print subtask index (#3000)
de345783d is described below
commit de345783d910ea59fb4bd3b0103f7d8bd287ff95
Author: liugddx <80...@qq.com>
AuthorDate: Thu Oct 6 23:15:38 2022 +0800
[Improve][connector][console] print subtask index (#3000)
---
.../seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java | 2 +-
.../connectors/seatunnel/console/sink/ConsoleSinkWriter.java | 7 +++++--
.../connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java | 2 +-
3 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index fee2a1fb5..9613b40c8 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -47,7 +47,7 @@ public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
- return new ConsoleSinkWriter(seaTunnelRowType);
+ return new ConsoleSinkWriter(seaTunnelRowType, context);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index c3ccbae72..88d0584e7 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
+import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -34,9 +35,11 @@ public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private final SeaTunnelRowType seaTunnelRowType;
public static final AtomicLong CNT = new AtomicLong(0);
+ public SinkWriter.Context context;
- public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType) {
+ public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType, SinkWriter.Context context) {
this.seaTunnelRowType = seaTunnelRowType;
+ this.context = context;
System.out.printf("fields : %s%n", StringUtils.join(seaTunnelRowType.getFieldNames(), ", "));
System.out.printf("types : %s%n", StringUtils.join(seaTunnelRowType.getFieldTypes(), ", "));
}
@@ -50,7 +53,7 @@ public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
for (int i = 0; i < fieldTypes.length; i++) {
arr[i] = fieldToString(fieldTypes[i], fields[i]);
}
- System.out.format("row=%s : %s%n", CNT.incrementAndGet(), StringUtils.join(arr, ", "));
+ System.out.format("subtaskIndex=%s: row=%s : %s%n", context.getIndexOfSubtask(), CNT.incrementAndGet(), StringUtils.join(arr, ", "));
}
@Override
diff --git a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
index bfd2c38f7..71f8a5f69 100644
--- a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
+++ b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
@@ -47,7 +47,7 @@ public class ConsoleSinkWriterIT {
String[] fieldNames = {};
SeaTunnelDataType<?>[] fieldTypes = {};
SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
- consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType);
+ consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null);
}
private Object fieldToStringTest(SeaTunnelDataType<?> dataType, Object value) {