You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/01/09 22:24:14 UTC
[iceberg] branch master updated: Flink 1.13: Fix SerializableTable with Kryo (#3857)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f3a27b4 Flink 1.13: Fix SerializableTable with Kryo (#3857)
f3a27b4 is described below
commit f3a27b433ba46dcdfcb0ee2210f0baf5dc463744
Author: openinx <op...@gmail.com>
AuthorDate: Mon Jan 10 06:23:58 2022 +0800
Flink 1.13: Fix SerializableTable with Kryo (#3857)
---
.../java/org/apache/iceberg/SerializableTable.java | 8 +-
.../java/org/apache/iceberg/flink/TestHelpers.java | 14 +++
.../iceberg/flink/TestTableSerialization.java | 115 +++++++++++++++++++++
3 files changed, 134 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 492a900..403e2e3 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
/**
* A read-only serializable table that can be sent to other nodes in a cluster.
@@ -107,7 +108,7 @@ public class SerializableTable implements Table, Serializable {
private FileIO fileIO(Table table) {
if (table.io() instanceof HadoopConfigurable) {
- ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
+ ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
}
return table.io();
@@ -362,16 +363,17 @@ public class SerializableTable implements Table, Serializable {
}
// captures the current state of a Hadoop configuration in a serializable manner
- private static class SerializableConfiguration implements Serializable {
+ private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {
private final Map<String, String> confAsMap;
private transient volatile Configuration conf = null;
- SerializableConfiguration(Configuration conf) {
+ SerializableConfSupplier(Configuration conf) {
this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
}
+ @Override
public Configuration get() {
if (conf == null) {
synchronized (this) {
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 367bab8..e2bf306 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -33,7 +33,11 @@ import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
@@ -67,6 +71,16 @@ public class TestHelpers {
private TestHelpers() {
}
+ /**
+ * Serializes a value with Flink's {@link KryoSerializer} and deserializes it back, returning the copy.
+ *
+ * <p>A fresh default {@link ExecutionConfig} is created on every call, and the same serializer
+ * instance is used for both directions.
+ *
+ * @param <T> the type being round-tripped
+ * @param clazz the class handed to the Kryo serializer
+ * @param table the value to round-trip; despite the name, any value of type {@code T} is accepted
+ * @return the value obtained by deserializing the bytes written for {@code table}
+ * @throws IOException if serialization into, or deserialization from, the in-memory buffers fails
+ */
+ public static <T> T roundTripKryoSerialize(Class<T> clazz, T table) throws IOException {
+ KryoSerializer<T> kryo = new KryoSerializer<>(clazz, new ExecutionConfig());
+
+ // 1024 is only the starting size of the in-memory output buffer.
+ DataOutputSerializer outputView = new DataOutputSerializer(1024);
+ kryo.serialize(table, outputView);
+
+ // Read back from a copy of the bytes written above, using the same serializer instance.
+ DataInputDeserializer inputView = new DataInputDeserializer(outputView.getCopyOfBuffer());
+ return kryo.deserialize(inputView);
+ }
+
public static RowData copyRowData(RowData from, RowType rowType) {
TypeSerializer[] fieldSerializers = rowType.getChildren().stream()
.map((LogicalType type) -> InternalSerializers.create(type))
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
new file mode 100644
index 0000000..0d5d5a4
--- /dev/null
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * Checks that {@link SerializableTable} copies survive a round trip through Flink's Kryo serializer.
+ *
+ * <p>Covers a regular Hadoop-location table, a metadata table for every {@link MetadataTableType}
+ * built on top of it, and the table exposed by a transaction.
+ */
+public class TestTableSerialization {
+ // Used by initTable() to create the test table at a temporary filesystem location.
+ private static final HadoopTables TABLES = new HadoopTables();
+
+ // Four-column schema; "date" is the partition source column and "id" the sort column below.
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(2, "data", Types.StringType.get()),
+ required(3, "date", Types.StringType.get()),
+ optional(4, "double", Types.DoubleType.get()));
+
+ // Identity partitioning on the "date" column.
+ private static final PartitionSpec SPEC = PartitionSpec
+ .builderFor(SCHEMA)
+ .identity("date")
+ .build();
+
+ // Ascending sort on the "id" column.
+ private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+ .asc("id")
+ .build();
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ // Created fresh by initTable() before each test.
+ private Table table;
+
+ /**
+ * Creates a partitioned, sorted table with properties k1=v1 and k2=v2 under a new temporary path.
+ */
+ @Before
+ public void initTable() throws IOException {
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+ // Only the path of the new folder is reused: the folder itself is deleted before table creation.
+ // NOTE(review): presumably HadoopTables.create expects the location not to exist yet -- confirm.
+ File tableLocation = temp.newFolder();
+ Assert.assertTrue(tableLocation.delete());
+
+ this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+ }
+
+ /** Round-trips a SerializableTable copy of the base table through Kryo and compares it to the table. */
+ @Test
+ public void testSerializableTableKryoSerialization() throws IOException {
+ SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+ // Fully qualified: this is the core org.apache.iceberg.TestHelpers, not this package's Flink
+ // TestHelpers (which supplies the statically imported roundTripKryoSerialize).
+ org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,
+ roundTripKryoSerialize(SerializableTable.class, serializableTable));
+ }
+
+ /** Repeats the Kryo round trip for a metadata table of every {@link MetadataTableType}. */
+ @Test
+ public void testSerializableMetadataTableKryoSerialization() throws IOException {
+ for (MetadataTableType type : MetadataTableType.values()) {
+ // NOTE(review): ops does not depend on `type`; it could be looked up once before the loop.
+ TableOperations ops = ((HasTableOperations) table).operations();
+ Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+ SerializableTable serializableMetadataTable = (SerializableTable) SerializableTable.copyOf(metadataTable);
+
+ org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(
+ metadataTable,
+ roundTripKryoSerialize(SerializableTable.class, serializableMetadataTable));
+ }
+ }
+
+ /** Round-trips a SerializableTable copy of a transaction's table through Kryo. */
+ @Test
+ public void testSerializableTransactionTableKryoSerialization() throws IOException {
+ Transaction txn = table.newTransaction();
+
+ // Sets k1 to v1, the value it already has from initTable(). The transaction itself is never
+ // committed in this test (there is no commitTransaction() call).
+ txn.updateProperties()
+ .set("k1", "v1")
+ .commit();
+
+ Table txnTable = txn.table();
+ SerializableTable serializableTxnTable = (SerializableTable) SerializableTable.copyOf(txnTable);
+
+ // Unqualified TestHelpers resolves to org.apache.iceberg.TestHelpers: its single-type import
+ // shadows the same-package org.apache.iceberg.flink.TestHelpers.
+ // Only assertSerializedMetadata (not the "AndLoaded" variant) is used for the transaction table.
+ // NOTE(review): presumably because uncommitted transaction state cannot be reloaded -- confirm.
+ TestHelpers.assertSerializedMetadata(txnTable,
+ roundTripKryoSerialize(SerializableTable.class, serializableTxnTable));
+ }
+}