You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/26 01:58:08 UTC
[incubator-seatunnel] branch dev updated: [Improve][Console] improve console to printf schema and deepToString fields (#2517)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 963387d37 [Improve][Console] improve console to printf schema and deepToString fields (#2517)
963387d37 is described below
commit 963387d3750f9baa40815d5f7b1b3360f07641ca
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Fri Aug 26 09:58:02 2022 +0800
[Improve][Console] improve console to printf schema and deepToString fields (#2517)
* deep toString
* checkstyle
* fix some mistakes
* delete judging
* not print array,and print after fix the bug for array
---
.../seatunnel/console/sink/ConsoleSinkWriter.java | 33 ++++++++++++++++++----
1 file changed, 28 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index e226f1623..921aef06c 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -17,33 +17,56 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class);
-
private final SeaTunnelRowType seaTunnelRowType;
+ public static final AtomicLong CNT = new AtomicLong(0);
public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
+ System.out.printf("fields : %s%n", StringUtils.join(seaTunnelRowType.getFieldNames(), ", "));
+ System.out.printf("types : %s%n", StringUtils.join(seaTunnelRowType.getFieldTypes(), ", "));
}
@Override
@SuppressWarnings("checkstyle:RegexpSingleline")
public void write(SeaTunnelRow element) {
- System.out.println(Arrays.toString(element.getFields()));
+ String[] arr = new String[seaTunnelRowType.getTotalFields()];
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ Object[] fields = element.getFields();
+ for (int i = 0; i < fieldTypes.length; i++) {
+ arr[i] = fieldToString(fieldTypes[i], fields[i]);
+ }
+ System.out.format("row=%s : %s%n", CNT.incrementAndGet(), StringUtils.join(arr, ", "));
}
@Override
public void close() {
// nothing
}
+
+ private String fieldToString(SeaTunnelDataType<?> type, Object value) {
+ switch (type.getSqlType()) {
+ case ARRAY:
+ case BYTES:
+ return Arrays.toString((Object[]) value);
+ case MAP:
+ return JsonUtils.toJsonString(value);
+ case ROW:
+ return fieldToString(type, value);
+ default:
+ return String.valueOf(value);
+ }
+ }
}