You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by cz...@apache.org on 2023/03/21 06:06:06 UTC

[incubator-paimon] branch master updated: [FLINK-31433] Make SchemaChange serializable

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3179e184d [FLINK-31433] Make SchemaChange serializable
3179e184d is described below

commit 3179e184da24a09fa67e3bab6bbbcd9a338634e5
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 21 14:06:01 2023 +0800

    [FLINK-31433] Make SchemaChange serializable
    
    This closes #671.
---
 .../org/apache/paimon/schema/SchemaChange.java     | 54 ++++++++++++++++++-
 .../flink/SchemaChangeSerializationTest.java       | 63 ++++++++++++++++++++++
 2 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 83dceab07..93061b2d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.DataType;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Objects;
 
@@ -32,7 +33,7 @@ import java.util.Objects;
  * @since 0.4.0
  */
 @Experimental
-public interface SchemaChange {
+public interface SchemaChange extends Serializable {
 
     static SchemaChange setOption(String key, String value) {
         return new SetOption(key, value);
@@ -76,6 +77,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to set a table option. */
     final class SetOption implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String key;
         private final String value;
 
@@ -112,6 +116,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to remove a table option. */
     final class RemoveOption implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String key;
 
         private RemoveOption(String key) {
@@ -142,6 +149,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to add a field. */
     final class AddColumn implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String fieldName;
         private final DataType dataType;
         private final String description;
@@ -198,6 +208,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to rename a field. */
     final class RenameColumn implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String fieldName;
         private final String newName;
 
@@ -237,6 +250,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to drop a field. */
     final class DropColumn implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String fieldName;
 
         private DropColumn(String fieldName) {
@@ -267,6 +283,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to update the field type. */
     final class UpdateColumnType implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String fieldName;
         private final DataType newDataType;
 
@@ -306,6 +325,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to update the field position. */
     final class UpdateColumnPosition implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final Move move;
 
         private UpdateColumnPosition(Move move) {
@@ -335,7 +357,8 @@ public interface SchemaChange {
     }
 
     /** Represents a requested column move in a struct. */
-    class Move {
+    class Move implements Serializable {
+
         public enum MoveType {
             FIRST,
             AFTER
@@ -349,6 +372,8 @@ public interface SchemaChange {
             return new Move(fieldName, referenceFieldName, MoveType.AFTER);
         }
 
+        private static final long serialVersionUID = 1L;
+
         private final String fieldName;
         private final String referenceFieldName;
         private final MoveType type;
@@ -370,10 +395,32 @@ public interface SchemaChange {
         public MoveType type() {
             return type;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Move move = (Move) o;
+            return Objects.equals(fieldName, move.fieldName)
+                    && Objects.equals(referenceFieldName, move.referenceFieldName)
+                    && Objects.equals(type, move.type);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(fieldName, referenceFieldName, type);
+        }
     }
 
     /** A SchemaChange to update the (nested) field nullability. */
     final class UpdateColumnNullability implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String[] fieldNames;
         private final boolean newNullability;
 
@@ -413,6 +460,9 @@ public interface SchemaChange {
 
     /** A SchemaChange to update the (nested) field comment. */
     final class UpdateColumnComment implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
         private final String[] fieldNames;
         private final String newDescription;
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java
new file mode 100644
index 000000000..4606bd0d0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for serializing {@link SchemaChange}. */
+public class SchemaChangeSerializationTest {
+
+    @Test
+    public void testSerialization() throws Exception {
+        runTest(SchemaChange.setOption("key", "value"));
+        runTest(SchemaChange.removeOption("key"));
+        runTest(
+                SchemaChange.addColumn(
+                        "col", DataTypes.INT(), "comment", SchemaChange.Move.first("col")));
+        runTest(SchemaChange.renameColumn("col", "new_col"));
+        runTest(SchemaChange.dropColumn("col"));
+        runTest(SchemaChange.updateColumnType("col", DataTypes.INT()));
+        runTest(SchemaChange.updateColumnNullability(new String[] {"col1", "col2"}, true));
+        runTest(SchemaChange.updateColumnComment(new String[] {"col1", "col2"}, "comment"));
+        runTest(SchemaChange.updateColumnPosition(SchemaChange.Move.after("col", "ref")));
+    }
+
+    private void runTest(SchemaChange schemaChange) throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(schemaChange);
+        oos.close();
+        byte[] bytes = baos.toByteArray();
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        Object actual = ois.readObject();
+        assertThat(actual).isEqualTo(schemaChange);
+    }
+}