Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/09 03:53:56 UTC

[GitHub] [iceberg] JingsongLi opened a new pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

JingsongLi opened a new pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565


   Discussed in: https://github.com/apache/iceberg/pull/1509/files#r496268020


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#issuecomment-705952388


   CC: @rdblue @openinx 




[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503650760



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link CatalogLoader} and {@link TableLoader}.
+ */
+public class TestCatalogTableLoader extends FlinkTestBase {
+
+  private static File warehouse = null;
+  private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
+  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
+
+  @BeforeClass
+  public static void createWarehouse() throws IOException {
+    warehouse = File.createTempFile("warehouse", null);
+    Assert.assertTrue(warehouse.delete());
+    hiveConf.set("my_key", "my_value");
+  }
+
+  @AfterClass
+  public static void dropWarehouse() {
+    if (warehouse != null && warehouse.exists()) {
+      warehouse.delete();
+    }
+  }
+
+  @Test
+  public void testHadoopCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, "file:" + warehouse);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
+    String location = "file:" + warehouse + "/my_table";
+    new HadoopTables(hiveConf).create(SCHEMA, location);
+    TableLoader loader = TableLoader.fromHadoopTable(location, hiveConf);
+    TableLoader copied = javaSerAndDeSer(loader);
+    copied.open();
+    try {
+      validateHadoopConf(copied.loadTable());
+    } finally {
+      copied.close();
+    }
+  }
+

Review comment:
       Makes sense, I'll add a test.
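The quoted test calls a `javaSerAndDeSer` helper whose body falls outside the quoted diff. As a point of reference, a minimal Java-serialization round trip of that shape (the class name `SerializationRoundTrip` is mine, not from the PR) could look like:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {

  // Serialize an object to bytes and read it back, mimicking what a
  // runner does when shipping a loader to a task.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T javaSerAndDeSer(T object)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    String copied = javaSerAndDeSer("my_value");
    System.out.println(copied.equals("my_value")); // prints "true"
  }
}
```

Round-tripping through `ObjectOutputStream`/`ObjectInputStream` exercises exactly the code path that breaks when a loader holds a non-serializable member.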






[GitHub] [iceberg] rdblue commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502832201



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       I'm not an expert on how Flink will run in tests, but I think it would be safer not to rely on end-to-end or integration tests to check serialization because there may not be a guarantee that tasks are serialized for those runners. (And this was a problem for Hive testing in the past.)
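The `SerializableConfiguration` field in the diff above follows a standard Java pattern: keep the non-serializable object in a `transient` field and write its entries by hand in `writeObject`/`readObject`. Below is a self-contained sketch of that pattern, using a plain `Map` as a stand-in for Hadoop's `Configuration` so it compiles without Hadoop on the classpath; the class and method names are illustrative only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class SerializableConfigurationSketch implements Serializable {

  // The wrapped config is transient: default serialization skips it,
  // and we write its entries explicitly instead.
  private transient Map<String, String> conf;

  SerializableConfigurationSketch(Map<String, String> conf) {
    this.conf = conf;
  }

  Map<String, String> get() {
    return conf;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeInt(conf.size());
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      out.writeUTF(entry.getKey());
      out.writeUTF(entry.getValue());
    }
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    // Rebuild the transient map from the explicitly written entries.
    this.conf = new HashMap<>();
    int size = in.readInt();
    for (int i = 0; i < size; i++) {
      conf.put(in.readUTF(), in.readUTF());
    }
  }

  // Helper for tests: round-trip the wrapper through Java serialization.
  static SerializableConfigurationSketch roundTrip(SerializableConfigurationSketch obj)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (SerializableConfigurationSketch) in.readObject();
    }
  }
}
```

On deserialization, `readObject` rebuilds the map before any caller can touch the transient field, so the wrapper behaves like an ordinary serializable member.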






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502275912



##########
File path: flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
##########
@@ -45,23 +46,30 @@ static TableLoader fromCatalog(CatalogLoader catalogLoader, TableIdentifier iden
   }
 
   static TableLoader fromHadoopTable(String location) {

Review comment:
       I think it is useful for users, since Flink provides a default Hadoop configuration.
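The point about Flink's default Hadoop configuration is the usual convenience-overload pattern: a `fromHadoopTable(location)` that delegates to `fromHadoopTable(location, conf)` with an environment default. A hedged sketch of that pattern (all names below are hypothetical stand-ins, not Iceberg or Flink APIs, and a `Map` stands in for the configuration):

```java
import java.util.HashMap;
import java.util.Map;

class LoaderFactorySketch {

  static class Loader {
    final String location;
    final Map<String, String> conf;

    Loader(String location, Map<String, String> conf) {
      this.location = location;
      this.conf = conf;
    }
  }

  // Stand-in for the cluster-provided default configuration.
  static Map<String, String> newDefaultConf() {
    Map<String, String> conf = new HashMap<>();
    conf.put("fs.defaultFS", "file:///");
    return conf;
  }

  // Convenience overload: callers who are happy with the environment's
  // default configuration don't have to construct one themselves.
  static Loader fromHadoopTable(String location) {
    return fromHadoopTable(location, newDefaultConf());
  }

  static Loader fromHadoopTable(String location, Map<String, String> conf) {
    return new Loader(location, conf);
  }
}
```

Keeping the no-argument variant as a thin delegate means both overloads share one code path, so the default-configuration behavior stays consistent.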






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502275412



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       This is already tested by the ITCases; that is the one path that must always pass.
   But we can add more unit tests too.


##########
File path: flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
##########
@@ -45,23 +46,30 @@ static TableLoader fromCatalog(CatalogLoader catalogLoader, TableIdentifier iden
   }
 
   static TableLoader fromHadoopTable(String location) {

Review comment:
       I think it is useful for users, since Flink provides a default Hadoop configuration.








[GitHub] [iceberg] openinx commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503173314



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link CatalogLoader} and {@link TableLoader}.
+ */
+public class TestCatalogTableLoader extends FlinkTestBase {
+
+  private static File warehouse = null;
+  private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
+  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
+
+  @BeforeClass
+  public static void createWarehouse() throws IOException {
+    warehouse = File.createTempFile("warehouse", null);
+    Assert.assertTrue(warehouse.delete());
+    hiveConf.set("my_key", "my_value");
+  }
+
+  @AfterClass
+  public static void dropWarehouse() {
+    if (warehouse != null && warehouse.exists()) {
+      warehouse.delete();
+    }
+  }
+
+  @Test
+  public void testHadoopCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, "file:" + warehouse);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
+    String location = "file:" + warehouse + "/my_table";
+    new HadoopTables(hiveConf).create(SCHEMA, location);
+    TableLoader loader = TableLoader.fromHadoopTable(location, hiveConf);
+    TableLoader copied = javaSerAndDeSer(loader);
+    copied.open();
+    try {
+      validateHadoopConf(copied.loadTable());
+    } finally {
+      copied.close();
+    }
+  }
+

Review comment:
       Are we missing a test for `HiveTableLoader`? I think we don't actually have to open the table; we just need to ensure the members inside the instance are the same, perhaps by adding an `equals` to `CatalogLoader` or `TableLoader`.
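The equals-based check suggested here can be sketched without any Iceberg classes: give the loader an `equals` over its members and assert that a Java-serialization copy equals the original, with no table open or load involved. `FakeTableLoader` below is a hypothetical stand-in, not a class from the PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

class EqualsCheckSketch {

  // A loader stand-in whose identity is fully determined by its members,
  // so equals() is enough to validate a serialization round trip.
  static class FakeTableLoader implements Serializable {
    private final String location;

    FakeTableLoader(String location) {
      this.location = location;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof FakeTableLoader
          && Objects.equals(location, ((FakeTableLoader) other).location);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(location);
    }
  }

  static Object roundTrip(Serializable obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }
}
```

The trade-off raised in the thread applies: member comparison is cheap and needs no catalog, while actually opening the table after deserialization exercises more of the loader's behavior.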






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503022723



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -62,21 +62,22 @@
   /**
    * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
    *
-   * @param name    Flink's catalog name
-   * @param options Flink's catalog options
+   * @param name       Flink's catalog name
+   * @param hadoopConf Hadoop configuration for catalog
+   * @param options    Flink's catalog options
    * @return an Iceberg catalog loader
    */
-  protected CatalogLoader createCatalogLoader(String name, Map<String, String> options) {
+  protected CatalogLoader createCatalogLoader(String name, Configuration hadoopConf, Map<String, String> options) {

Review comment:
       I will use the same order and `properties`.






[GitHub] [iceberg] JingsongLi merged pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565


   




[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503031133



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTableLoader {
+
+  @Test
+  public void testJavaSerialization() throws IOException, ClassNotFoundException {
+    TableLoader loader = TableLoader.fromHadoopTable("/my_path/my_table");
+    TableLoader copied = TestCatalogLoader.javaSerAndDeSer(loader);
+    Assert.assertEquals("HadoopTableLoader{location=/my_path/my_table}", copied.toString());

Review comment:
       Agreed, I think we can check `FileIO` too.






[GitHub] [iceberg] JingsongLi commented on pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#issuecomment-707518866


   Thanks @rdblue and @openinx for the review, merging...






[GitHub] [iceberg] openinx commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502257571



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       Nice catch. Since the `CatalogLoader` needs to be serializable, all of its members need to be serializable as well. Should we add a few unit tests covering serialization so that we won't break it in the future, similar to the existing unit test TestScanTaskSerialization?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       `TableLoader` needs a similar unit test.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
##########
@@ -45,23 +46,30 @@ static TableLoader fromCatalog(CatalogLoader catalogLoader, TableIdentifier iden
   }
 
   static TableLoader fromHadoopTable(String location) {

Review comment:
       Is this only used for testing? Should we move it to the test package?






[GitHub] [iceberg] openinx commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502258283



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       `TableLoader` needs a similar unit test.






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503177391



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link CatalogLoader} and {@link TableLoader}.
+ */
+public class TestCatalogTableLoader extends FlinkTestBase {
+
+  private static File warehouse = null;
+  private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
+  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
+
+  @BeforeClass
+  public static void createWarehouse() throws IOException {
+    warehouse = File.createTempFile("warehouse", null);
+    Assert.assertTrue(warehouse.delete());
+    hiveConf.set("my_key", "my_value");
+  }
+
+  @AfterClass
+  public static void dropWarehouse() {
+    if (warehouse != null && warehouse.exists()) {
+      warehouse.delete();
+    }
+  }
+
+  @Test
+  public void testHadoopCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, "file:" + warehouse);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
+    String location = "file:" + warehouse + "/my_table";
+    new HadoopTables(hiveConf).create(SCHEMA, location);
+    TableLoader loader = TableLoader.fromHadoopTable(location, hiveConf);
+    TableLoader copied = javaSerAndDeSer(loader);
+    copied.open();
+    try {
+      validateHadoopConf(copied.loadTable());
+    } finally {
+      copied.close();
+    }
+  }
+

Review comment:
       You mean the `TableLoader.CatalogTableLoader` class? I think we don't need to add a case for it, since we already have cases for the `CatalogLoader`s.
   Opening the table covers more, though: since these are loaders, we can try to load something and ensure the result is correct.






[GitHub] [iceberg] openinx commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503168802



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -68,13 +64,13 @@ public boolean isBounded() {
 
   @Override
   public TableSource<RowData> projectFields(int[] fields) {
-    return new IcebergTableSource(loader, hadoopConf, schema, options, fields);
+    return new IcebergTableSource(loader, schema, options, fields);
   }
 
   @Override
   public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
-    return FlinkSource.forRowData().env(execEnv).tableLoader(loader).hadoopConf(hadoopConf)
-        .project(getProjectedSchema()).properties(options).build();
+    return FlinkSource.forRowData().env(execEnv).tableLoader(loader).project(getProjectedSchema())
+        .properties(options).build();

Review comment:
       nit: I see some places call it `options` and other places call it `properties`; is it possible to unify them?






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502764816



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       `TestFlinkScanSql`, `TestFlinkTableSink` and `TestFlinkIcebergSink` will cover this too.






[GitHub] [iceberg] rdblue commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502832005



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestCatalogLoader.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCatalogLoader {
+
+  @Test
+  public void testHadoopJavaSerialization() throws IOException, ClassNotFoundException {

Review comment:
       Same here. I think these tests should do a little more than validate the `toString` method.






[GitHub] [iceberg] openinx commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502266717



##########
File path: flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
##########
@@ -45,23 +46,30 @@ static TableLoader fromCatalog(CatalogLoader catalogLoader, TableIdentifier iden
   }
 
   static TableLoader fromHadoopTable(String location) {

Review comment:
       Is this only visible for testing? Should we move it to the test package?






[GitHub] [iceberg] rdblue commented on pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#issuecomment-706611443


   @JingsongLi, thanks for working on this! The implementation looks great, but I think it would be good to have a bit more thorough validation in the tests.




[GitHub] [iceberg] rdblue commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502831940



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTableLoader {
+
+  @Test
+  public void testJavaSerialization() throws IOException, ClassNotFoundException {
+    TableLoader loader = TableLoader.fromHadoopTable("/my_path/my_table");
+    TableLoader copied = TestCatalogLoader.javaSerAndDeSer(loader);
+    Assert.assertEquals("HadoopTableLoader{location=/my_path/my_table}", copied.toString());

Review comment:
       I don't think it is sufficient to test that the loader's `toString` representation is the same. This would still pass if the underlying `Configuration` were just set to null and not serialized at all. I think it would make sense to do something that will fail if the `Configuration` is not serialized and deserialized correctly, like load the table. In that case, you would just need to create a Hadoop table on the local FS and load it through the copied loader.
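
       The failure mode described here can be shown with a small self-contained Java sketch (the class and field names are hypothetical, not the actual Iceberg code): a loader-like object whose configuration field is `transient` survives a Java-serialization round trip with an identical `toString()`, even though the configuration itself was silently dropped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical loader-like class: the conf field is transient, so Java
// serialization drops it, yet toString() still matches after a round trip.
class FakeLoader implements Serializable {
  private final String location;
  private transient Map<String, String> conf;

  FakeLoader(String location, Map<String, String> conf) {
    this.location = location;
    this.conf = conf;
  }

  Map<String, String> conf() {
    return conf;
  }

  @Override
  public String toString() {
    return "FakeLoader{location=" + location + "}";
  }
}

public class RoundTripDemo {
  // Serialize to bytes and read back: a plain Java-serialization round trip.
  @SuppressWarnings("unchecked")
  public static <T> T javaSerAndDeSer(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> conf = new HashMap<>();
    conf.put("my_key", "my_value");
    FakeLoader copied = javaSerAndDeSer(new FakeLoader("/my_path/my_table", conf));
    System.out.println(copied);                 // toString() is unchanged by the round trip
    System.out.println(copied.conf() == null);  // but the configuration was silently lost
  }
}
```

       A `toString()` comparison passes here while `conf()` returns null, which is why actually loading the table after deserialization is the stronger check.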






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503022145



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       Flink's `StreamExecutionEnvironment.execute` will launch a local cluster, and the client will connect to this cluster via RPC using binary messages.
   I think you are right; it is better to add a unit test too.






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502275412



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       This is already tested by the ITCases; that is the one path that must always pass.
   But we can add more unit tests too.






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503175307



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -68,13 +64,13 @@ public boolean isBounded() {
 
   @Override
   public TableSource<RowData> projectFields(int[] fields) {
-    return new IcebergTableSource(loader, hadoopConf, schema, options, fields);
+    return new IcebergTableSource(loader, schema, options, fields);
   }
 
   @Override
   public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
-    return FlinkSource.forRowData().env(execEnv).tableLoader(loader).hadoopConf(hadoopConf)
-        .project(getProjectedSchema()).properties(options).build();
+    return FlinkSource.forRowData().env(execEnv).tableLoader(loader).project(getProjectedSchema())
+        .properties(options).build();

Review comment:
       OK I'll do it.







[GitHub] [iceberg] rdblue commented on pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#issuecomment-707247165


   I think this is ready, but @openinx will probably want to have a look at the answer to his question about a test for `HiveTableLoader`, so I'll wait for him to merge. Thanks, @JingsongLi!




[GitHub] [iceberg] openinx commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502257571



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       Nice catch. Since the `CatalogLoader` needs to be serializable, all of its members need to be serializable too. Should we add a few unit tests for serializability so that we won't break it in the future, similar to the unit test `TestScanTaskSerialization`?
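
       A wrapper in the spirit of `SerializableConfiguration` can be sketched as follows (illustrative names only; a plain `Map` stands in for Hadoop's `Configuration` here): the wrapped, non-serializable object is held in a `transient` field and written out entry-by-entry in the custom serialization hooks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Sketch of the wrapper pattern: the wrapped object is transient and is
// written/read entry-by-entry in writeObject/readObject.
class SerializableConf implements Serializable {
  private transient Map<String, String> conf;

  SerializableConf(Map<String, String> conf) {
    this.conf = conf;
  }

  Map<String, String> get() {
    return conf;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeInt(conf.size());
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      out.writeUTF(entry.getKey());
      out.writeUTF(entry.getValue());
    }
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    int size = in.readInt();
    this.conf = new HashMap<>();
    for (int i = 0; i < size; i++) {
      conf.put(in.readUTF(), in.readUTF());
    }
  }
}

public class SerializableConfDemo {
  // Round-trip helper: serialize to bytes and read back.
  public static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> conf = new HashMap<>();
    conf.put("my_key", "my_value");
    SerializableConf copied = roundTrip(new SerializableConf(conf));
    // The wrapped config survives the round trip because of the custom hooks.
    System.out.println(copied.get().get("my_key"));
  }
}
```

       A ser/de unit test of the kind suggested is then just a `roundTrip` call followed by assertions on the deserialized object's contents.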






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503022723



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -62,21 +62,22 @@
   /**
    * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
    *
-   * @param name    Flink's catalog name
-   * @param options Flink's catalog options
+   * @param name       Flink's catalog name
+   * @param hadoopConf Hadoop configuration for catalog
+   * @param options    Flink's catalog options
    * @return an Iceberg catalog loader
    */
-  protected CatalogLoader createCatalogLoader(String name, Map<String, String> options) {
+  protected CatalogLoader createCatalogLoader(String name, Configuration hadoopConf, Map<String, String> options) {

Review comment:
       I will use the same argument order and `options`.






[GitHub] [iceberg] openinx commented on pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#issuecomment-707512267


   LGTM now, will merge this if all unit tests pass.




[GitHub] [iceberg] rdblue commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502831618



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -62,21 +62,22 @@
   /**
    * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
    *
-   * @param name    Flink's catalog name
-   * @param options Flink's catalog options
+   * @param name       Flink's catalog name
+   * @param hadoopConf Hadoop configuration for catalog
+   * @param options    Flink's catalog options
    * @return an Iceberg catalog loader
    */
-  protected CatalogLoader createCatalogLoader(String name, Map<String, String> options) {
+  protected CatalogLoader createCatalogLoader(String name, Configuration hadoopConf, Map<String, String> options) {

Review comment:
       It's a little odd that the arguments here are the same as `createCatalog`, but that they are in a different order and use "options" rather than "properties". Should we at least use the same argument order?






[GitHub] [iceberg] openinx commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r503631264



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link CatalogLoader} and {@link TableLoader}.
+ */
+public class TestCatalogTableLoader extends FlinkTestBase {
+
+  private static File warehouse = null;
+  private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
+  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
+
+  @BeforeClass
+  public static void createWarehouse() throws IOException {
+    warehouse = File.createTempFile("warehouse", null);
+    Assert.assertTrue(warehouse.delete());
+    hiveConf.set("my_key", "my_value");
+  }
+
+  @AfterClass
+  public static void dropWarehouse() {
+    if (warehouse != null && warehouse.exists()) {
+      warehouse.delete();
+    }
+  }
+
+  @Test
+  public void testHadoopCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, "file:" + warehouse);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+    validateHadoopConf(javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA));
+  }
+
+  @Test
+  public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
+    String location = "file:" + warehouse + "/my_table";
+    new HadoopTables(hiveConf).create(SCHEMA, location);
+    TableLoader loader = TableLoader.fromHadoopTable(location, hiveConf);
+    TableLoader copied = javaSerAndDeSer(loader);
+    copied.open();
+    try {
+      validateHadoopConf(copied.loadTable());
+    } finally {
+      copied.close();
+    }
+  }
+

Review comment:
       OK, it's `CatalogTableLoader`. It has two serialized members, the `CatalogLoader` and a `String` identifier (the other member is transient). I agree that in this case, if we can assert that `CatalogLoader` serializes and deserializes correctly, that implies `CatalogTableLoader` should be OK. But the point of providing those unit tests is to ensure that we won't break this in the future. What if someone introduces another non-serializable member in the `CatalogTableLoader` class? That's why I said it seems more reasonable to write a ser/de unit test for `CatalogTableLoader` as well.
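
       The regression being guarded against can be shown in a small Java sketch (the class names are hypothetical stand-ins, not the real Iceberg classes): adding a non-serializable member to an otherwise-serializable loader makes the very first serialization attempt throw, which a round-trip unit test would catch immediately.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NonSerializableMemberDemo {
  // A member type that does not implement Serializable.
  static class NotSerializableConf {
  }

  // Hypothetical loader-like class that someone extended with a non-serializable member.
  static class CatalogTableLoaderLike implements Serializable {
    private final String identifier = "default.my_table";
    private final NotSerializableConf conf = new NotSerializableConf();
  }

  public static void main(String[] args) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(new CatalogTableLoaderLike());
      System.out.println("serialized OK");
    } catch (NotSerializableException e) {
      // A round-trip unit test would fail right here, catching the regression.
      System.out.println("NotSerializableException: " + e.getMessage());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
```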






[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1565: Flink: move hadoop configuration to Loaders from Source/Sink API

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1565:
URL: https://github.com/apache/iceberg/pull/1565#discussion_r502763688



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -43,18 +44,18 @@ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int
 
   class HadoopCatalogLoader implements CatalogLoader {
     private final String catalogName;
-    private final Configuration hadoopConf;
+    private final SerializableConfiguration hadoopConf;

Review comment:
       What do you mean by ITCases?



