Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/23 21:28:57 UTC

[iceberg] branch master updated: Flink: Load hive-site.xml for HiveCatalog (#1586)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d132474  Flink: Load hive-site.xml for HiveCatalog (#1586)
d132474 is described below

commit d132474e7c8706f504a47f75b58e5a58d0bef9ff
Author: openinx <op...@gmail.com>
AuthorDate: Sat Oct 24 05:28:49 2020 +0800

    Flink: Load hive-site.xml for HiveCatalog (#1586)
---
 .../org/apache/iceberg/flink/CatalogLoader.java    |  19 +++-
 .../apache/iceberg/flink/FlinkCatalogFactory.java  |  40 +++++++-
 .../apache/iceberg/flink/FlinkCatalogTestBase.java | 106 +++++++++------------
 .../org/apache/iceberg/flink/FlinkTestBase.java    |  50 ++++++++--
 .../iceberg/flink/TestCatalogTableLoader.java      |   4 +-
 .../iceberg/flink/TestFlinkCatalogDatabase.java    |   2 +
 .../iceberg/flink/TestFlinkCatalogTable.java       |   1 +
 .../apache/iceberg/flink/TestFlinkHiveCatalog.java | 101 ++++++++++++++++++++
 .../apache/iceberg/flink/TestFlinkTableSink.java   |   2 +
 .../source/TestFlinkInputFormatReaderDeletes.java  |   1 +
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |  25 +++--
 site/docs/flink.md                                 |  23 +++--
 12 files changed, 280 insertions(+), 94 deletions(-)
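
As an illustrative sketch (not part of this commit), the new 'hive-conf-dir' option added below could be used from a Flink TableEnvironment as follows; the catalog name, metastore URI, and directory path are hypothetical:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateHiveCatalogSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
    // 'hive-conf-dir' points at a directory containing hive-site.xml; 'warehouse' may be
    // omitted when hive.metastore.warehouse.dir can be resolved from that file.
    tEnv.executeSql(
        "CREATE CATALOG hive_catalog WITH (" +
        "  'type'='iceberg'," +
        "  'catalog-type'='hive'," +
        "  'uri'='thrift://localhost:9083'," +
        "  'hive-conf-dir'='/etc/hive/conf')");
    tEnv.executeSql("USE CATALOG hive_catalog");
  }
}
```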

diff --git a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index 2088a22..4d0670c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -32,14 +32,21 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
  */
 public interface CatalogLoader extends Serializable {
 
+  /**
+   * Create a new catalog with the provided properties. NOTICE: for Flink, the {@link CatalogLoader} may be
+   * initialized on the Flink SQL client side or the job manager side, then serialized and shipped to the task
+   * managers, where it is deserialized to create a new catalog on each task manager.
+   *
+   * @return a newly created {@link Catalog}
+   */
   Catalog loadCatalog();
 
   static CatalogLoader hadoop(String name, Configuration hadoopConf, String warehouseLocation) {
     return new HadoopCatalogLoader(name, hadoopConf, warehouseLocation);
   }
 
-  static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int clientPoolSize) {
-    return new HiveCatalogLoader(name, hadoopConf, uri, clientPoolSize);
+  static CatalogLoader hive(String name, Configuration hadoopConf, String uri, String warehouse, int clientPoolSize) {
+    return new HiveCatalogLoader(name, hadoopConf, uri, warehouse, clientPoolSize);
   }
 
   class HadoopCatalogLoader implements CatalogLoader {
@@ -71,18 +78,21 @@ public interface CatalogLoader extends Serializable {
     private final String catalogName;
     private final SerializableConfiguration hadoopConf;
     private final String uri;
+    private final String warehouse;
     private final int clientPoolSize;
 
-    private HiveCatalogLoader(String catalogName, Configuration conf, String uri, int clientPoolSize) {
+    private HiveCatalogLoader(String catalogName, Configuration conf, String uri, String warehouse,
+                              int clientPoolSize) {
       this.catalogName = catalogName;
       this.hadoopConf = new SerializableConfiguration(conf);
       this.uri = uri;
+      this.warehouse = warehouse;
       this.clientPoolSize = clientPoolSize;
     }
 
     @Override
     public Catalog loadCatalog() {
-      return new HiveCatalog(catalogName, uri, clientPoolSize, hadoopConf.get());
+      return new HiveCatalog(catalogName, uri, warehouse, clientPoolSize, hadoopConf.get());
     }
 
     @Override
@@ -90,6 +100,7 @@ public interface CatalogLoader extends Serializable {
       return MoreObjects.toStringHelper(this)
           .add("catalogName", catalogName)
           .add("uri", uri)
+          .add("warehouse", warehouse)
           .add("clientPoolSize", clientPoolSize)
           .toString();
     }
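
A hedged usage sketch of the updated loader API (illustrative, not from the commit; the URI and warehouse path are hypothetical). Because the loader is Serializable, it can be built on the SQL client or job manager, shipped to the task managers, and each side calls loadCatalog() to get its own catalog instance:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.flink.CatalogLoader;

public class HiveCatalogLoaderSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // New signature: name, Hadoop conf, metastore URI, warehouse location, client pool size.
    CatalogLoader loader = CatalogLoader.hive(
        "my_hive", hadoopConf, "thrift://localhost:9083", "hdfs://nn:8020/warehouse/path", 2);
    Catalog catalog = loader.loadCatalog();
    System.out.println(loader);          // toString() now includes the warehouse location
    System.out.println(catalog.name());  // the catalog name passed to the loader
  }
}
```
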
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 75c779e..9484a96 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -19,6 +19,9 @@
 
 package org.apache.iceberg.flink;
 
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -27,7 +30,10 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
@@ -54,7 +60,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
   public static final String HIVE_URI = "uri";
   public static final String HIVE_CLIENT_POOL_SIZE = "clients";
-  public static final String HADOOP_WAREHOUSE_LOCATION = "warehouse";
+  public static final String HIVE_CONF_DIR = "hive-conf-dir";
+  public static final String WAREHOUSE_LOCATION = "warehouse";
 
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String BASE_NAMESPACE = "base-namespace";
@@ -71,12 +78,17 @@ public class FlinkCatalogFactory implements CatalogFactory {
     String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, "hive");
     switch (catalogType) {
       case "hive":
-        int clientPoolSize = Integer.parseInt(properties.getOrDefault(HIVE_CLIENT_POOL_SIZE, "2"));
+        // The properties 'uri', 'warehouse', and 'hive-conf-dir' are allowed to be null; in that case the values
+        // fall back to the Hadoop configuration that is loaded from the classpath.
         String uri = properties.get(HIVE_URI);
-        return CatalogLoader.hive(name, hadoopConf, uri, clientPoolSize);
+        String warehouse = properties.get(WAREHOUSE_LOCATION);
+        int clientPoolSize = Integer.parseInt(properties.getOrDefault(HIVE_CLIENT_POOL_SIZE, "2"));
+        String hiveConfDir = properties.get(HIVE_CONF_DIR);
+        Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+        return CatalogLoader.hive(name, newHadoopConf, uri, warehouse, clientPoolSize);
 
       case "hadoop":
-        String warehouseLocation = properties.get(HADOOP_WAREHOUSE_LOCATION);
+        String warehouseLocation = properties.get(WAREHOUSE_LOCATION);
         return CatalogLoader.hadoop(name, hadoopConf, warehouseLocation);
 
       default:
@@ -98,7 +110,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
     properties.add(ICEBERG_CATALOG_TYPE);
     properties.add(HIVE_URI);
     properties.add(HIVE_CLIENT_POOL_SIZE);
-    properties.add(HADOOP_WAREHOUSE_LOCATION);
+    properties.add(HIVE_CONF_DIR);
+    properties.add(WAREHOUSE_LOCATION);
     properties.add(DEFAULT_DATABASE);
     properties.add(BASE_NAMESPACE);
     return properties;
@@ -119,6 +132,23 @@ public class FlinkCatalogFactory implements CatalogFactory {
     return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
   }
 
+  private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+    Configuration newConf = new Configuration(hadoopConf);
+    if (!Strings.isNullOrEmpty(hiveConfDir)) {
+      Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
+          "There should be a hive-site.xml file under the directory %s", hiveConfDir);
+      newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
+    } else {
+      // If the hive-site.xml path is not provided explicitly, try to load the resource from the classpath. If the
+      // configuration file still cannot be loaded, HiveCatalog will throw an exception.
+      URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
+      if (configFile != null) {
+        newConf.addResource(configFile);
+      }
+    }
+    return newConf;
+  }
+
   public static Configuration clusterHadoopConf() {
     return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
   }
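
A small sketch of the Hadoop Configuration behavior that mergeHiveConf() relies on (an assumption about precedence, not code from this commit; paths are hypothetical): hive-site.xml is added as a resource first, and the later explicit set() of hive.metastore.warehouse.dir in HiveCatalog is what lets the 'warehouse' property win over the file's value:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class WarehousePrecedenceSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Step 1 (mergeHiveConf): load hive-site.xml, which may define hive.metastore.warehouse.dir.
    conf.addResource(new Path("/etc/hive/conf", "hive-site.xml"));
    // Step 2 (HiveCatalog constructor): an explicit 'warehouse' value overwrites the file's value.
    conf.set("hive.metastore.warehouse.dir", "hdfs://nn:8020/warehouse/path");
    System.out.println(conf.get("hive.metastore.warehouse.dir"));  // prints the explicitly set path
  }
}
```
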
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index 009931e..3a5d827 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -19,19 +19,10 @@
 
 package org.apache.iceberg.flink;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.IntStream;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.ArrayUtils;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -39,9 +30,11 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -49,19 +42,29 @@ import org.junit.runners.Parameterized;
 public abstract class FlinkCatalogTestBase extends FlinkTestBase {
 
   protected static final String DATABASE = "db";
-  private static File warehouse = null;
+  private static TemporaryFolder hiveWarehouse = new TemporaryFolder();
+  private static TemporaryFolder hadoopWarehouse = new TemporaryFolder();
 
   @BeforeClass
   public static void createWarehouse() throws IOException {
-    FlinkCatalogTestBase.warehouse = File.createTempFile("warehouse", null);
-    Assert.assertTrue(warehouse.delete());
+    hiveWarehouse.create();
+    hadoopWarehouse.create();
   }
 
   @AfterClass
   public static void dropWarehouse() {
-    if (warehouse != null && warehouse.exists()) {
-      warehouse.delete();
-    }
+    hiveWarehouse.delete();
+    hadoopWarehouse.delete();
+  }
+
+  @Before
+  public void before() {
+    sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
+  }
+
+  @After
+  public void clean() {
+    sql("DROP CATALOG IF EXISTS %s", catalogName);
   }
 
   @Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
@@ -76,12 +79,11 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
     );
   }
 
-  private volatile TableEnvironment tEnv = null;
-
   protected final String catalogName;
   protected final String[] baseNamespace;
   protected final Catalog validationCatalog;
   protected final SupportsNamespaces validationNamespaceCatalog;
+  private final Map<String, String> config = Maps.newHashMap();
 
   protected final String flinkDatabase;
   protected final Namespace icebergNamespace;
@@ -92,62 +94,44 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
     this.baseNamespace = baseNamespace;
     this.isHadoopCatalog = catalogName.startsWith("testhadoop");
     this.validationCatalog = isHadoopCatalog ?
-        new HadoopCatalog(hiveConf, "file:" + warehouse) :
+        new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getRoot()) :
         catalog;
     this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
 
-    Map<String, String> config = Maps.newHashMap();
     config.put("type", "iceberg");
-    config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, isHadoopCatalog ? "hadoop" : "hive");
-    config.put(FlinkCatalogFactory.HADOOP_WAREHOUSE_LOCATION, "file:" + warehouse);
     if (baseNamespace.length > 0) {
       config.put(FlinkCatalogFactory.BASE_NAMESPACE, Joiner.on(".").join(baseNamespace));
     }
-
-    FlinkCatalogFactory factory = new FlinkCatalogFactory() {
-      @Override
-      protected org.apache.flink.table.catalog.Catalog createCatalog(
-          String name, Map<String, String> properties, Configuration hadoopConf) {
-        return super.createCatalog(name, properties, hiveConf);
-      }
-    };
-    getTableEnv().registerCatalog(
-        catalogName,
-        flinkCatalogs.computeIfAbsent(catalogName, k -> factory.createCatalog(k, config)));
+    if (isHadoopCatalog) {
+      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
+      config.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
+    } else {
+      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+      config.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
+      config.put(FlinkCatalogFactory.HIVE_URI, getURI(hiveConf));
+    }
 
     this.flinkDatabase = catalogName + "." + DATABASE;
-    this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace, new String[] { DATABASE }));
+    this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace, new String[] {DATABASE}));
   }
 
-  protected TableEnvironment getTableEnv() {
-    if (tEnv == null) {
-      synchronized (this) {
-        if (tEnv == null) {
-          this.tEnv = TableEnvironment.create(EnvironmentSettings
-              .newInstance()
-              .useBlinkPlanner()
-              .inBatchMode().build());
-        }
-      }
-    }
-    return tEnv;
+  static String getURI(HiveConf conf) {
+    return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
   }
 
-  public List<Object[]> sql(String query, Object... args) {
-    TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
-    tableResult.getJobClient().ifPresent(c -> {
-      try {
-        c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
+  static String toWithClause(Map<String, String> props) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("(");
+    int propCount = 0;
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      if (propCount > 0) {
+        builder.append(",");
       }
-    });
-    CloseableIterator<Row> iter = tableResult.collect();
-    List<Object[]> results = Lists.newArrayList();
-    while (iter.hasNext()) {
-      Row row = iter.next();
-      results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+      builder.append("'").append(entry.getKey()).append("'").append("=")
+          .append("'").append(entry.getValue()).append("'");
+      propCount++;
     }
-    return results;
+    builder.append(")");
+    return builder.toString();
   }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index d92ad63..b680af7 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -19,13 +19,19 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.concurrent.ConcurrentMap;
-import org.apache.flink.table.catalog.Catalog;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -34,7 +40,8 @@ public abstract class FlinkTestBase extends AbstractTestBase {
   private static TestHiveMetastore metastore = null;
   protected static HiveConf hiveConf = null;
   protected static HiveCatalog catalog = null;
-  protected static ConcurrentMap<String, Catalog> flinkCatalogs;
+
+  private volatile TableEnvironment tEnv = null;
 
   @BeforeClass
   public static void startMetastore() {
@@ -42,7 +49,6 @@ public abstract class FlinkTestBase extends AbstractTestBase {
     metastore.start();
     FlinkTestBase.hiveConf = metastore.hiveConf();
     FlinkTestBase.catalog = new HiveCatalog(metastore.hiveConf());
-    flinkCatalogs = Maps.newConcurrentMap();
   }
 
   @AfterClass
@@ -50,7 +56,37 @@ public abstract class FlinkTestBase extends AbstractTestBase {
     metastore.stop();
     catalog.close();
     FlinkTestBase.catalog = null;
-    flinkCatalogs.values().forEach(Catalog::close);
-    flinkCatalogs.clear();
+  }
+
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          this.tEnv = TableEnvironment.create(EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inBatchMode().build());
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  List<Object[]> sql(String query, Object... args) {
+    TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
+    tableResult.getJobClient().ifPresent(c -> {
+      try {
+        c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    CloseableIterator<Row> iter = tableResult.collect();
+    List<Object[]> results = Lists.newArrayList();
+    while (iter.hasNext()) {
+      Row row = iter.next();
+      results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+    }
+    return results;
   }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java b/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index 5681125..ce3d829 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -68,7 +68,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
 
   @Test
   public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
-    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, null, 2);
     validateCatalogLoader(loader);
   }
 
@@ -81,7 +81,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
 
   @Test
   public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
-    CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+    CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, null, null, 2);
     validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
   }
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index b74b14e..2695555 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -41,9 +41,11 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
   }
 
   @After
+  @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
   }
 
   @Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 934cbf8..a400f94 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -60,6 +60,7 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
 
   @Before
   public void before() {
+    super.before();
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
new file mode 100644
index 0000000..5a0f2a6
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkHiveCatalog extends FlinkTestBase {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Test
+  public void testCreateCatalogWithWarehouseLocation() throws IOException {
+    Map<String, String> props = Maps.newHashMap();
+    props.put("type", "iceberg");
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+    props.put(FlinkCatalogFactory.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
+
+    File warehouseDir = tempFolder.newFolder();
+    props.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + warehouseDir.getAbsolutePath());
+
+    checkSQLQuery(props, warehouseDir);
+  }
+
+  @Test
+  public void testCreateCatalogWithHiveConfDir() throws IOException {
+    // Dump the hive conf into a local file.
+    File hiveConfDir = tempFolder.newFolder();
+    File hiveSiteXML = new File(hiveConfDir, "hive-site.xml");
+    File warehouseDir = tempFolder.newFolder();
+    try (FileOutputStream fos = new FileOutputStream(hiveSiteXML)) {
+      Configuration newConf = new Configuration(hiveConf);
+      // Set a new directory that is different from the hive metastore's warehouse path.
+      newConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file://" + warehouseDir.getAbsolutePath());
+      newConf.writeXml(fos);
+    }
+    Assert.assertTrue("hive-site.xml should be created now.", Files.exists(hiveSiteXML.toPath()));
+
+    // Construct the catalog properties.
+    Map<String, String> props = Maps.newHashMap();
+    props.put("type", "iceberg");
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+    props.put(FlinkCatalogFactory.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
+    // Set the 'hive-conf-dir' instead of 'warehouse'
+    props.put(FlinkCatalogFactory.HIVE_CONF_DIR, hiveConfDir.getAbsolutePath());
+
+    checkSQLQuery(props, warehouseDir);
+  }
+
+  private void checkSQLQuery(Map<String, String> catalogProperties, File warehouseDir) throws IOException {
+    sql("CREATE CATALOG test_catalog WITH %s", FlinkCatalogTestBase.toWithClause(catalogProperties));
+    sql("USE CATALOG test_catalog");
+    sql("CREATE DATABASE test_db");
+    sql("USE test_db");
+    sql("CREATE TABLE test_table(c1 INT, c2 STRING)");
+    sql("INSERT INTO test_table SELECT 1, 'a'");
+
+    Path databasePath = warehouseDir.toPath().resolve("test_db.db");
+    Assert.assertTrue("Database path should exist", Files.exists(databasePath));
+
+    Path tablePath = databasePath.resolve("test_table");
+    Assert.assertTrue("Table path should exist", Files.exists(tablePath));
+
+    Path dataPath = tablePath.resolve("data");
+    Assert.assertTrue("Table data path should exist", Files.exists(dataPath));
+    Assert.assertEquals("Should have a .crc file and a .parquet file", 2, Files.list(dataPath).count());
+
+    sql("DROP TABLE test_table");
+    sql("DROP DATABASE test_db");
+    sql("DROP CATALOG test_catalog");
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 55918bc..f27f54c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -91,6 +91,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
 
   @Before
   public void before() {
+    super.before();
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
@@ -102,6 +103,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
   }
 
   @Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
index 74c81bc..c24a887 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
@@ -108,6 +108,7 @@ public class TestFlinkInputFormatReaderDeletes extends DeleteReadTests {
     CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(),
         hiveConf,
         hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname),
+        hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
         hiveConf.getInt("iceberg.hive.client-pool-size", 5)
     );
     FlinkInputFormat inputFormat = FlinkSource.forRowData()
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 3d5de20..d188da5 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -72,6 +72,10 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
   }
 
   public HiveCatalog(String name, String uri, int clientPoolSize, Configuration conf) {
+    this(name, uri, null, clientPoolSize, conf);
+  }
+
+  public HiveCatalog(String name, String uri, String warehouse, int clientPoolSize, Configuration conf) {
     this.name = name;
     this.conf = new Configuration(conf);
     // before building the client pool, overwrite the configuration's URIs if the argument is non-null
@@ -79,6 +83,10 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
       this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
     }
 
+    if (warehouse != null) {
+      this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
+    }
+
     this.clients = new HiveClientPool(clientPoolSize, this.conf);
     this.createStack = Thread.currentThread().getStackTrace();
     this.closed = false;
@@ -422,10 +430,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
     }
 
     // Otherwise stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path
-    String warehouseLocation = conf.get("hive.metastore.warehouse.dir");
-    Preconditions.checkNotNull(
-        warehouseLocation,
-        "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+    String warehouseLocation = getWarehouseLocation();
     return String.format(
         "%s/%s.db/%s",
         warehouseLocation,
@@ -433,6 +438,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
         tableIdentifier.name());
   }
 
+  private String getWarehouseLocation() {
+    String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+    Preconditions.checkNotNull(warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+    return warehouseLocation;
+  }
+
   private Map<String, String> convertToMetadata(Database database) {
 
     Map<String, String> meta = Maps.newHashMap();
@@ -447,17 +458,15 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
   }
 
   Database convertToDatabase(Namespace namespace, Map<String, String> meta) {
-    String warehouseLocation = conf.get("hive.metastore.warehouse.dir");
-
     if (!isValidateNamespace(namespace)) {
       throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
     }
 
-    Database database  = new Database();
+    Database database = new Database();
     Map<String, String> parameter = Maps.newHashMap();
 
     database.setName(namespace.level(0));
-    database.setLocationUri(new Path(warehouseLocation, namespace.level(0)).toString() + ".db");
+    database.setLocationUri(new Path(getWarehouseLocation(), namespace.level(0)).toString() + ".db");
 
     meta.forEach((key, value) -> {
       if (key.equals("comment")) {
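
For completeness, a hedged sketch of the new public HiveCatalog constructor (illustrative only; the URI and warehouse path are hypothetical). Passing a non-null warehouse sets hive.metastore.warehouse.dir on the catalog's configuration, so it takes precedence over the value from hive-site.xml:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hive.HiveCatalog;

public class HiveCatalogConstructorSketch {
  public static void main(String[] args) {
    // Arguments: name, metastore URI, warehouse location, client pool size, Hadoop configuration.
    HiveCatalog catalog = new HiveCatalog(
        "my_hive", "thrift://localhost:9083", "hdfs://nn:8020/warehouse/path", 2, new Configuration());
    System.out.println(catalog.name());
  }
}
```
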
diff --git a/site/docs/flink.md b/site/docs/flink.md
index ddc0974..917bd3c 100644
--- a/site/docs/flink.md
+++ b/site/docs/flink.md
@@ -27,7 +27,7 @@ we only integrate iceberg with apache flink 1.11.x .
 | [SQL create table](#create-table)                                      | ✔️                 |                                                        |
 | [SQL alter table](#alter-table)                                        | ✔️                 | Only support altering table properties, Columns/PartitionKey changes are not supported now|
 | [SQL drop_table](#drop-table)                                          | ✔️                 |                                                        |
-| [SQL select](#querying-with-sql)                                       |  ️                 |                                                        |
+| [SQL select](#querying-with-sql)                                       | ✔️                 | Only supports batch mode now.                          |
 | [SQL insert into](#insert-into)                                        | ✔️ ️               | Support both streaming and batch mode                  |
 | [SQL insert overwrite](#insert-overwrite)                              | ✔️ ️               |                                                        |
 | [DataStream read](#reading-with-datastream)                            | ✔️ ️               |                                                        |
@@ -97,7 +97,8 @@ CREATE CATALOG hive_catalog WITH (
   'catalog-type'='hive',
   'uri'='thrift://localhost:9083',
   'clients'='5',
-  'property-version'='1'
+  'property-version'='1',
+  'warehouse'='hdfs://nn:8020/warehouse/path'
 );
 ```
 
@@ -106,6 +107,8 @@ CREATE CATALOG hive_catalog WITH (
 * `uri`: The Hive metastore's thrift URI. (Required)
 * `clients`: The Hive metastore client pool size, default value is 2. (Optional)
 * `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The currently property version is `1`. (Optional)
+* `warehouse`: The Hive warehouse location. Users should specify this path if they neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath. (Optional)
+* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file, which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file on the classpath) will be overwritten by the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the iceberg catalog. (Optional)
 
 Iceberg also supports a directory-based catalog in HDFS that can be configured using `'catalog-type'='hadoop'`:
 
@@ -187,11 +190,13 @@ DROP TABLE hive_catalog.default.sample;
 
 ## Querying with SQL
 
-Iceberg does not support streaming read or batch read in flink now, it's still working in-progress.
-
-## Writing with SQL
+Iceberg does not support streaming reads in Flink yet; that work is still in progress. It does, however, support batch reads to scan the existing records in an iceberg table.
 
-Iceberg support both `INSERT INTO` and `INSERT OVERWRITE` in flink 1.11 now.
+```sql
+-- Execute the flink job in batch mode for the current session context
+SET execution.type = batch;
+SELECT * FROM sample;
+```
 
 Notice: we could execute the following sql command to switch the execute type from 'streaming' mode to 'batch' mode, and vice versa:
 
@@ -203,6 +208,10 @@ SET execution.type = streaming
 SET execution.type = batch
 ```
 
+## Writing with SQL
+
+Iceberg supports both `INSERT INTO` and `INSERT OVERWRITE` in flink 1.11 now.
+
 ### `INSERT INTO`
 
 To append new data to a table with a flink streaming job, use `INSERT INTO`:
@@ -293,4 +302,4 @@ There are some features that we do not yet support in the current flink iceberg
 * Don't support creating iceberg table with computed column.
 * Don't support creating iceberg table with watermark.
 * Don't support adding columns, removing columns, renaming columns, changing columns. [FLINK-19062](https://issues.apache.org/jira/browse/FLINK-19062) is tracking this.
-* Don't support flink read iceberg table in batch or streaming mode. [#1346](https://github.com/apache/iceberg/pull/1346) and [#1293](https://github.com/apache/iceberg/pull/1293) are tracking this. 
\ No newline at end of file
+* Don't support reading an iceberg table with flink in streaming mode. [#1383](https://github.com/apache/iceberg/issues/1383) is tracking this.