You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/09 06:52:59 UTC

[incubator-seatunnel] branch api-draft updated: Rename Row to SeaTunnelRow (#1832)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 609f5871 Rename Row to SeaTunnelRow (#1832)
609f5871 is described below

commit 609f587162a8970982e081bcfaa35c58bbf55a95
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon May 9 14:52:54 2022 +0800

    Rename Row to SeaTunnelRow (#1832)
---
 .../org/apache/seatunnel/api/table/type/Row.java   | 24 --------
 .../seatunnel/api/table/type/SeaTunnelRow.java     | 69 ++++++++++++++++++++++
 .../serialization/RowSerialization.java            |  6 +-
 .../flink/serialization/FlinkRowSerialization.java | 15 +++--
 4 files changed, 82 insertions(+), 32 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java
deleted file mode 100644
index f355cd52..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-import java.io.Serializable;
-
-public final class Row implements Serializable {
-    private int tableId;
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
new file mode 100644
index 00000000..12ee5760
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * SeaTunnel row type.
+ */
+public final class SeaTunnelRow implements Serializable {
+    private static final long serialVersionUID = -1L;
+    private final int tableId;
+    // todo: add row kind
+    private final Object[] fields;
+    private final Map<String, Object> fieldMap;
+
+    public SeaTunnelRow(Object[] fields) {
+        this(fields, null);
+
+    }
+
+    public SeaTunnelRow(Object[] fields, Map<String, Object> fieldMap) {
+        this(fields, fieldMap, -1);
+    }
+
+    public SeaTunnelRow(Object[] fields, Map<String, Object> fieldMap, int tableId) {
+        this.fields = fields;
+        this.fieldMap = fieldMap;
+        this.tableId = tableId;
+    }
+
+    public int getTableId() {
+        return tableId;
+    }
+
+    public Object[] getFields() {
+        return fields;
+    }
+
+    public Map<String, Object> getFieldMap() {
+        return fieldMap;
+    }
+
+    @Override
+    public String toString() {
+        return "SeaTunnelRow{" +
+            "tableId=" + tableId +
+            ", fields=" + Arrays.toString(fields) +
+            ", fieldMap=" + fieldMap +
+            '}';
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
index 06bd52e9..253f369d 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.translation.serialization;
 
-import org.apache.seatunnel.api.table.type.Row;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import java.io.IOException;
 
@@ -30,7 +30,7 @@ public interface RowSerialization<T> {
      * @return The serialized data (bytes).
      * @throws IOException Thrown, if the serialization fails.
      */
-    T serialize(Row seaTunnelRow) throws IOException;
+    T serialize(SeaTunnelRow seaTunnelRow) throws IOException;
 
     /**
      * De-serializes the given data (bytes).
@@ -39,5 +39,5 @@ public interface RowSerialization<T> {
      * @return The SeaTunnel Row
      * @throws IOException Thrown, if the deserialization fails.
      */
-    Row deserialize(T engineRow) throws IOException;
+    SeaTunnelRow deserialize(T engineRow) throws IOException;
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
index e5026108..b6f3867c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.translation.flink.serialization;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.serialization.RowSerialization;
 
 import org.apache.flink.types.Row;
@@ -26,13 +27,17 @@ import java.io.IOException;
 public class FlinkRowSerialization implements RowSerialization<Row> {
 
     @Override
-    public Row serialize(org.apache.seatunnel.api.table.type.Row seaTunnelRow) throws IOException {
-
-        return null;
+    public Row serialize(SeaTunnelRow seaTunnelRow) throws IOException {
+        return Row.of(seaTunnelRow.getFields());
     }
 
     @Override
-    public org.apache.seatunnel.api.table.type.Row deserialize(Row engineRow) throws IOException {
-        return null;
+    public SeaTunnelRow deserialize(Row engineRow) throws IOException {
+        int arity = engineRow.getArity();
+        Object[] fields = new Object[arity];
+        for (int i = 0; i < arity; i++) {
+            fields[i] = engineRow.getField(i);
+        }
+        return new SeaTunnelRow(fields);
     }
 }