You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/05/20 02:30:11 UTC

[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285414379
 
 

 ##########
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##########
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+	private static final String DEFAULT_DB = "default";
 
-	public HiveCatalog(String catalogName, String hivemetastoreURI) {
-		super(catalogName, hivemetastoreURI);
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+	private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic";
 
-		LOG.info("Created HiveCatalog '{}'", catalogName);
+	protected final String catalogName;
+	protected final HiveConf hiveConf;
+
+	private final String defaultDatabase;
+	protected IMetaStoreClient client;
+
+	public HiveCatalog(String catalogName, String hivemetastoreURI) {
+		this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
 	}
 
 	public HiveCatalog(String catalogName, HiveConf hiveConf) {
-		super(catalogName, hiveConf);
+		this(catalogName, DEFAULT_DB, hiveConf);
+	}
+
+	public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+		this.catalogName = catalogName;
+		this.defaultDatabase = defaultDatabase;
+		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 
 		LOG.info("Created HiveCatalog '{}'", catalogName);
 	}
 
+	private static HiveConf getHiveConf(String hiveMetastoreURI) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+		HiveConf hiveConf = new HiveConf();
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+		return hiveConf;
+	}
+
+	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+		try {
+			return RetryingMetaStoreClient.getProxy(
+				hiveConf,
+				null,
+				null,
+				HiveMetaStoreClient.class.getName(),
+				true);
+		} catch (MetaException e) {
+			throw new CatalogException("Failed to create Hive metastore client", e);
+		}
+	}
+
+	@Override
+	public void open() throws CatalogException {
+		if (client == null) {
+			client = getMetastoreClient(hiveConf);
+			LOG.info("Connected to Hive metastore");
+		}
+
+		if (!databaseExists(defaultDatabase)) {
+			throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
+				defaultDatabase, catalogName));
+		}
+	}
+
+	@Override
+	public void close() throws CatalogException {
+		if (client != null) {
+			client.close();
+			client = null;
+			LOG.info("Close connection to Hive metastore");
+		}
+	}
+
 	// ------ databases ------
 
+	public String getDefaultDatabase() throws CatalogException {
+		return defaultDatabase;
+	}
+
 	@Override
-	protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-		return new HiveCatalogDatabase(
-			hiveDatabase.getParameters(),
-			hiveDatabase.getLocationUri(),
-			hiveDatabase.getDescription());
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		Database hiveDatabase = getHiveDatabase(databaseName);
+
+		Map<String, String> properties = hiveDatabase.getParameters();
+		boolean isGeneric = Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
 
 Review comment:
   This property is useful for users as well and I think we should keep it for user to see.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services