You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/06 17:15:32 UTC
[flink] branch master updated: [FLINK-12239][hive] Support table
related operations in GenericHiveMetastoreCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cf20197 [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
cf20197 is described below
commit cf20197f1c8c51eab028da7c477dd6c7ad96db2f
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed May 1 14:26:35 2019 -0700
[FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
This PR enables GenericHiveMetastoreCatalog to operate Flink tables by using Hive metastore as a storage. Flink tables will be stored as Hive tables in metastore, and GenericHiveMetastoreCatalog can convert between Flink and Hive tables upon read and write.
This closes #8329
---
flink-connectors/flink-connector-hive/pom.xml | 5 +-
.../catalog/hive/GenericHiveMetastoreCatalog.java | 114 +++++++-
.../hive/GenericHiveMetastoreCatalogUtil.java | 160 ++++++++++-
.../table/catalog/hive/HiveCatalogBaseUtil.java | 58 ++++
.../flink/table/catalog/hive/HiveTableConfig.java | 16 +-
.../flink/table/catalog/hive/HiveTypeUtil.java | 140 ++++++++++
.../src/main/resources/META-INF/NOTICE | 3 +
.../hive/GenericHiveMetastoreCatalogTest.java | 96 ++++++-
.../flink/table/catalog/hive/HiveTestUtils.java | 4 +-
.../flink/table/catalog/GenericCatalogTable.java | 17 +-
.../table/catalog/GenericInMemoryCatalogTest.java | 298 ++-------------------
.../flink/table/catalog/CatalogBaseTable.java | 14 +-
.../flink/table/catalog/ReadableCatalog.java | 12 +-
.../table/catalog/ReadableWritableCatalog.java | 7 +-
.../exceptions/DatabaseNotEmptyException.java | 2 +-
.../flink/table/catalog/CatalogTestBase.java | 297 +++++++++++++++++++-
.../flink/table/catalog/CatalogTestUtil.java | 10 +-
17 files changed, 900 insertions(+), 353 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index cb09934..1b672d4 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -197,10 +197,6 @@ under the License.
<artifactId>metrics-json</artifactId>
</exclusion>
<exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- <exclusion>
<groupId>com.github.joshelser</groupId>
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
</exclusion>
@@ -387,6 +383,7 @@ under the License.
<include>commons-dbcp:commons-dbcp</include>
<include>commons-pool:commons-pool</include>
<include>commons-beanutils:commons-beanutils</include>
+ <include>com.fasterxml.jackson.core:*</include>
<include>com.jolbox:bonecp</include>
<include>org.apache.hive:*</include>
<include>org.apache.thrift:libthrift</include>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index a8fcf62..50ed2e9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -229,31 +231,97 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try {
+ client.dropTable(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ // Indicate whether associated data should be deleted.
+ // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
+ true,
+ ignoreIfNotExists);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to drop table %s", tablePath.getFullName()), e);
+ }
}
@Override
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
- throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ throws TableNotExistException, TableAlreadyExistException, CatalogException {
+ try {
+ // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly
+ if (tableExists(tablePath)) {
+ ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+ // alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly
+ if (tableExists(newPath)) {
+ throw new TableAlreadyExistException(catalogName, newPath);
+ } else {
+ Table table = getHiveTable(tablePath);
+ table.setTableName(newTableName);
+ client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+ }
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to rename table %s", tablePath.getFullName()), e);
+ }
}
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+ } else {
+ try {
+ client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table));
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(catalogName, tablePath);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
+ }
+ }
}
@Override
- public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ if (!tableExists(tablePath)) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ } else {
+ // IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case
+ // Thus we have to translate alterTable() into (dropTable() + createTable())
+ dropTable(tablePath, false);
+ try {
+ createTable(tablePath, newTable, false);
+ } catch (TableAlreadyExistException | DatabaseNotExistException e) {
+ // These exceptions wouldn't be thrown unless a concurrent operation is triggered in Hive
+ throw new CatalogException(
+ String.format("Failed to alter table %s", tablePath), e);
+ }
+ }
}
@Override
- public List<String> listTables(String databaseName)
- throws DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+ try {
+ return client.getAllTables(databaseName);
+ } catch (UnknownDBException e) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to list tables in database %s", databaseName), e);
+ }
}
@Override
@@ -262,13 +330,33 @@ public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
}
@Override
- public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ Table hiveTable = getHiveTable(tablePath);
+
+ return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
+ }
+
+ protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+ try {
+ return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ } catch (NoSuchObjectException e) {
+ throw new TableNotExistException(catalogName, tablePath);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
+ }
}
@Override
- public boolean tableExists(ObjectPath objectPath) throws CatalogException {
- throw new UnsupportedOperationException();
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ try {
+ return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+ } catch (UnknownDBException e) {
+ return false;
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+ }
}
// ------ partitions ------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
index 779905a..0e564f5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
@@ -18,32 +18,178 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-
+import java.util.stream.Collectors;
/**
* Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
*/
public class GenericHiveMetastoreCatalogUtil {
+ // Prefix used to distinguish properties created by Hive and Flink,
+ // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+ private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+ // Flink tables should be stored as 'external' tables in Hive metastore
+ private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+ put("EXTERNAL", "TRUE");
+ }};
+
private GenericHiveMetastoreCatalogUtil() {
}
// ------ Utils ------
/**
- * Creates a Hive database from CatalogDatabase.
+ * Creates a Hive database from a CatalogDatabase.
+ *
+ * @param databaseName name of the database
+ * @param catalogDatabase the CatalogDatabase instance
+ * @return a Hive database
*/
- public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
- Map<String, String> props = db.getProperties();
+ public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
return new Database(
- dbName,
- db.getDescription().isPresent() ? db.getDescription().get() : null,
+ databaseName,
+ catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
null,
- props);
+ catalogDatabase.getProperties());
+ }
+
+ /**
+ * Creates a Hive table from a CatalogBaseTable.
+ * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
+ *
+ * @param tablePath path of the table
+ * @param table the CatalogBaseTable instance
+ * @return a Hive table
+ */
+ public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+ Map<String, String> properties = new HashMap<>(table.getProperties());
+
+ // Table comment
+ properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
+
+ Table hiveTable = new Table();
+ hiveTable.setDbName(tablePath.getDatabaseName());
+ hiveTable.setTableName(tablePath.getObjectName());
+ hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+ // Table properties
+ hiveTable.setParameters(buildFlinkProperties(properties));
+ hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+ // Hive table's StorageDescriptor
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+ List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+ // Table columns and partition keys
+ if (table instanceof CatalogTable) {
+ CatalogTable catalogTable = (CatalogTable) table;
+
+ if (catalogTable.isPartitioned()) {
+ int partitionKeySize = catalogTable.getPartitionKeys().size();
+ List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+ List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+ sd.setCols(regularColumns);
+ hiveTable.setPartitionKeys(partitionColumns);
+ } else {
+ sd.setCols(allColumns);
+ hiveTable.setPartitionKeys(new ArrayList<>());
+ }
+
+ hiveTable.setSd(sd);
+ } else {
+ // TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
+ throw new UnsupportedOperationException();
+ }
+
+ return hiveTable;
+ }
+
+ /**
+ * Creates a CatalogBaseTable from a Hive table.
+ * TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
+ *
+ * @param hiveTable the Hive table
+ * @return a CatalogBaseTable
+ */
+ public static CatalogBaseTable createCatalogTable(Table hiveTable) {
+ // Table schema
+ TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema(
+ hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+
+ // Table properties
+ Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
+
+ // Table comment
+ String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
+
+ // Partition keys
+ List<String> partitionKeys = new ArrayList<>();
+
+ if (hiveTable.getPartitionKeys() != null && !hiveTable.getPartitionKeys().isEmpty()) {
+ partitionKeys = hiveTable.getPartitionKeys().stream()
+ .map(fs -> fs.getName())
+ .collect(Collectors.toList());
+ }
+
+ return new GenericCatalogTable(
+ tableSchema, new TableStats(0), partitionKeys, properties, comment);
+ }
+
+ /**
+ * Create Hive columns from Flink TableSchema.
+ */
+ private static List<FieldSchema> createHiveColumns(TableSchema schema) {
+ String[] fieldNames = schema.getFieldNames();
+ TypeInformation[] fieldTypes = schema.getFieldTypes();
+
+ List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ columns.add(
+ new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
+ }
+
+ return columns;
+ }
+
+ /**
+ * Filter out Hive-created properties, and return Flink-created properties.
+ */
+ private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+ return hiveTableParams.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+ .collect(Collectors.toMap(e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), e -> e.getValue()));
+ }
+
+ /**
+ * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+ */
+ public static Map<String, String> buildFlinkProperties(Map<String, String> properties) {
+ return properties.entrySet().stream()
+ .filter(e -> e.getKey() != null && e.getValue() != null)
+ .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
}
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
new file mode 100644
index 0000000..3f93b4a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared util for catalogs backed by Hive-metastore.
+ */
+public class HiveCatalogBaseUtil {
+
+ /**
+ * Create a Flink's TableSchema from Hive table's columns and partition keys.
+ *
+ * @param cols columns of the Hive table
+ * @param partitionKeys partition keys of the Hive table
+ * @return a Flink TableSchema
+ */
+ public static TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) {
+ List<FieldSchema> allCols = new ArrayList<>(cols);
+ allCols.addAll(partitionKeys);
+
+ String[] colNames = new String[allCols.size()];
+ TypeInformation[] colTypes = new TypeInformation[allCols.size()];
+
+ for (int i = 0; i < allCols.size(); i++) {
+ FieldSchema fs = allCols.get(i);
+
+ colNames[i] = fs.getName();
+ colTypes[i] = HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+ }
+
+ return new TableSchema(colNames, colTypes);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
similarity index 61%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
index 91cf133..336d16f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
@@ -16,20 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog.exceptions;
+package org.apache.flink.table.catalog.hive;
/**
- * Exception for trying to drop on a database that is not empty.
- *
+ * Configs for Flink tables stored in Hive metastore.
*/
-public class DatabaseNotEmptyException extends Exception {
- private static final String MSG = "Database %s in Catalog %s is not empty.";
+public class HiveTableConfig {
- public DatabaseNotEmptyException(String catalog, String database, Throwable cause) {
- super(String.format(MSG, database, catalog), cause);
- }
+ // Description of the Flink table
+ public static final String TABLE_COMMENT = "comment";
- public DatabaseNotEmptyException(String catalog, String database) {
- this(catalog, database, null);
- }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java
new file mode 100644
index 0000000..c665d6c
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTypeUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * Utils to convert data types between Flink and Hive.
+ */
+public class HiveTypeUtil {
+
+ // Note: Need to keep this in sync with BaseSemanticAnalyzer::getTypeStringFromAST
+ private static final String HIVE_ARRAY_TYPE_NAME_FORMAT = serdeConstants.LIST_TYPE_NAME + "<%s>";
+
+ private HiveTypeUtil() {
+ }
+
+ /**
+ * Convert Flink data type to Hive data type.
+ * TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+ * [FLINK-12386] Support complete mapping between Flink and Hive data types
+ *
+ * @param type a Flink data type
+ * @return the corresponding Hive data type
+ */
+ public static String toHiveType(TypeInformation type) {
+ if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ return serdeConstants.BOOLEAN_TYPE_NAME;
+ } else if (type == BasicTypeInfo.BYTE_TYPE_INFO) {
+ return serdeConstants.TINYINT_TYPE_NAME;
+ } else if (type == BasicTypeInfo.SHORT_TYPE_INFO) {
+ return serdeConstants.SMALLINT_TYPE_NAME;
+ } else if (type == BasicTypeInfo.INT_TYPE_INFO) {
+ return serdeConstants.INT_TYPE_NAME;
+ } else if (type == BasicTypeInfo.LONG_TYPE_INFO) {
+ return serdeConstants.BIGINT_TYPE_NAME;
+ } else if (type == BasicTypeInfo.FLOAT_TYPE_INFO) {
+ return serdeConstants.FLOAT_TYPE_NAME;
+ } else if (type == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+ return serdeConstants.DOUBLE_TYPE_NAME;
+ } else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
+ return serdeConstants.STRING_TYPE_NAME;
+ } else if (type == BasicTypeInfo.DATE_TYPE_INFO) {
+ return serdeConstants.DATE_TYPE_NAME;
+ } else if (type == BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO) {
+ return serdeConstants.BINARY_TYPE_NAME;
+ } else if (type instanceof SqlTimeTypeInfo) {
+ return serdeConstants.TIMESTAMP_TYPE_NAME;
+ } else if (type instanceof BasicArrayTypeInfo) {
+ return toHiveArrayType((BasicArrayTypeInfo) type);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Flink doesn't support converting type %s to Hive type yet.", type.toString()));
+ }
+ }
+
+ private static String toHiveArrayType(BasicArrayTypeInfo arrayTypeInfo) {
+ return String.format(HIVE_ARRAY_TYPE_NAME_FORMAT, toHiveType(arrayTypeInfo.getComponentInfo()));
+ }
+
+ /**
+ * Convert Hive data type to a Flink data type.
+ * TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+ * [FLINK-12386] Support complete mapping between Flink and Hive data types
+ *
+ * @param hiveType a Hive data type
+ * @return the corresponding Flink data type
+ */
+ public static TypeInformation toFlinkType(TypeInfo hiveType) {
+ switch (hiveType.getCategory()) {
+ case PRIMITIVE:
+ return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
+ case LIST:
+ ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
+ return BasicArrayTypeInfo.getInfoFor(toFlinkType(listTypeInfo.getListElementTypeInfo()).getTypeClass());
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Flink doesn't support Hive data type %s yet.", hiveType));
+ }
+ }
+
+ // TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+ // [FLINK-12386] Support complete mapping between Flink and Hive data types
+ private static TypeInformation toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
+ switch (hiveType.getPrimitiveCategory()) {
+ // For CHAR(p) and VARCHAR(p) types, map them to String for now because Flink doesn't yet support them.
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ case BOOLEAN:
+ return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+ case BYTE:
+ return BasicTypeInfo.BYTE_TYPE_INFO;
+ case SHORT:
+ return BasicTypeInfo.SHORT_TYPE_INFO;
+ case INT:
+ return BasicTypeInfo.INT_TYPE_INFO;
+ case LONG:
+ return BasicTypeInfo.LONG_TYPE_INFO;
+ case FLOAT:
+ return BasicTypeInfo.FLOAT_TYPE_INFO;
+ case DOUBLE:
+ return BasicTypeInfo.DOUBLE_TYPE_INFO;
+ case DATE:
+ return BasicTypeInfo.DATE_TYPE_INFO;
+ case TIMESTAMP:
+ return SqlTimeTypeInfo.TIMESTAMP;
+ case BINARY:
+ return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
index ea1c160..b67affe 100644
--- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
@@ -8,6 +8,9 @@ This project bundles the following dependencies under the Apache Software Licens
- commons-dbcp:commons-dbcp:1.4
- commons-pool:commons-pool:1.5.4
+- com.fasterxml.jackson.core:jackson-annotations:2.6.0
+- com.fasterxml.jackson.core:jackson-core:2.6.5
+- com.fasterxml.jackson.core:jackson-databind:2.6.5
- com.jolbox:bonecp:0.8.0.RELEASE
- org.apache.hive:hive-common:2.3.4
- org.apache.hive:hive-metastore:2.3.4
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 315d657..2687699 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -18,10 +18,18 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.CatalogTestUtil;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.plan.stats.TableStats;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -40,14 +48,47 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
catalog.open();
}
- // =====================
- // GenericHiveMetastoreCatalog doesn't support table operation yet
- // Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against GenericHiveMetastoreCatalog
- // =====================
+ // ------ data types ------
- // TODO: re-enable this test once GenericHiveMetastoreCatalog support table operations
@Test
- public void testDropDb_DatabaseNotEmptyException() throws Exception {
+ public void testDataTypes() throws Exception {
+ // TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
+ // [FLINK-12386] Support complete mapping between Flink and Hive data types
+ TypeInformation[] types = new TypeInformation[] {
+ BasicTypeInfo.BYTE_TYPE_INFO,
+ BasicTypeInfo.SHORT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.FLOAT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO,
+ BasicTypeInfo.DATE_TYPE_INFO,
+ SqlTimeTypeInfo.TIMESTAMP
+ };
+
+ verifyDataTypes(types);
+ }
+
+ private void verifyDataTypes(TypeInformation[] types) throws Exception {
+ String[] colNames = new String[types.length];
+
+ for (int i = 0; i < types.length; i++) {
+ colNames[i] = types[i].toString().toLowerCase() + "_col";
+ }
+
+ CatalogTable table = new GenericCatalogTable(
+ new TableSchema(colNames, types),
+ new TableStats(0),
+ getBatchTableProperties(),
+ TEST_COMMENT
+ );
+
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
}
// ------ utils ------
@@ -77,13 +118,48 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
@Override
public CatalogTable createTable() {
- // TODO: implement this once GenericHiveMetastoreCatalog support table operations
- return null;
+ return new GenericCatalogTable(
+ createTableSchema(),
+ new TableStats(0),
+ getBatchTableProperties(),
+ TEST_COMMENT);
}
@Override
public CatalogTable createAnotherTable() {
- // TODO: implement this once GenericHiveMetastoreCatalog support table operations
- return null;
+ return new GenericCatalogTable(
+ createAnotherTableSchema(),
+ new TableStats(0),
+ getBatchTableProperties(),
+ TEST_COMMENT);
+ }
+
+ @Override
+ public CatalogTable createStreamingTable() {
+ return new GenericCatalogTable(
+ createTableSchema(),
+ new TableStats(0),
+ getStreamingTableProperties(),
+ TEST_COMMENT);
+ }
+
+ @Override
+ public CatalogTable createPartitionedTable() {
+ return new GenericCatalogTable(
+ createTableSchema(),
+ new TableStats(0),
+ createPartitionKeys(),
+ getBatchTableProperties(),
+ TEST_COMMENT);
+ }
+
+ @Override
+ public CatalogTable createAnotherPartitionedTable() {
+ return new GenericCatalogTable(
+ createAnotherTableSchema(),
+ new TableStats(0),
+ createPartitionKeys(),
+ getBatchTableProperties(),
+ TEST_COMMENT);
}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 83f5bed..4a32313 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.table.catalog.CatalogTestBase;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.rules.TemporaryFolder;
@@ -35,7 +37,7 @@ public class HiveTestUtils {
* Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore.
*/
public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException {
- return new GenericHiveMetastoreCatalog("test", getHiveConf());
+ return new GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
}
private static HiveConf getHiveConf() throws IOException {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 47498e7..73c2dbc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -61,8 +61,8 @@ public class GenericCatalogTable implements CatalogTable {
TableSchema tableSchema,
TableStats tableStats,
Map<String, String> properties,
- String comment) {
- this(tableSchema, tableStats, new ArrayList<>(), properties, comment);
+ String description) {
+ this(tableSchema, tableStats, new ArrayList<>(), properties, description);
}
@Override
@@ -91,6 +91,11 @@ public class GenericCatalogTable implements CatalogTable {
}
@Override
+ public String getComment() {
+ return comment;
+ }
+
+ @Override
public GenericCatalogTable copy() {
return new GenericCatalogTable(
this.tableSchema.copy(), this.tableStats.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
@@ -106,12 +111,4 @@ public class GenericCatalogTable implements CatalogTable {
return Optional.of("This is a catalog table in an im-memory catalog");
}
- public String getComment() {
- return this.comment;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index f88db0d..40a0d9f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.table.catalog;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -40,7 +37,6 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -62,15 +58,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
@After
public void close() throws Exception {
- if (catalog.tableExists(path1)) {
- catalog.dropTable(path1, true);
- }
- if (catalog.tableExists(path2)) {
- catalog.dropTable(path2, true);
- }
- if (catalog.tableExists(path3)) {
- catalog.dropTable(path3, true);
- }
if (catalog.functionExists(path1)) {
catalog.dropFunction(path1, true);
}
@@ -79,108 +66,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
// ------ tables ------
@Test
- public void testCreateTable_Streaming() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createStreamingTable();
- catalog.createTable(path1, table, false);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
- }
-
- @Test
- public void testCreateTable_Batch() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- // Non-partitioned table
- CatalogTable table = createTable();
- catalog.createTable(path1, table, false);
-
- CatalogBaseTable tableCreated = catalog.getTable(path1);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
- assertEquals(TABLE_COMMENT, tableCreated.getDescription().get());
-
- List<String> tables = catalog.listTables(db1);
-
- assertEquals(1, tables.size());
- assertEquals(path1.getObjectName(), tables.get(0));
-
- catalog.dropTable(path1, false);
-
- // Partitioned table
- table = createPartitionedTable();
- catalog.createTable(path1, table, false);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
- tables = catalog.listTables(db1);
-
- assertEquals(1, tables.size());
- assertEquals(path1.getObjectName(), tables.get(0));
- }
-
- @Test
- public void testCreateTable_DatabaseNotExistException() throws Exception {
- assertFalse(catalog.databaseExists(db1));
-
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database db1 does not exist in Catalog");
- catalog.createTable(nonExistObjectPath, createTable(), false);
- }
-
- @Test
- public void testCreateTable_TableAlreadyExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createTable(), false);
-
- exception.expect(TableAlreadyExistException.class);
- exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
- catalog.createTable(path1, createTable(), false);
- }
-
- @Test
- public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- CatalogTable table = createTable();
- catalog.createTable(path1, table, false);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
- catalog.createTable(path1, createAnotherTable(), true);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
- }
-
- @Test
- public void testGetTable_TableNotExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- exception.expect(TableNotExistException.class);
- exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
- catalog.getTable(nonExistObjectPath);
- }
-
- @Test
- public void testGetTable_TableNotExistException_NoDb() throws Exception {
- exception.expect(TableNotExistException.class);
- exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
- catalog.getTable(nonExistObjectPath);
- }
-
- @Test
- public void testDropTable_nonPartitionedTable() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.createTable(path1, createTable(), false);
-
- assertTrue(catalog.tableExists(path1));
-
- catalog.dropTable(path1, false);
-
- assertFalse(catalog.tableExists(path1));
- }
-
- @Test
public void testDropTable_partitionedTable() throws Exception {
catalog.createDatabase(db1, createDb(), false);
catalog.createTable(path1, createPartitionedTable(), false);
@@ -214,78 +99,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
}
@Test
- public void testDropTable_TableNotExistException() throws Exception {
- exception.expect(TableNotExistException.class);
- exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
- catalog.dropTable(nonExistDbPath, false);
- }
-
- @Test
- public void testDropTable_TableNotExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.dropTable(nonExistObjectPath, true);
- }
-
- @Test
- public void testAlterTable() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- // Non-partitioned table
- CatalogTable table = createTable();
- catalog.createTable(path1, table, false);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
- CatalogTable newTable = createAnotherTable();
- catalog.alterTable(path1, newTable, false);
-
- assertNotEquals(table, catalog.getTable(path1));
- CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
-
- catalog.dropTable(path1, false);
-
- // Partitioned table
- table = createPartitionedTable();
- catalog.createTable(path1, table, false);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
- newTable = createAnotherPartitionedTable();
- catalog.alterTable(path1, newTable, false);
-
- CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
- }
-
- @Test
- public void testAlterTable_TableNotExistException() throws Exception {
- exception.expect(TableNotExistException.class);
- exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
- catalog.alterTable(nonExistDbPath, createTable(), false);
- }
-
- @Test
- public void testAlterTable_TableNotExist_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.alterTable(nonExistObjectPath, createTable(), true);
-
- assertFalse(catalog.tableExists(nonExistObjectPath));
- }
-
- @Test
- public void testRenameTable_nonPartitionedTable() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createTable();
- catalog.createTable(path1, table, false);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
-
- catalog.renameTable(path1, t2, false);
-
- CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
- assertFalse(catalog.tableExists(path1));
- }
-
- @Test
public void testRenameTable_partitionedTable() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogTable table = createPartitionedTable();
@@ -305,44 +118,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
}
- @Test
- public void testRenameTable_TableNotExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- exception.expect(TableNotExistException.class);
- exception.expectMessage("Table (or view) db1.t1 does not exist in Catalog");
- catalog.renameTable(path1, t2, false);
- }
-
- @Test
- public void testRenameTable_TableNotExistException_ignored() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- catalog.renameTable(path1, t2, true);
- }
-
- @Test
- public void testRenameTable_TableAlreadyExistException() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
- CatalogTable table = createTable();
- catalog.createTable(path1, table, false);
- catalog.createTable(path3, createAnotherTable(), false);
-
- exception.expect(TableAlreadyExistException.class);
- exception.expectMessage("Table (or view) db1.t2 already exists in Catalog");
- catalog.renameTable(path1, t2, false);
- }
-
- @Test
- public void testTableExists() throws Exception {
- catalog.createDatabase(db1, createDb(), false);
-
- assertFalse(catalog.tableExists(path1));
-
- catalog.createTable(path1, createTable(), false);
-
- assertTrue(catalog.tableExists(path1));
- }
-
// ------ views ------
@Test
@@ -951,24 +726,29 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
@Override
public CatalogDatabase createDb() {
- return new GenericCatalogDatabase(new HashMap<String, String>() {{
- put("k1", "v1");
- }}, TEST_COMMENT);
+ return new GenericCatalogDatabase(
+ new HashMap<String, String>() {{
+ put("k1", "v1");
+ }},
+ TEST_COMMENT);
}
@Override
public CatalogDatabase createAnotherDb() {
- return new GenericCatalogDatabase(new HashMap<String, String>() {{
- put("k2", "v2");
- }}, "this is another database.");
+ return new GenericCatalogDatabase(
+ new HashMap<String, String>() {{
+ put("k2", "v2");
+ }},
+ "this is another database.");
}
- private GenericCatalogTable createStreamingTable() {
+ @Override
+ public GenericCatalogTable createStreamingTable() {
return new GenericCatalogTable(
createTableSchema(),
new TableStats(0),
getStreamingTableProperties(),
- TABLE_COMMENT);
+ TEST_COMMENT);
}
@Override
@@ -977,7 +757,7 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
createTableSchema(),
new TableStats(0),
getBatchTableProperties(),
- TABLE_COMMENT);
+ TEST_COMMENT);
}
@Override
@@ -986,29 +766,27 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
createAnotherTableSchema(),
new TableStats(0),
getBatchTableProperties(),
- TABLE_COMMENT);
+ TEST_COMMENT);
}
- protected CatalogTable createPartitionedTable() {
+ @Override
+ public CatalogTable createPartitionedTable() {
return new GenericCatalogTable(
createTableSchema(),
new TableStats(0),
createPartitionKeys(),
getBatchTableProperties(),
- TABLE_COMMENT);
+ TEST_COMMENT);
}
- protected CatalogTable createAnotherPartitionedTable() {
+ @Override
+ public CatalogTable createAnotherPartitionedTable() {
return new GenericCatalogTable(
createAnotherTableSchema(),
new TableStats(0),
createPartitionKeys(),
getBatchTableProperties(),
- TABLE_COMMENT);
- }
-
- private List<String> createPartitionKeys() {
- return Arrays.asList("second", "third");
+ TEST_COMMENT);
}
private CatalogPartitionSpec createPartitionSpec() {
@@ -1053,40 +831,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
return new GenericCatalogPartition(props);
}
- private Map<String, String> getBatchTableProperties() {
- return new HashMap<String, String>() {{
- put(IS_STREAMING, "false");
- }};
- }
-
- private Map<String, String> getStreamingTableProperties() {
- return new HashMap<String, String>() {{
- put(IS_STREAMING, "true");
- }};
- }
-
- private TableSchema createTableSchema() {
- return new TableSchema(
- new String[] {"first", "second", "third"},
- new TypeInformation[] {
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- }
- );
- }
-
- private TableSchema createAnotherTableSchema() {
- return new TableSchema(
- new String[] {"first2", "second", "third"},
- new TypeInformation[] {
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO
- }
- );
- }
-
private CatalogView createView() {
return new GenericCatalogView(
String.format("select * from %s", t1),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
index f916354..e2157e0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
@@ -30,18 +30,28 @@ import java.util.Optional;
public interface CatalogBaseTable {
/**
* Get the properties of the table.
- * @return table property map
+ *
+ * @return property map of the table/view
*/
Map<String, String> getProperties();
/**
* Get the schema of the table.
- * @return schema of the table
+ *
+ * @return schema of the table/view.
*/
TableSchema getSchema();
/**
+ * Get comment of the table or view.
+ *
+ * @return comment of the table/view.
+ */
+ String getComment();
+
+ /**
* Get a deep copy of the CatalogBaseTable instance.
+ *
* @return a copy of the CatalogBaseTable instance
*/
CatalogBaseTable copy();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index a8a69f2..5586348 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -96,6 +96,8 @@ public interface ReadableCatalog {
*/
boolean databaseExists(String databaseName) throws CatalogException;
+ // ------ tables and views ------
+
/**
* Get names of all tables and views under this database. An empty list is returned if none exists.
*
@@ -116,24 +118,24 @@ public interface ReadableCatalog {
List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException;
/**
- * Get a CatalogTable or CatalogView identified by objectPath.
+ * Get a CatalogTable or CatalogView identified by tablePath.
*
- * @param objectPath Path of the table or view
+ * @param tablePath Path of the table or view
* @return The requested table or view
* @throws TableNotExistException if the target does not exist
* @throws CatalogException in case of any runtime exception
*/
- CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException;
+ CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
/**
* Check if a table or view exists in this catalog.
*
- * @param objectPath Path of the table or view
+ * @param tablePath Path of the table or view
* @return true if the given table exists in the catalog
* false otherwise
* @throws CatalogException in case of any runtime exception
*/
- boolean tableExists(ObjectPath objectPath) throws CatalogException;
+ boolean tableExists(ObjectPath tablePath) throws CatalogException;
// ------ partitions ------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 04d9bcb..60bc93d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -102,11 +102,10 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
* if set to false, throw an exception,
* if set to true, do nothing.
* @throws TableNotExistException if the table does not exist
- * @throws DatabaseNotExistException if the database in tablePath to doesn't exist
* @throws CatalogException in case of any runtime exception
*/
void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
- throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException;
+ throws TableNotExistException, TableAlreadyExistException, CatalogException;
/**
* Create a new table or view.
@@ -128,7 +127,7 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
* Note that the new and old CatalogBaseTable must be of the same type. For example, this doesn't
* allow alter a regular table to partitioned table, or alter a view to a table, and vice versa.
*
- * @param tableName path of the table or view to be modified
+ * @param tablePath path of the table or view to be modified
* @param newTable the new table definition
* @param ignoreIfNotExists flag to specify behavior when the table or view does not exist:
* if set to false, throw an exception,
@@ -136,7 +135,7 @@ public interface ReadableWritableCatalog extends ReadableCatalog {
* @throws TableNotExistException if the table does not exist
* @throws CatalogException in case of any runtime exception
*/
- void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
// ------ partitions ------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
index 91cf133..423d141 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotEmptyException.java
@@ -23,7 +23,7 @@ package org.apache.flink.table.catalog.exceptions;
*
*/
public class DatabaseNotEmptyException extends Exception {
- private static final String MSG = "Database %s in Catalog %s is not empty.";
+ private static final String MSG = "Database %s in catalog %s is not empty.";
public DatabaseNotEmptyException(String catalog, String database, Throwable cause) {
super(String.format(MSG, database, catalog), cause);
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 5457d34..8117c82 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -18,9 +18,14 @@
package org.apache.flink.table.catalog;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.junit.After;
import org.junit.AfterClass;
@@ -29,11 +34,14 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -55,9 +63,9 @@ public abstract class CatalogTestBase {
protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
protected final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
- protected static final String TEST_CATALOG_NAME = "test-catalog";
+ public static final String TEST_CATALOG_NAME = "test-catalog";
+
protected static final String TEST_COMMENT = "test comment";
- protected static final String TABLE_COMMENT = "This is my batch table";
protected static ReadableWritableCatalog catalog;
@@ -66,6 +74,16 @@ public abstract class CatalogTestBase {
@After
public void cleanup() throws Exception {
+ if (catalog.tableExists(path1)) {
+ catalog.dropTable(path1, true);
+ }
+ if (catalog.tableExists(path2)) {
+ catalog.dropTable(path2, true);
+ }
+ if (catalog.tableExists(path3)) {
+ catalog.dropTable(path3, true);
+ }
+
if (catalog.databaseExists(db1)) {
catalog.dropDatabase(db1, true);
}
@@ -168,7 +186,7 @@ public abstract class CatalogTestBase {
catalog.createTable(path1, createTable(), false);
exception.expect(DatabaseNotEmptyException.class);
- exception.expectMessage("Database db1 in Catalog test-catalog is not empty");
+ exception.expectMessage("Database db1 in catalog test-catalog is not empty");
catalog.dropDatabase(db1, true);
}
@@ -209,6 +227,220 @@ public abstract class CatalogTestBase {
assertTrue(catalog.databaseExists(db1));
}
+ // ------ tables ------
+
+ @Test
+ public void testCreateTable_Streaming() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createStreamingTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+ }
+
+ @Test
+ public void testCreateTable_Batch() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ // Non-partitioned table
+ CatalogTable table = createTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogBaseTable tableCreated = catalog.getTable(path1);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
+ assertEquals(TEST_COMMENT, tableCreated.getDescription().get());
+
+ List<String> tables = catalog.listTables(db1);
+
+ assertEquals(1, tables.size());
+ assertEquals(path1.getObjectName(), tables.get(0));
+
+ catalog.dropTable(path1, false);
+
+ // Partitioned table
+ table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+ tables = catalog.listTables(db1);
+
+ assertEquals(1, tables.size());
+ assertEquals(path1.getObjectName(), tables.get(0));
+ }
+
+ @Test
+ public void testCreateTable_DatabaseNotExistException() throws Exception {
+ assertFalse(catalog.databaseExists(db1));
+
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database db1 does not exist in Catalog");
+ catalog.createTable(nonExistObjectPath, createTable(), false);
+ }
+
+ @Test
+ public void testCreateTable_TableAlreadyExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ exception.expect(TableAlreadyExistException.class);
+ exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
+ catalog.createTable(path1, createTable(), false);
+ }
+
+ @Test
+ public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogTable table = createTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+ catalog.createTable(path1, createAnotherTable(), true);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+ }
+
+ @Test
+ public void testGetTable_TableNotExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
+ catalog.getTable(nonExistObjectPath);
+ }
+
+ @Test
+ public void testGetTable_TableNotExistException_NoDb() throws Exception {
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
+ catalog.getTable(nonExistObjectPath);
+ }
+
+ @Test
+ public void testDropTable_nonPartitionedTable() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ assertTrue(catalog.tableExists(path1));
+
+ catalog.dropTable(path1, false);
+
+ assertFalse(catalog.tableExists(path1));
+ }
+
+ @Test
+ public void testDropTable_TableNotExistException() throws Exception {
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+ catalog.dropTable(nonExistDbPath, false);
+ }
+
+ @Test
+ public void testDropTable_TableNotExist_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.dropTable(nonExistObjectPath, true);
+ }
+
+ @Test
+ public void testAlterTable() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ // Non-partitioned table
+ CatalogTable table = createTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+ CatalogTable newTable = createAnotherTable();
+ catalog.alterTable(path1, newTable, false);
+
+ assertNotEquals(table, catalog.getTable(path1));
+ CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+
+ catalog.dropTable(path1, false);
+
+ // Partitioned table
+ table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+ newTable = createAnotherPartitionedTable();
+ catalog.alterTable(path1, newTable, false);
+
+ CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+ }
+
+ @Test
+ public void testAlterTable_TableNotExistException() throws Exception {
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+ catalog.alterTable(nonExistDbPath, createTable(), false);
+ }
+
+ @Test
+ public void testAlterTable_TableNotExist_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.alterTable(nonExistObjectPath, createTable(), true);
+
+ assertFalse(catalog.tableExists(nonExistObjectPath));
+ }
+
+ @Test
+ public void testRenameTable_nonPartitionedTable() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+
+ catalog.renameTable(path1, t2, false);
+
+ CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
+ assertFalse(catalog.tableExists(path1));
+ }
+
+ @Test
+ public void testRenameTable_TableNotExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage("Table (or view) db1.t1 does not exist in Catalog");
+ catalog.renameTable(path1, t2, false);
+ }
+
+ @Test
+ public void testRenameTable_TableNotExistException_ignored() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.renameTable(path1, t2, true);
+ }
+
+ @Test
+ public void testRenameTable_TableAlreadyExistException() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createTable();
+ catalog.createTable(path1, table, false);
+ catalog.createTable(path3, createAnotherTable(), false);
+
+ exception.expect(TableAlreadyExistException.class);
+ exception.expectMessage("Table (or view) db1.t2 already exists in Catalog");
+ catalog.renameTable(path1, t2, false);
+ }
+
+ @Test
+ public void testTableExists() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ assertFalse(catalog.tableExists(path1));
+
+ catalog.createTable(path1, createTable(), false);
+
+ assertTrue(catalog.tableExists(path1));
+ }
+
// ------ utilities ------
/**
@@ -245,4 +477,63 @@ public abstract class CatalogTestBase {
* @return another CatalogTable instance
*/
public abstract CatalogTable createAnotherTable();
+
+ /**
+ * Create a streaming CatalogTable instance by specific catalog implementation.
+ * The table's properties should mark it as streaming ({@code IS_STREAMING = "true"},
+ * see {@link #getStreamingTableProperties()}).
+ *
+ * @return a streaming CatalogTable instance
+ */
+ public abstract CatalogTable createStreamingTable();
+
+ /**
+ * Create a partitioned CatalogTable instance by specific catalog implementation.
+ *
+ * @return a partitioned CatalogTable instance
+ */
+ public abstract CatalogTable createPartitionedTable();
+
+ /**
+ * Create another partitioned CatalogTable instance by specific catalog implementation.
+ * Should be distinguishable from the table returned by {@link #createPartitionedTable()}.
+ *
+ * @return another partitioned CatalogTable instance
+ */
+ public abstract CatalogTable createAnotherPartitionedTable();
+
+ /** Default three-column test schema: (first STRING, second INT, third STRING). */
+ protected TableSchema createTableSchema() {
+ String[] fieldNames = new String[] {"first", "second", "third"};
+ TypeInformation[] fieldTypes = new TypeInformation[] {
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO
+ };
+ return new TableSchema(fieldNames, fieldTypes);
+ }
+
+ /** Alternate test schema: first column renamed and the second column typed STRING. */
+ protected TableSchema createAnotherTableSchema() {
+ String[] fieldNames = new String[] {"first2", "second", "third"};
+ TypeInformation[] fieldTypes = new TypeInformation[] {
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO
+ };
+ return new TableSchema(fieldNames, fieldTypes);
+ }
+
+ /** Partition keys for the partitioned test tables; both are columns of createTableSchema(). */
+ protected List<String> createPartitionKeys() {
+ return Arrays.asList("second", "third");
+ }
+
+ /** Properties marking a test table as a batch table. */
+ protected Map<String, String> getBatchTableProperties() {
+ // Plain HashMap instead of double-brace initialization: the anonymous
+ // subclass idiom retains a hidden reference to the enclosing test instance.
+ Map<String, String> props = new HashMap<>();
+ props.put(IS_STREAMING, "false");
+ return props;
+ }
+
+ /** Properties marking a test table as a streaming table. */
+ protected Map<String, String> getStreamingTableProperties() {
+ // Plain HashMap instead of double-brace initialization: the anonymous
+ // subclass idiom retains a hidden reference to the enclosing test instance.
+ Map<String, String> props = new HashMap<>();
+ props.put(IS_STREAMING, "true");
+ return props;
+ }
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 0700736..57341fe 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -33,26 +33,26 @@ public class CatalogTestUtil {
assertEquals(t1.getDescription(), t2.getDescription());
}
- protected static void checkEquals(TableStats ts1, TableStats ts2) {
+ // Widened to public so tests outside this package (e.g. GenericHiveMetastoreCatalogTest) can reuse it.
+ public static void checkEquals(TableStats ts1, TableStats ts2) {
assertEquals(ts1.getRowCount(), ts2.getRowCount());
assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
}
- protected static void checkEquals(CatalogView v1, CatalogView v2) {
+ // Widened to public for reuse by catalog tests in other packages.
+ public static void checkEquals(CatalogView v1, CatalogView v2) {
assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
}
- protected static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) {
+ // Widened to public for reuse by catalog tests in other packages.
+ public static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) {
assertEquals(d1.getProperties(), d2.getProperties());
}
- protected static void checkEquals(CatalogFunction f1, CatalogFunction f2) {
+ // Widened to public for reuse by catalog tests in other packages.
+ public static void checkEquals(CatalogFunction f1, CatalogFunction f2) {
assertEquals(f1.getClassName(), f2.getClassName());
assertEquals(f1.getProperties(), f2.getProperties());
}
- protected static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
+ // Widened to public for reuse by catalog tests in other packages.
+ public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
assertEquals(p1.getProperties(), p2.getProperties());
}
}