Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/07 06:55:11 UTC

[GitHub] [iceberg] openinx opened a new pull request #3857: Flink 1.13: Add SerializableTable test cases.

openinx opened a new pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857


   The `SerializableTable` cannot be serialized by Kryo because it runs into an anonymous lambda registration issue. See the following stack trace:
   
   ```java
   com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
   Serialization trace:
   hadoopConf (org.apache.iceberg.hadoop.HadoopFileIO)
   io (org.apache.iceberg.SerializableTable)
   
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:316)
   	at org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize(TestHelpers.java:78)
   	at org.apache.iceberg.flink.TestTableSerialization.testSerializableTableKryoSerialization(TestTableSerialization.java:85)
   Caused by: java.lang.NullPointerException
   	at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
   	at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
   	... 35 more
   ```
   
   ![image](https://user-images.githubusercontent.com/5028729/148504196-44c15a99-8c98-4691-a634-f20ce3bba2e0.png)
   
   This patch replaces the anonymous lambda with an explicitly defined class to fix the serialization issue, as shown below.
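   
   The core change, in `SerializableTable#fileIO` (quoted again in the review discussion below), is:
   
   ```java
   // Before: the supplier handed to serializeConfWith is a bound method reference,
   // which the JVM compiles to an anonymous lambda class that Kryo fails to register.
   ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
   
   // After: the returned supplier is an instance of a named class, which Kryo can serialize.
   ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
   ```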


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1007971517


   > @openinx I'm assuming we would need a similar solution on the Hive side for this lambda? https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java#L209
   
   Does Apache Hive use the Kryo library to serialize and deserialize the table? If so, then I think it is also a bug that we need to fix on the Hive side.


[GitHub] [iceberg] openinx commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780663479



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
##########
@@ -67,6 +71,16 @@
   private TestHelpers() {
   }
 
+  public static <T> T roundTripKryoSerialize(Class<T> clazz, T table) throws IOException {

Review comment:
       I put this round-trip Kryo serialize method in the Flink `TestHelpers` because other objects that need to validate Kryo issues can share it as well. The earlier `TestManifestFileSerialization`, for example, could reuse this method.
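   
       For context, a minimal sketch of the helper (the diff only quotes the signature; the body below is an assumption built on Flink's public Kryo serializer API):
   
       ```java
       import org.apache.flink.api.common.ExecutionConfig;
       import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
       import org.apache.flink.core.memory.DataInputDeserializer;
       import org.apache.flink.core.memory.DataOutputSerializer;

       public static <T> T roundTripKryoSerialize(Class<T> clazz, T table) throws IOException {
         // Build a Flink KryoSerializer for the target class, as the Flink runtime would.
         KryoSerializer<T> kryo = new KryoSerializer<>(clazz, new ExecutionConfig());

         // Serialize into an in-memory output view.
         DataOutputSerializer outputView = new DataOutputSerializer(1024);
         kryo.serialize(table, outputView);

         // Deserialize from a copy of the written bytes and return the round-tripped object.
         DataInputDeserializer inputView = new DataInputDeserializer(outputView.getCopyOfBuffer());
         return kryo.deserialize(inputView);
       }
       ```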






[GitHub] [iceberg] rdblue commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780839138



##########
File path: core/src/main/java/org/apache/iceberg/SerializableTable.java
##########
@@ -107,7 +108,7 @@ private String metadataFileLocation(Table table) {
 
   private FileIO fileIO(Table table) {
     if (table.io() instanceof HadoopConfigurable) {
-      ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
+      ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);

Review comment:
       What's happening is that the lambda is creating a `SerializableConfiguration` and returning its `get` method as a lambda that implements `SerializableSupplier`. In the new version, the lambda is removed and the returned object is a `SerializableSupplier`.
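   
       For anyone following along, a minimal sketch of the named class (only its declaration is quoted elsewhere in this review; the body here is an assumption):
   
       ```java
       // Assumed body for the named supplier class.
       private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {
         private final SerializableConfiguration wrappedConf;

         SerializableConfSupplier(Configuration conf) {
           this.wrappedConf = new SerializableConfiguration(conf);
         }

         @Override
         public Configuration get() {
           // Same behavior the old bound method reference provided, now on a
           // named class that Kryo can register and serialize.
           return wrappedConf.get();
         }
       }
       ```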






[GitHub] [iceberg] openinx commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780663579



##########
File path: core/src/main/java/org/apache/iceberg/SerializableTable.java
##########
@@ -361,6 +362,19 @@ protected Table newTable(TableOperations ops, String tableName) {
     }
   }
 
+  private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {

Review comment:
       Yes, that makes sense!






[GitHub] [iceberg] rdblue commented on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1008435722


   Thanks, @openinx! Do you want to port this to Flink 1.12 and 1.14 as well?




[GitHub] [iceberg] kbendick commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780827423



##########
File path: core/src/main/java/org/apache/iceberg/SerializableTable.java
##########
@@ -107,7 +108,7 @@ private String metadataFileLocation(Table table) {
 
   private FileIO fileIO(Table table) {
     if (table.io() instanceof HadoopConfigurable) {
-      ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
+      ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);

Review comment:
       I’m surprised we don’t have to update more code to call `.get` now. I remember that the reason for adding `::get` to `serializeConfWith` was to avoid users needing to call `.get` depending on the place.
   
       Since everything is passing, I’m guessing not?






[GitHub] [iceberg] openinx commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780664600



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,
+        roundTripKryoSerialize(SerializableTable.class, serializableTable));
+  }
+
+  @Test
+  public void testSerializableMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      SerializableTable serializableMetadataTable = (SerializableTable) SerializableTable.copyOf(metadataTable);
+
+      org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(
+          metadataTable,
+          roundTripKryoSerialize(SerializableTable.class, serializableMetadataTable));
+    }
+  }
+
+  @Test
+  public void testSerializableTransactionTableKryoSerialization() throws IOException {
+    Transaction txn = table.newTransaction();
+
+    txn.updateProperties()
+        .set("k1", "v1")
+        .commit();
+
+    Table txnTable = txn.table();
+    SerializableTable serializableTxnTable = (SerializableTable) SerializableTable.copyOf(txnTable);

Review comment:
       Since `serializableTxnTable` is now an instance of `SerializableTable`, commit operations are disabled on it, so we don't need the extra commit check. Thanks @stevenzwu for the comment.
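   
       If we ever wanted to pin that behavior down in this test, a hypothetical extra assertion could look like the following (assuming `SerializableTable` rejects modifications with `UnsupportedOperationException`, per its Javadoc):
   
       ```java
       // Hypothetical check, not part of this PR: the deserialized copy should refuse commits.
       SerializableTable roundTripTable =
           roundTripKryoSerialize(SerializableTable.class, serializableTxnTable);
       Assert.assertThrows(UnsupportedOperationException.class,
           () -> roundTripTable.updateProperties().set("k2", "v2").commit());
       ```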






[GitHub] [iceberg] kbendick commented on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1007212455


   Thanks @openinx 
   
   The SerializableSupplier is written as-is to avoid calls to `get` on the user side, but if that's prohibiting Kryo usage then I don't think it's a big deal. We should likely consider being consistent across versions.
   
   Is this due to Flink 1.13 only (e.g. fixed in 1.14), or did you just choose to implement it in 1.13 for another reason?




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780398650



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,

Review comment:
       import TestHelpers






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780395042



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
##########
@@ -67,6 +71,16 @@
   private TestHelpers() {
   }
 
+  public static <T> T roundTripKryoSerialize(Class<T> clazz, T table) throws IOException {

Review comment:
       I am wondering if this should stay in `TestTableSerialization` class only.






[GitHub] [iceberg] kbendick commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780826280



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,

Review comment:
       Oh that’s a bummer.
   
       Question: in a follow-up PR, would it be possible to rename one of the `TestHelpers` classes, or to start using a utility class with a different name in Flink, so that we don’t have the conflict? Not necessary for this PR, but I could open an issue to look at it later.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780398209



##########
File path: core/src/main/java/org/apache/iceberg/SerializableTable.java
##########
@@ -361,6 +362,19 @@ protected Table newTable(TableOperations ops, String tableName) {
     }
   }
 
+  private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {

Review comment:
       maybe we can merge the nested `SerializableConfiguration` class code into this supplier class?






[GitHub] [iceberg] kbendick edited a comment on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick edited a comment on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1007212455


   Thanks @openinx 
   
   _Some questions for more context:_
   
   Is this due to Flink 1.13 only (e.g. fixed / not occurring in 1.14) or did you just choose to implement in 1.13 for another reason?
   
   _Personal thoughts / comments:_
   
   The SerializableSupplier is written as-is to avoid calls to `get` on the user side, but if that's prohibiting Kryo usage then I agree it should be changed if need be.
   
   We should likely consider updating the usage of SerializableSupplier to be consistent across versions eventually, even if the issue only affects one version, for ease of backporting, etc.



[GitHub] [iceberg] stevenzwu commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780395042



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,
+        roundTripKryoSerialize(SerializableTable.class, serializableTable));
+  }
+
+  @Test
+  public void testSerializableMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      SerializableTable serializableMetadataTable = (SerializableTable) SerializableTable.copyOf(metadataTable);
+
+      org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(
+          metadataTable,
+          roundTripKryoSerialize(SerializableTable.class, serializableMetadataTable));
+    }
+  }
+
+  @Test
+  public void testSerializableTransactionTableKryoSerialization() throws IOException {
+    Transaction txn = table.newTransaction();
+
+    txn.updateProperties()
+        .set("k1", "v1")
+        .commit();
+
+    Table txnTable = txn.table();
+    SerializableTable serializableTxnTable = (SerializableTable) SerializableTable.copyOf(txnTable);

Review comment:
       should we add a test that the `serializableTxnTable` can perform a commit operation?






[GitHub] [iceberg] kbendick commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780826375



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,
+        roundTripKryoSerialize(SerializableTable.class, serializableTable));
+  }
+
+  @Test
+  public void testSerializableMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      SerializableTable serializableMetadataTable = (SerializableTable) SerializableTable.copyOf(metadataTable);
+
+      org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(
+          metadataTable,
+          roundTripKryoSerialize(SerializableTable.class, serializableMetadataTable));
+    }
+  }
+
+  @Test
+  public void testSerializableTransactionTableKryoSerialization() throws IOException {
+    Transaction txn = table.newTransaction();
+
+    txn.updateProperties()
+        .set("k1", "v1")
+        .commit();
+
+    Table txnTable = txn.table();
+    SerializableTable serializableTxnTable = (SerializableTable) SerializableTable.copyOf(txnTable);

Review comment:
       Good point @openinx. I missed that in the Javadoc of SerializableTable as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780827168



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }

Review comment:
       Nit / Question: Do we need an `@After` method for any cleanup beyond the temporary folder rule?






[GitHub] [iceberg] rdblue commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780839322



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }

Review comment:
       Since this is using `HadoopTables`, it should be okay. The only state is in the table location, which is managed by the temporary folder.






[GitHub] [iceberg] openinx commented on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1007243161


   > Is this due to Flink 1.13 only (e.g. fixed / not known to occur in 1.14 or 1.12) or did you just choose to implement in 1.13 for another reason?
   
   The fix for `SerializableTable`  should be okay for all the flink versions.  I'm trying to add this test casest flink 1.13 firstly, it should also be applied to other flink versions. I think I will port those to flink 1.12  & flink 1.13 then.
   
   > The SerializableSupplier is written as is to avoid calls to get on the user-side. 
   
   I think the `SerializableSupplier` is mainly designed for resolving the hadoop `Configuration` serialization & deserialization issues.  We maintain all the key-value pairs inside the in-memory HashMap from the hadoop `Configuration` and then restore to the hadoop `Configuration`  once we've transferred those bytes to engines's DAG operators ( Saying `SerializableSupplier#get`).
   
   The current patch doesn't change that previous intentional design; instead, it implements it in another way to fix the Flink Kryo serialization issues.
   
   Yes, I agree it's worth porting those test cases to the other Flink versions.
   
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick edited a comment on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick edited a comment on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1007663657


   > > Is this due to Flink 1.13 only (e.g. fixed / not known to occur in 1.14 or 1.12) or did you just choose to implement in 1.13 for another reason?
   > 
   > The fix for `SerializableTable` should be okay for all the Flink versions. I'm adding these test cases to Flink 1.13 first; they should also be applied to the other Flink versions. I will port them to Flink 1.12 & Flink 1.14 afterwards.
   
   Perfect. I just wasn't sure, since sometimes people target 1.14 for new PRs (or Spark 3.2, for example). Since this wasn't the latest version, I asked in case it was 1.13-specific. But it's absolutely fine to do it in 1.13 first for review purposes. Thanks for clarifying!
   
   > > The SerializableSupplier is written as is to avoid calls to get on the user-side.
   > 
   > I think the `SerializableSupplier` is mainly designed to resolve the Hadoop `Configuration` serialization & deserialization issues. We copy all the key-value pairs from the Hadoop `Configuration` into an in-memory HashMap, and restore them into a Hadoop `Configuration` once those bytes have been transferred to the engine's DAG operators (see `SerializableSupplier#get`).
   > 
   > The current patch doesn't change that previous intentional design; instead, it implements it in another way to fix the Flink Kryo serialization issues.
   
   That's correct, my mistake. I meant the current usage that has the `::get` call inside of `serializeConfWith`, a minor historical detail. You are right that the `SerializableSupplier` is there to deal with SerDe issues, and this doesn't change that. Sorry for any confusion.  👍 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] marton-bod commented on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
marton-bod commented on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1007409833


   @openinx I'm assuming we would need a similar solution on the Hive side for this lambda?
   https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java#L209
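   
   If so, a fix along the lines of this PR might look roughly like the sketch below, expanding the lambda into a named static class. This is illustrative only; I'm assuming the supplier wraps Iceberg's `SerializableConfiguration` and that its `get()` returns the restored conf:
   
   ```java
   // assumed imports: org.apache.hadoop.conf.Configuration,
   // org.apache.iceberg.hadoop.SerializableConfiguration,
   // org.apache.iceberg.util.SerializableSupplier
   
   // Illustrative sketch: a named static nested class that Kryo can register
   // and resolve deterministically, instead of an anonymous lambda.
   static class ConfigurationSupplier implements SerializableSupplier<Configuration> {
     private final SerializableConfiguration wrappedConf;
   
     ConfigurationSupplier(Configuration conf) {
       this.wrappedConf = new SerializableConfiguration(conf);
     }
   
     @Override
     public Configuration get() {
       return wrappedConf.get();
     }
   }
   ```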
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick edited a comment on pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick edited a comment on pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#issuecomment-1007212455


   Thanks @openinx 
   
   _Some questions for more context:_
   
   Is this due to Flink 1.13 only (e.g. fixed / not known to occur in 1.14 or 1.12) or did you just choose to implement in 1.13 for another reason?
   
   _Personal thoughts / comments:_
   
   The `SerializableSupplier` is written as is to avoid calls to `get` on the user side, but if that's prohibiting Kryo usage then I agree that it should be changed if need be.
   
   We should likely consider updating the usage of `SerializableSupplier` to be consistent across versions eventually, even if it only affects one version, for ease of backporting etc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780663672



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,

Review comment:
       Since the Flink module has a class with the same name, `TestHelpers`, we have to use the fully qualified class name here to access Iceberg's `TestHelpers`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780826280



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,

Review comment:
       Oh that’s a bummer.
   
   Question: in a follow-up PR, would it be possible or beneficial to rename one of the `TestHelpers` classes, or to start using a utility class with a different name in Flink, so that we don’t have the conflict? Not necessary for this PR, but I could open an issue to look at it later. We’d have to consider backwards compatibility, but going forward it would be nice for new things to be fully importable. Possibly the issue of using both `TestHelpers` classes doesn’t come up often enough, though?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780399869



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestTableSerialization {
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()),
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec
+      .builderFor(SCHEMA)
+      .identity("date")
+      .build();
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA)
+      .asc("id")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table;
+
+  @Before
+  public void initTable() throws IOException {
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, tableLocation.toString());
+  }
+
+  @Test
+  public void testSerializableTableKryoSerialization() throws IOException {
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+    org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(table,
+        roundTripKryoSerialize(SerializableTable.class, serializableTable));
+  }
+
+  @Test
+  public void testSerializableMetadataTableKryoSerialization() throws IOException {
+    for (MetadataTableType type : MetadataTableType.values()) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, table.name(), "meta", type);
+      SerializableTable serializableMetadataTable = (SerializableTable) SerializableTable.copyOf(metadataTable);
+
+      org.apache.iceberg.TestHelpers.assertSerializedAndLoadedMetadata(
+          metadataTable,
+          roundTripKryoSerialize(SerializableTable.class, serializableMetadataTable));
+    }
+  }
+
+  @Test
+  public void testSerializableTransactionTableKryoSerialization() throws IOException {
+    Transaction txn = table.newTransaction();
+
+    txn.updateProperties()
+        .set("k1", "v1")
+        .commit();
+
+    Table txnTable = txn.table();
+    SerializableTable serializableTxnTable = (SerializableTable) SerializableTable.copyOf(txnTable);

Review comment:
       should we add a test that the `serializableTxnTable` can perform a commit operation?
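   
       A hedged sketch of what such a test could look like, reusing this PR's Kryo round-trip helper. I'm assuming the deserialized copy is backed by read-only static table operations, so the commit should be rejected; if commits are meant to succeed, the assertion flips:
   
       ```java
       @Test
       public void testSerializableTxnTableCommitAfterRoundTrip() throws IOException {
         Transaction txn = table.newTransaction();
         txn.updateProperties().set("k1", "v1").commit();
   
         // round-trip the transaction table through Kryo
         Table roundTripped = roundTripKryoSerialize(
             SerializableTable.class, (SerializableTable) SerializableTable.copyOf(txn.table()));
   
         try {
           roundTripped.updateProperties().set("k2", "v2").commit();
           Assert.fail("Commit on a deserialized table copy should be rejected");
         } catch (UnsupportedOperationException e) {
           // expected if the deserialized copy is read-only
         }
       }
       ```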




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857#discussion_r780826016



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
##########
@@ -67,6 +71,16 @@
   private TestHelpers() {
   }
 
+  public static <T> T roundTripKryoSerialize(Class<T> clazz, T table) throws IOException {

Review comment:
       I agree it would be useful for other tests. We have a similar test helper in Spark for round-trip Kryo serialization.
   
   Question / nit: Does the name of the second argument of type `T` have to be `table`? This feels like it could be used with a lot more than just table objects. Maybe we should just change the name to something more generic like `item` or `toTest`?
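   
       For reference, such a generic helper might look roughly like this sketch built on Flink's `KryoSerializer`, with the second argument renamed to `item` as suggested. The buffer size and details here are illustrative, not necessarily what this PR does:
   
       ```java
       // assumed imports: org.apache.flink.api.common.ExecutionConfig,
       // org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer,
       // org.apache.flink.core.memory.DataInputDeserializer,
       // org.apache.flink.core.memory.DataOutputSerializer
       public static <T> T roundTripKryoSerialize(Class<T> clazz, T item) throws IOException {
         KryoSerializer<T> kryo = new KryoSerializer<>(clazz, new ExecutionConfig());
   
         // serialize into an in-memory buffer ...
         DataOutputSerializer out = new DataOutputSerializer(1024 * 1024);
         kryo.serialize(item, out);
   
         // ... then read it back with the same serializer
         DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
         return kryo.deserialize(in);
       }
       ```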




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #3857: Flink 1.13: Add SerializableTable test cases.

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #3857:
URL: https://github.com/apache/iceberg/pull/3857


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org