Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/23 21:28:57 UTC
[iceberg] branch master updated: Flink: Load hive-site.xml for
HiveCatalog (#1586)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d132474 Flink: Load hive-site.xml for HiveCatalog (#1586)
d132474 is described below
commit d132474e7c8706f504a47f75b58e5a58d0bef9ff
Author: openinx <op...@gmail.com>
AuthorDate: Sat Oct 24 05:28:49 2020 +0800
Flink: Load hive-site.xml for HiveCatalog (#1586)
---
.../org/apache/iceberg/flink/CatalogLoader.java | 19 +++-
.../apache/iceberg/flink/FlinkCatalogFactory.java | 40 +++++++-
.../apache/iceberg/flink/FlinkCatalogTestBase.java | 106 +++++++++------------
.../org/apache/iceberg/flink/FlinkTestBase.java | 50 ++++++++--
.../iceberg/flink/TestCatalogTableLoader.java | 4 +-
.../iceberg/flink/TestFlinkCatalogDatabase.java | 2 +
.../iceberg/flink/TestFlinkCatalogTable.java | 1 +
.../apache/iceberg/flink/TestFlinkHiveCatalog.java | 101 ++++++++++++++++++++
.../apache/iceberg/flink/TestFlinkTableSink.java | 2 +
.../source/TestFlinkInputFormatReaderDeletes.java | 1 +
.../java/org/apache/iceberg/hive/HiveCatalog.java | 25 +++--
site/docs/flink.md | 23 +++--
12 files changed, 280 insertions(+), 94 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index 2088a22..4d0670c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -32,14 +32,21 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
*/
public interface CatalogLoader extends Serializable {
+ /**
+ * Create a new catalog with the provided properties. NOTICE: in Flink, we may initialize the
+ * {@link CatalogLoader} on the Flink SQL client or job manager side, serialize this catalog loader to the
+ * task managers, and finally deserialize it there to create a new catalog on the task manager side.
+ *
+ * @return a newly created {@link Catalog}
+ */
Catalog loadCatalog();
static CatalogLoader hadoop(String name, Configuration hadoopConf, String warehouseLocation) {
return new HadoopCatalogLoader(name, hadoopConf, warehouseLocation);
}
- static CatalogLoader hive(String name, Configuration hadoopConf, String uri, int clientPoolSize) {
- return new HiveCatalogLoader(name, hadoopConf, uri, clientPoolSize);
+ static CatalogLoader hive(String name, Configuration hadoopConf, String uri, String warehouse, int clientPoolSize) {
+ return new HiveCatalogLoader(name, hadoopConf, uri, warehouse, clientPoolSize);
}
class HadoopCatalogLoader implements CatalogLoader {
@@ -71,18 +78,21 @@ public interface CatalogLoader extends Serializable {
private final String catalogName;
private final SerializableConfiguration hadoopConf;
private final String uri;
+ private final String warehouse;
private final int clientPoolSize;
- private HiveCatalogLoader(String catalogName, Configuration conf, String uri, int clientPoolSize) {
+ private HiveCatalogLoader(String catalogName, Configuration conf, String uri, String warehouse,
+ int clientPoolSize) {
this.catalogName = catalogName;
this.hadoopConf = new SerializableConfiguration(conf);
this.uri = uri;
+ this.warehouse = warehouse;
this.clientPoolSize = clientPoolSize;
}
@Override
public Catalog loadCatalog() {
- return new HiveCatalog(catalogName, uri, clientPoolSize, hadoopConf.get());
+ return new HiveCatalog(catalogName, uri, warehouse, clientPoolSize, hadoopConf.get());
}
@Override
@@ -90,6 +100,7 @@ public interface CatalogLoader extends Serializable {
return MoreObjects.toStringHelper(this)
.add("catalogName", catalogName)
.add("uri", uri)
+ .add("warehouse", warehouse)
.add("clientPoolSize", clientPoolSize)
.toString();
}
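As an aside (not part of the commit), here is a minimal sketch of the lifecycle the new javadoc describes: build the loader on the client or job manager, ship it through Java serialization, and only call loadCatalog() on the task manager side. The metastore URI and warehouse path below are assumptions.
```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.flink.CatalogLoader;

public class CatalogLoaderRoundTrip {
  public static void main(String[] args) throws Exception {
    // Client / job manager side: create and serialize the loader.
    CatalogLoader loader = CatalogLoader.hive(
        "my_catalog", new Configuration(),
        "thrift://metastore:9083",        // assumed metastore URI
        "hdfs://nn:8020/warehouse/path",  // assumed warehouse location
        2);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(loader);            // possible because CatalogLoader extends Serializable
    }

    // Task manager side: deserialize, then create a fresh catalog from it.
    try (ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(bos.toByteArray()))) {
      CatalogLoader deserialized = (CatalogLoader) in.readObject();
      Catalog catalog = deserialized.loadCatalog();  // builds a new HiveCatalog here
      System.out.println(catalog);
    }
  }
}
```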
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 75c779e..9484a96 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -19,6 +19,9 @@
package org.apache.iceberg.flink;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -27,7 +30,10 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -54,7 +60,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
public static final String HIVE_URI = "uri";
public static final String HIVE_CLIENT_POOL_SIZE = "clients";
- public static final String HADOOP_WAREHOUSE_LOCATION = "warehouse";
+ public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public static final String WAREHOUSE_LOCATION = "warehouse";
public static final String DEFAULT_DATABASE = "default-database";
public static final String BASE_NAMESPACE = "base-namespace";
@@ -71,12 +78,17 @@ public class FlinkCatalogFactory implements CatalogFactory {
String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, "hive");
switch (catalogType) {
case "hive":
- int clientPoolSize = Integer.parseInt(properties.getOrDefault(HIVE_CLIENT_POOL_SIZE, "2"));
+ // The 'uri', 'warehouse' and 'hive-conf-dir' properties are allowed to be null; in that case the values
+ // fall back to the Hadoop configuration loaded from the classpath.
String uri = properties.get(HIVE_URI);
- return CatalogLoader.hive(name, hadoopConf, uri, clientPoolSize);
+ String warehouse = properties.get(WAREHOUSE_LOCATION);
+ int clientPoolSize = Integer.parseInt(properties.getOrDefault(HIVE_CLIENT_POOL_SIZE, "2"));
+ String hiveConfDir = properties.get(HIVE_CONF_DIR);
+ Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+ return CatalogLoader.hive(name, newHadoopConf, uri, warehouse, clientPoolSize);
case "hadoop":
- String warehouseLocation = properties.get(HADOOP_WAREHOUSE_LOCATION);
+ String warehouseLocation = properties.get(WAREHOUSE_LOCATION);
return CatalogLoader.hadoop(name, hadoopConf, warehouseLocation);
default:
@@ -98,7 +110,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
properties.add(ICEBERG_CATALOG_TYPE);
properties.add(HIVE_URI);
properties.add(HIVE_CLIENT_POOL_SIZE);
- properties.add(HADOOP_WAREHOUSE_LOCATION);
+ properties.add(HIVE_CONF_DIR);
+ properties.add(WAREHOUSE_LOCATION);
properties.add(DEFAULT_DATABASE);
properties.add(BASE_NAMESPACE);
return properties;
@@ -119,6 +132,23 @@ public class FlinkCatalogFactory implements CatalogFactory {
return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
}
+ private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+ Configuration newConf = new Configuration(hadoopConf);
+ if (!Strings.isNullOrEmpty(hiveConfDir)) {
+ Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
+ "There should be a hive-site.xml file under the directory %s", hiveConfDir);
+ newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
+ } else {
+ // If the hive-site.xml path is not provided explicitly, try to load the resource from the classpath. If the
+ // configuration file still cannot be loaded, HiveCatalog will throw an exception.
+ URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
+ if (configFile != null) {
+ newConf.addResource(configFile);
+ }
+ }
+ return newConf;
+ }
+
public static Configuration clusterHadoopConf() {
return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
}
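A hedged usage sketch of the new fallback chain (not part of the commit): when 'hive-conf-dir' is set, mergeHiveConf loads <dir>/hive-site.xml into a copy of the Hadoop conf; when it is unset, the classpath is searched instead. The directory path below is an assumption.
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.Catalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;

Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
// 'uri' and 'warehouse' are deliberately left unset: both values are expected to
// come from /etc/hive/conf/hive-site.xml (an assumed local path) via mergeHiveConf.
props.put(FlinkCatalogFactory.HIVE_CONF_DIR, "/etc/hive/conf");

Catalog catalog = new FlinkCatalogFactory().createCatalog("hive_catalog", props);
```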
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index 009931e..3a5d827 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -19,19 +19,10 @@
package org.apache.iceberg.flink;
-import java.io.File;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.IntStream;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.types.Row;
import org.apache.flink.util.ArrayUtils;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -39,9 +30,11 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -49,19 +42,29 @@ import org.junit.runners.Parameterized;
public abstract class FlinkCatalogTestBase extends FlinkTestBase {
protected static final String DATABASE = "db";
- private static File warehouse = null;
+ private static TemporaryFolder hiveWarehouse = new TemporaryFolder();
+ private static TemporaryFolder hadoopWarehouse = new TemporaryFolder();
@BeforeClass
public static void createWarehouse() throws IOException {
- FlinkCatalogTestBase.warehouse = File.createTempFile("warehouse", null);
- Assert.assertTrue(warehouse.delete());
+ hiveWarehouse.create();
+ hadoopWarehouse.create();
}
@AfterClass
public static void dropWarehouse() {
- if (warehouse != null && warehouse.exists()) {
- warehouse.delete();
- }
+ hiveWarehouse.delete();
+ hadoopWarehouse.delete();
+ }
+
+ @Before
+ public void before() {
+ sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
+ }
+
+ @After
+ public void clean() {
+ sql("DROP CATALOG IF EXISTS %s", catalogName);
}
@Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
@@ -76,12 +79,11 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
);
}
- private volatile TableEnvironment tEnv = null;
-
protected final String catalogName;
protected final String[] baseNamespace;
protected final Catalog validationCatalog;
protected final SupportsNamespaces validationNamespaceCatalog;
+ private final Map<String, String> config = Maps.newHashMap();
protected final String flinkDatabase;
protected final Namespace icebergNamespace;
@@ -92,62 +94,44 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
this.baseNamespace = baseNamespace;
this.isHadoopCatalog = catalogName.startsWith("testhadoop");
this.validationCatalog = isHadoopCatalog ?
- new HadoopCatalog(hiveConf, "file:" + warehouse) :
+ new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getRoot()) :
catalog;
this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
- Map<String, String> config = Maps.newHashMap();
config.put("type", "iceberg");
- config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, isHadoopCatalog ? "hadoop" : "hive");
- config.put(FlinkCatalogFactory.HADOOP_WAREHOUSE_LOCATION, "file:" + warehouse);
if (baseNamespace.length > 0) {
config.put(FlinkCatalogFactory.BASE_NAMESPACE, Joiner.on(".").join(baseNamespace));
}
-
- FlinkCatalogFactory factory = new FlinkCatalogFactory() {
- @Override
- protected org.apache.flink.table.catalog.Catalog createCatalog(
- String name, Map<String, String> properties, Configuration hadoopConf) {
- return super.createCatalog(name, properties, hiveConf);
- }
- };
- getTableEnv().registerCatalog(
- catalogName,
- flinkCatalogs.computeIfAbsent(catalogName, k -> factory.createCatalog(k, config)));
+ if (isHadoopCatalog) {
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
+ config.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
+ } else {
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+ config.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
+ config.put(FlinkCatalogFactory.HIVE_URI, getURI(hiveConf));
+ }
this.flinkDatabase = catalogName + "." + DATABASE;
- this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace, new String[] { DATABASE }));
+ this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace, new String[] {DATABASE}));
}
- protected TableEnvironment getTableEnv() {
- if (tEnv == null) {
- synchronized (this) {
- if (tEnv == null) {
- this.tEnv = TableEnvironment.create(EnvironmentSettings
- .newInstance()
- .useBlinkPlanner()
- .inBatchMode().build());
- }
- }
- }
- return tEnv;
+ static String getURI(HiveConf conf) {
+ return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
}
- public List<Object[]> sql(String query, Object... args) {
- TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
- tableResult.getJobClient().ifPresent(c -> {
- try {
- c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
+ static String toWithClause(Map<String, String> props) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("(");
+ int propCount = 0;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (propCount > 0) {
+ builder.append(",");
}
- });
- CloseableIterator<Row> iter = tableResult.collect();
- List<Object[]> results = Lists.newArrayList();
- while (iter.hasNext()) {
- Row row = iter.next();
- results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+ builder.append("'").append(entry.getKey()).append("'").append("=")
+ .append("'").append(entry.getValue()).append("'");
+ propCount++;
}
- return results;
+ builder.append(")");
+ return builder.toString();
}
}
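For orientation (values illustrative, not part of the commit): toWithClause renders the config map into a single parenthesized list, so the before() hook above effectively executes SQL like the following.
```sql
CREATE CATALOG testhive WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='file:///tmp/junit-hive-warehouse'
);
```
(The helper emits everything on one line and map ordering is not guaranteed; the formatting here is for readability.)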
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index d92ad63..b680af7 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -19,13 +19,19 @@
package org.apache.iceberg.flink;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.flink.table.catalog.Catalog;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -34,7 +40,8 @@ public abstract class FlinkTestBase extends AbstractTestBase {
private static TestHiveMetastore metastore = null;
protected static HiveConf hiveConf = null;
protected static HiveCatalog catalog = null;
- protected static ConcurrentMap<String, Catalog> flinkCatalogs;
+
+ private volatile TableEnvironment tEnv = null;
@BeforeClass
public static void startMetastore() {
@@ -42,7 +49,6 @@ public abstract class FlinkTestBase extends AbstractTestBase {
metastore.start();
FlinkTestBase.hiveConf = metastore.hiveConf();
FlinkTestBase.catalog = new HiveCatalog(metastore.hiveConf());
- flinkCatalogs = Maps.newConcurrentMap();
}
@AfterClass
@@ -50,7 +56,37 @@ public abstract class FlinkTestBase extends AbstractTestBase {
metastore.stop();
catalog.close();
FlinkTestBase.catalog = null;
- flinkCatalogs.values().forEach(Catalog::close);
- flinkCatalogs.clear();
+ }
+
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ if (tEnv == null) {
+ this.tEnv = TableEnvironment.create(EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inBatchMode().build());
+ }
+ }
+ }
+ return tEnv;
+ }
+
+ List<Object[]> sql(String query, Object... args) {
+ TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
+ tableResult.getJobClient().ifPresent(c -> {
+ try {
+ c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ CloseableIterator<Row> iter = tableResult.collect();
+ List<Object[]> results = Lists.newArrayList();
+ while (iter.hasNext()) {
+ Row row = iter.next();
+ results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+ }
+ return results;
}
}
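A quick usage sketch of the relocated sql(...) helper (table name and values borrowed from the new TestFlinkHiveCatalog test below): it formats the query, executes it, waits for the launched job if there is one, and materializes the result rows.
```java
// Inside a FlinkTestBase subclass:
List<Object[]> rows = sql("SELECT c1, c2 FROM %s", "test_table");
Assert.assertEquals(1, rows.size());
Assert.assertArrayEquals(new Object[] {1, "a"}, rows.get(0));
```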
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java b/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index 5681125..ce3d829 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -68,7 +68,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
@Test
public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
- CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+ CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, null, null, 2);
validateCatalogLoader(loader);
}
@@ -81,7 +81,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
@Test
public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
- CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, null, 2);
+ CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, null, null, 2);
validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index b74b14e..2695555 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -41,9 +41,11 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
}
@After
+ @Override
public void clean() {
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
}
@Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 934cbf8..a400f94 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -60,6 +60,7 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
@Before
public void before() {
+ super.before();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
new file mode 100644
index 0000000..5a0f2a6
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkHiveCatalog extends FlinkTestBase {
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testCreateCatalogWithWarehouseLocation() throws IOException {
+ Map<String, String> props = Maps.newHashMap();
+ props.put("type", "iceberg");
+ props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+ props.put(FlinkCatalogFactory.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
+
+ File warehouseDir = tempFolder.newFolder();
+ props.put(FlinkCatalogFactory.WAREHOUSE_LOCATION, "file://" + warehouseDir.getAbsolutePath());
+
+ checkSQLQuery(props, warehouseDir);
+ }
+
+ @Test
+ public void testCreateCatalogWithHiveConfDir() throws IOException {
+ // Dump the hive conf into a local file.
+ File hiveConfDir = tempFolder.newFolder();
+ File hiveSiteXML = new File(hiveConfDir, "hive-site.xml");
+ File warehouseDir = tempFolder.newFolder();
+ try (FileOutputStream fos = new FileOutputStream(hiveSiteXML)) {
+ Configuration newConf = new Configuration(hiveConf);
+ // Set a new directory that is different from the Hive metastore's warehouse path.
+ newConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file://" + warehouseDir.getAbsolutePath());
+ newConf.writeXml(fos);
+ }
+ Assert.assertTrue("hive-site.xml should be created now.", Files.exists(hiveSiteXML.toPath()));
+
+ // Construct the catalog properties.
+ Map<String, String> props = Maps.newHashMap();
+ props.put("type", "iceberg");
+ props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+ props.put(FlinkCatalogFactory.HIVE_URI, FlinkCatalogTestBase.getURI(hiveConf));
+ // Set the 'hive-conf-dir' instead of 'warehouse'
+ props.put(FlinkCatalogFactory.HIVE_CONF_DIR, hiveConfDir.getAbsolutePath());
+
+ checkSQLQuery(props, warehouseDir);
+ }
+
+ private void checkSQLQuery(Map<String, String> catalogProperties, File warehouseDir) throws IOException {
+ sql("CREATE CATALOG test_catalog WITH %s", FlinkCatalogTestBase.toWithClause(catalogProperties));
+ sql("USE CATALOG test_catalog");
+ sql("CREATE DATABASE test_db");
+ sql("USE test_db");
+ sql("CREATE TABLE test_table(c1 INT, c2 STRING)");
+ sql("INSERT INTO test_table SELECT 1, 'a'");
+
+ Path databasePath = warehouseDir.toPath().resolve("test_db.db");
+ Assert.assertTrue("Database path should exist", Files.exists(databasePath));
+
+ Path tablePath = databasePath.resolve("test_table");
+ Assert.assertTrue("Table path should exist", Files.exists(tablePath));
+
+ Path dataPath = tablePath.resolve("data");
+ Assert.assertTrue("Table data path should exist", Files.exists(dataPath));
+ Assert.assertEquals("Should have a .crc file and a .parquet file", 2, Files.list(dataPath).count());
+
+ sql("DROP TABLE test_table");
+ sql("DROP DATABASE test_db");
+ sql("DROP CATALOG test_catalog");
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 55918bc..f27f54c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -91,6 +91,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
@Before
public void before() {
+ super.before();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
@@ -102,6 +103,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
}
@Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
index 74c81bc..c24a887 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
@@ -108,6 +108,7 @@ public class TestFlinkInputFormatReaderDeletes extends DeleteReadTests {
CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(),
hiveConf,
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname),
+ hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
hiveConf.getInt("iceberg.hive.client-pool-size", 5)
);
FlinkInputFormat inputFormat = FlinkSource.forRowData()
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 3d5de20..d188da5 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -72,6 +72,10 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
}
public HiveCatalog(String name, String uri, int clientPoolSize, Configuration conf) {
+ this(name, uri, null, clientPoolSize, conf);
+ }
+
+ public HiveCatalog(String name, String uri, String warehouse, int clientPoolSize, Configuration conf) {
this.name = name;
this.conf = new Configuration(conf);
// before building the client pool, overwrite the configuration's URIs if the argument is non-null
@@ -79,6 +83,10 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
}
+ if (warehouse != null) {
+ this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
+ }
+
this.clients = new HiveClientPool(clientPoolSize, this.conf);
this.createStack = Thread.currentThread().getStackTrace();
this.closed = false;
@@ -422,10 +430,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
}
// Otherwise stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path
- String warehouseLocation = conf.get("hive.metastore.warehouse.dir");
- Preconditions.checkNotNull(
- warehouseLocation,
- "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+ String warehouseLocation = getWarehouseLocation();
return String.format(
"%s/%s.db/%s",
warehouseLocation,
@@ -433,6 +438,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
tableIdentifier.name());
}
+ private String getWarehouseLocation() {
+ String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+ Preconditions.checkNotNull(warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+ return warehouseLocation;
+ }
+
private Map<String, String> convertToMetadata(Database database) {
Map<String, String> meta = Maps.newHashMap();
@@ -447,17 +458,15 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
}
Database convertToDatabase(Namespace namespace, Map<String, String> meta) {
- String warehouseLocation = conf.get("hive.metastore.warehouse.dir");
-
if (!isValidateNamespace(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
- Database database = new Database();
+ Database database = new Database();
Map<String, String> parameter = Maps.newHashMap();
database.setName(namespace.level(0));
- database.setLocationUri(new Path(warehouseLocation, namespace.level(0)).toString() + ".db");
+ database.setLocationUri(new Path(getWarehouseLocation(), namespace.level(0)).toString() + ".db");
meta.forEach((key, value) -> {
if (key.equals("comment")) {
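A sketch of the new five-argument constructor (the URI and paths are assumptions): it lets a caller override both the metastore URI and the warehouse directory on a private copy of the Hadoop conf, instead of relying on hive-site.xml alone.
```java
HiveCatalog catalog = new HiveCatalog(
    "hive",                           // catalog name
    "thrift://metastore:9083",        // sets hive.metastore.uris when non-null
    "hdfs://nn:8020/warehouse/path",  // sets hive.metastore.warehouse.dir when non-null
    2,                                // Hive client pool size
    new org.apache.hadoop.conf.Configuration());
```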
diff --git a/site/docs/flink.md b/site/docs/flink.md
index ddc0974..917bd3c 100644
--- a/site/docs/flink.md
+++ b/site/docs/flink.md
@@ -27,7 +27,7 @@ we only integrate iceberg with apache flink 1.11.x .
| [SQL create table](#create-table) | ✔️ | |
| [SQL alter table](#alter-table) | ✔️ | Only support altering table properties, Columns/PartitionKey changes are not supported now|
| [SQL drop_table](#drop-table) | ✔️ | |
-| [SQL select](#querying-with-sql) | ️ | |
+| [SQL select](#querying-with-sql) | ✔️ | Only supports batch mode now. |
| [SQL insert into](#insert-into) | ✔️ ️ | Support both streaming and batch mode |
| [SQL insert overwrite](#insert-overwrite) | ✔️ ️ | |
| [DataStream read](#reading-with-datastream) | ✔️ ️ | |
@@ -97,7 +97,8 @@ CREATE CATALOG hive_catalog WITH (
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
- 'property-version'='1'
+ 'property-version'='1',
+ 'warehouse'='hdfs://nn:8020/warehouse/path'
);
```
@@ -106,6 +107,8 @@ CREATE CATALOG hive_catalog WITH (
* `uri`: The Hive metastore's thrift URI. (Required)
* `clients`: The Hive metastore client pool size, default value is 2. (Optional)
* `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
+* `warehouse`: The Hive warehouse location. Users should specify this path if they neither set `hive-conf-dir` to point at a directory containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath.
+* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file, which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or from the Hive configuration file on the classpath) will be overwritten by the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the iceberg catalog.
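For example (editor's sketch, paths assumed), a catalog that takes its warehouse location from a local Hive config directory rather than from the `warehouse` property could be declared as:
```sql
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'hive-conf-dir'='/etc/hive/conf'
);
```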
Iceberg also supports a directory-based catalog in HDFS that can be configured using `'catalog-type'='hadoop'`:
@@ -187,11 +190,13 @@ DROP TABLE hive_catalog.default.sample;
## Querying with SQL
-Iceberg does not support streaming read or batch read in flink now, it's still working in-progress.
-
-## Writing with SQL
+Iceberg does not support streaming read in flink yet; that work is still in progress. But it supports batch read to scan the existing records in an iceberg table.
-Iceberg support both `INSERT INTO` and `INSERT OVERWRITE` in flink 1.11 now.
+```sql
+-- Execute the flink job in batch mode for the current session context
+SET execution.type = batch;
+SELECT * FROM sample;
+```
Notice: we can execute the following sql commands to switch the execution type between 'streaming' mode and 'batch' mode:
@@ -203,6 +208,10 @@ SET execution.type = streaming
SET execution.type = batch
```
+## Writing with SQL
+
+Iceberg supports both `INSERT INTO` and `INSERT OVERWRITE` in flink 1.11 now.
+
### `INSERT INTO`
To append new data to a table with a flink streaming job, use `INSERT INTO`:
@@ -293,4 +302,4 @@ There are some features that we do not yet support in the current flink iceberg
* Don't support creating iceberg table with computed columns.
* Don't support creating iceberg table with watermarks.
* Don't support adding, removing, renaming, or changing columns. [FLINK-19062](https://issues.apache.org/jira/browse/FLINK-19062) is tracking this.
-* Don't support flink read iceberg table in batch or streaming mode. [#1346](https://github.com/apache/iceberg/pull/1346) and [#1293](https://github.com/apache/iceberg/pull/1293) are tracking this.
\ No newline at end of file
+* Don't support reading an iceberg table in streaming mode with flink. [#1383](https://github.com/apache/iceberg/issues/1383) is tracking this.