You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by cz...@apache.org on 2023/03/21 06:06:06 UTC
[incubator-paimon] branch master updated: [FLINK-31433] Make SchemaChange serializable
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3179e184d [FLINK-31433] Make SchemaChange serializable
3179e184d is described below
commit 3179e184da24a09fa67e3bab6bbbcd9a338634e5
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 21 14:06:01 2023 +0800
[FLINK-31433] Make SchemaChange serializable
This closes #671.
---
.../org/apache/paimon/schema/SchemaChange.java | 54 ++++++++++++++++++-
.../flink/SchemaChangeSerializationTest.java | 63 ++++++++++++++++++++++
2 files changed, 115 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 83dceab07..93061b2d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.DataType;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
@@ -32,7 +33,7 @@ import java.util.Objects;
* @since 0.4.0
*/
@Experimental
-public interface SchemaChange {
+public interface SchemaChange extends Serializable {
static SchemaChange setOption(String key, String value) {
return new SetOption(key, value);
@@ -76,6 +77,9 @@ public interface SchemaChange {
/** A SchemaChange to set a table option. */
final class SetOption implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String key;
private final String value;
@@ -112,6 +116,9 @@ public interface SchemaChange {
/** A SchemaChange to remove a table option. */
final class RemoveOption implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String key;
private RemoveOption(String key) {
@@ -142,6 +149,9 @@ public interface SchemaChange {
/** A SchemaChange to add a field. */
final class AddColumn implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String fieldName;
private final DataType dataType;
private final String description;
@@ -198,6 +208,9 @@ public interface SchemaChange {
/** A SchemaChange to rename a field. */
final class RenameColumn implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String fieldName;
private final String newName;
@@ -237,6 +250,9 @@ public interface SchemaChange {
/** A SchemaChange to drop a field. */
final class DropColumn implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String fieldName;
private DropColumn(String fieldName) {
@@ -267,6 +283,9 @@ public interface SchemaChange {
/** A SchemaChange to update the field type. */
final class UpdateColumnType implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String fieldName;
private final DataType newDataType;
@@ -306,6 +325,9 @@ public interface SchemaChange {
/** A SchemaChange to update the field position. */
final class UpdateColumnPosition implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final Move move;
private UpdateColumnPosition(Move move) {
@@ -335,7 +357,8 @@ public interface SchemaChange {
}
/** Represents a requested column move in a struct. */
- class Move {
+ class Move implements Serializable {
+
public enum MoveType {
FIRST,
AFTER
@@ -349,6 +372,8 @@ public interface SchemaChange {
return new Move(fieldName, referenceFieldName, MoveType.AFTER);
}
+ private static final long serialVersionUID = 1L;
+
private final String fieldName;
private final String referenceFieldName;
private final MoveType type;
@@ -370,10 +395,32 @@ public interface SchemaChange {
public MoveType type() {
return type;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Move move = (Move) o;
+ return Objects.equals(fieldName, move.fieldName)
+ && Objects.equals(referenceFieldName, move.referenceFieldName)
+ && Objects.equals(type, move.type);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fieldName, referenceFieldName, type);
+ }
}
/** A SchemaChange to update the (nested) field nullability. */
final class UpdateColumnNullability implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String[] fieldNames;
private final boolean newNullability;
@@ -413,6 +460,9 @@ public interface SchemaChange {
/** A SchemaChange to update the (nested) field comment. */
final class UpdateColumnComment implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+
private final String[] fieldNames;
private final String newDescription;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java
new file mode 100644
index 000000000..4606bd0d0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeSerializationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for serializing {@link SchemaChange}. */
+public class SchemaChangeSerializationTest {
+
+ @Test
+ public void testSerialization() throws Exception {
+ runTest(SchemaChange.setOption("key", "value"));
+ runTest(SchemaChange.removeOption("key"));
+ runTest(
+ SchemaChange.addColumn(
+ "col", DataTypes.INT(), "comment", SchemaChange.Move.first("col")));
+ runTest(SchemaChange.renameColumn("col", "new_col"));
+ runTest(SchemaChange.dropColumn("col"));
+ runTest(SchemaChange.updateColumnType("col", DataTypes.INT()));
+ runTest(SchemaChange.updateColumnNullability(new String[] {"col1", "col2"}, true));
+ runTest(SchemaChange.updateColumnComment(new String[] {"col1", "col2"}, "comment"));
+ runTest(SchemaChange.updateColumnPosition(SchemaChange.Move.after("col", "ref")));
+ }
+
+ private void runTest(SchemaChange schemaChange) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(schemaChange);
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ Object actual = ois.readObject();
+ assertThat(actual).isEqualTo(schemaChange);
+ }
+}