Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/01/09 22:24:14 UTC

[iceberg] branch master updated: Flink 1.13: Fix SerializableTable with Kryo (#3857)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a27b4  Flink 1.13: Fix SerializableTable with Kryo (#3857)
f3a27b4 is described below

commit f3a27b433ba46dcdfcb0ee2210f0baf5dc463744
Author: openinx <op...@gmail.com>
AuthorDate: Mon Jan 10 06:23:58 2022 +0800

    Flink 1.13: Fix SerializableTable with Kryo (#3857)
---
 .../java/org/apache/iceberg/SerializableTable.java |   8 +-
 .../java/org/apache/iceberg/flink/TestHelpers.java |  14 +++
 .../iceberg/flink/TestTableSerialization.java      | 115 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 3 deletions(-)
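
In short: SerializableTable previously captured its Hadoop configuration by returning a bound method reference from the function handed to serializeConfWith. Java serialization copes with serializable lambdas (via java.lang.invoke.SerializedLambda), but Kryo, which Flink falls back to for types it cannot handle natively, generally does not, so sending a SerializableTable through Flink's KryoSerializer failed. The change replaces the method reference with a named SerializableConfSupplier class and adds Kryo round-trip tests on the Flink 1.13 side.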

diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 492a900..403e2e3 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
 
 /**
  * A read-only serializable table that can be sent to other nodes in a cluster.
@@ -107,7 +108,7 @@ public class SerializableTable implements Table, Serializable {
 
   private FileIO fileIO(Table table) {
     if (table.io() instanceof HadoopConfigurable) {
-      ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
+      ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
     }
 
     return table.io();
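
A note on the hunk above: the function passed to serializeConfWith is applied up front, and the returned SerializableSupplier<Configuration>, not the function itself, is what gets stored and later serialized. Previously that supplier was the bound method reference new SerializableConfiguration(conf)::get, which compiles to a synthetic lambda class that Java serialization understands but plain Kryo does not. The new code returns an instance of the named SerializableConfSupplier class introduced in the next hunk. A minimal illustration of the contrast (a sketch, not repository code):

    // Bound method reference: a synthetic lambda class backs this supplier.
    // Java serialization handles it; Kryo typically cannot.
    SerializableSupplier<Configuration> viaMethodRef =
        new SerializableConfiguration(conf)::get;   // old, removed class

    // Named class with plain fields: Java serialization and Kryo both handle it.
    SerializableSupplier<Configuration> viaNamedClass =
        new SerializableConfSupplier(conf);         // new class, next hunk
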
@@ -362,16 +363,17 @@ public class SerializableTable implements Table, Serializable {
   }
 
   // captures the current state of a Hadoop configuration in a serializable manner
-  private static class SerializableConfiguration implements Serializable {
+  private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {
 
     private final Map<String, String> confAsMap;
     private transient volatile Configuration conf = null;
 
-    SerializableConfiguration(Configuration conf) {
+    SerializableConfSupplier(Configuration conf) {
       this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
       conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
     }
 
+    @Override
     public Configuration get() {
       if (conf == null) {
         synchronized (this) {
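
The hunk above ends partway through get(); the remainder of the method is unchanged and so does not appear in the diff. For context, the surrounding pattern is double-checked locking that lazily rebuilds a Hadoop Configuration from the captured map. Roughly (a sketch of the unchanged code, which may differ in detail from the file):

    @Override
    public Configuration get() {
      if (conf == null) {
        synchronized (this) {
          if (conf == null) {
            // 'new Configuration(false)' skips loading default resources;
            // every entry that matters was captured into confAsMap up front
            Configuration newConf = new Configuration(false);
            confAsMap.forEach(newConf::set);
            this.conf = newConf;
          }
        }
      }
      return conf;
    }
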
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 367bab8..e2bf306 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -33,7 +33,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.MapData;
@@ -67,6 +71,16 @@ public class TestHelpers {
   private TestHelpers() {
   }
 
+  public static <T> T roundTripKryoSerialize(Class<T> clazz, T table) throws IOException {
+    KryoSerializer<T> kryo = new KryoSerializer<>(clazz, new ExecutionConfig());
+
+    DataOutputSerializer outputView = new DataOutputSerializer(1024);
+    kryo.serialize(table, outputView);
+
+    DataInputDeserializer inputView = new DataInputDeserializer(outputView.getCopyOfBuffer());
+    return kryo.deserialize(inputView);
+  }
+
   public static RowData copyRowData(RowData from, RowType rowType) {
     TypeSerializer[] fieldSerializers = rowType.getChildren().stream()
         .map((LogicalType type) -> InternalSerializers.create(type))
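
roundTripKryoSerialize reproduces what Flink does for types it handles through its Kryo fallback: build a KryoSerializer for the class, write the value into a DataOutputSerializer buffer, and read it back through a DataInputDeserializer over a copy of those bytes. The generic parameter is named table because that is its intended use here, but the helper works for any Kryo-serializable value. A trivial usage sketch (the helper throws IOException):

    // Round-trip an ordinary value exactly as Flink's Kryo path would.
    String copy = TestHelpers.roundTripKryoSerialize(String.class, "hello");
    // 'copy' is rebuilt purely from serialized bytes; copy.equals("hello") holds.

The tests added below apply the same helper to SerializableTable instances.
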
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
new file mode 100644
index 0000000..0d5d5a4
--- /dev/null
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,
+        roundTripKryoSerialize(SerializableTable.class, serializableTable));
+  }
+
+  @Test
+  public void testSerializableMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      SerializableTable serializableMetadataTable = (SerializableTable) SerializableTable.copyOf(metadataTable);
+
+      org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(
+          metadataTable,
+          roundTripKryoSerialize(SerializableTable.class, serializableMetadataTable));
+    }
+  }
+
+  @Test
+  public void testSerializableTransactionTableKryoSerialization() throws IOException {
+    Transaction txn = table.newTransaction();
+
+    txn.updateProperties()
+        .set("k1", "v1")
+        .commit();
+
+    Table txnTable = txn.table();
+    SerializableTable serializableTxnTable = (SerializableTable) SerializableTable.copyOf(txnTable);
+
+    TestHelpers.assertSerializedMetadata(txnTable,
+        roundTripKryoSerialize(SerializableTable.class, serializableTxnTable));
+  }
+}
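
Together the three tests cover the shapes a table takes when shipped to Flink tasks: a plain Hadoop table, every MetadataTableType variant, and the table view of an uncommitted transaction. Note that the transaction test asserts with assertSerializedMetadata rather than assertSerializedAndLoadedMetadata, presumably because a transaction's metadata is not yet committed and so cannot be reloaded from the table's metadata location; only the serialized state can be compared.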