You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/09 06:52:59 UTC
[incubator-seatunnel] branch api-draft updated: Rename Row to SeaTunnelRow (#1832)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 609f5871 Rename Row to SeaTunnelRow (#1832)
609f5871 is described below
commit 609f587162a8970982e081bcfaa35c58bbf55a95
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon May 9 14:52:54 2022 +0800
Rename Row to SeaTunnelRow (#1832)
---
.../org/apache/seatunnel/api/table/type/Row.java | 24 --------
.../seatunnel/api/table/type/SeaTunnelRow.java | 69 ++++++++++++++++++++++
.../serialization/RowSerialization.java | 6 +-
.../flink/serialization/FlinkRowSerialization.java | 15 +++--
4 files changed, 82 insertions(+), 32 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java
deleted file mode 100644
index f355cd52..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-import java.io.Serializable;
-
-public final class Row implements Serializable {
- private int tableId;
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
new file mode 100644
index 00000000..12ee5760
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * SeaTunnel row type: a final, serializable holder for a table id, positional field values and a String-keyed field map.
+ */
+public final class SeaTunnelRow implements Serializable {
+ private static final long serialVersionUID = -1L;
+ private final int tableId; // -1 when built through either of the two shorter constructors
+ // todo: add row kind
+ private final Object[] fields; // stored by reference; the constructor does not copy the array
+ private final Map<String, Object> fieldMap; // null when built through the fields-only constructor
+
+ public SeaTunnelRow(Object[] fields) {
+ this(fields, null); // no field map supplied, so fieldMap stays null
+
+ }
+
+ public SeaTunnelRow(Object[] fields, Map<String, Object> fieldMap) {
+ this(fields, fieldMap, -1); // table id defaults to -1
+ }
+
+ public SeaTunnelRow(Object[] fields, Map<String, Object> fieldMap, int tableId) {
+ this.fields = fields;
+ this.fieldMap = fieldMap;
+ this.tableId = tableId;
+ }
+
+ public int getTableId() {
+ return tableId;
+ }
+
+ public Object[] getFields() {
+ return fields; // hands back the internal array itself, not a copy
+ }
+
+ public Map<String, Object> getFieldMap() {
+ return fieldMap; // may be null; returned without copying
+ }
+
+ @Override
+ public String toString() {
+ return "SeaTunnelRow{" +
+ "tableId=" + tableId +
+ ", fields=" + Arrays.toString(fields) +
+ ", fieldMap=" + fieldMap +
+ '}';
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
index 06bd52e9..253f369d 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.translation.serialization;
-import org.apache.seatunnel.api.table.type.Row;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import java.io.IOException;
@@ -30,7 +30,7 @@ public interface RowSerialization<T> {
* @return The serialized data (bytes).
* @throws IOException Thrown, if the serialization fails.
*/
- T serialize(Row seaTunnelRow) throws IOException;
+ T serialize(SeaTunnelRow seaTunnelRow) throws IOException;
/**
* De-serializes the given data (bytes).
@@ -39,5 +39,5 @@ public interface RowSerialization<T> {
* @return The SeaTunnel Row
* @throws IOException Thrown, if the deserialization fails.
*/
- Row deserialize(T engineRow) throws IOException;
+ SeaTunnelRow deserialize(T engineRow) throws IOException;
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
index e5026108..b6f3867c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.translation.flink.serialization;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.serialization.RowSerialization;
import org.apache.flink.types.Row;
@@ -26,13 +27,17 @@ import java.io.IOException;
public class FlinkRowSerialization implements RowSerialization<Row> {
@Override
- public Row serialize(org.apache.seatunnel.api.table.type.Row seaTunnelRow) throws IOException {
-
- return null;
+ public Row serialize(SeaTunnelRow seaTunnelRow) throws IOException {
+ return Row.of(seaTunnelRow.getFields());
}
@Override
- public org.apache.seatunnel.api.table.type.Row deserialize(Row engineRow) throws IOException {
- return null;
+ public SeaTunnelRow deserialize(Row engineRow) throws IOException {
+ int arity = engineRow.getArity();
+ Object[] fields = new Object[arity];
+ for (int i = 0; i < arity; i++) {
+ fields[i] = engineRow.getField(i);
+ }
+ return new SeaTunnelRow(fields);
}
}