You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/03 17:35:23 UTC

[GitHub] [iceberg] jackye1995 commented on a change in pull request #1870: Iceberg Jdbc Catalog Implementation

jackye1995 commented on a change in pull request #1870:
URL: https://github.com/apache/iceberg/pull/1870#discussion_r535427137



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/JdbcCatalog.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.handlers.BeanListHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcCatalog extends BaseMetastoreCatalog implements Configurable, Closeable {
+
+  public static final String JDBC_CATALOG_DEFAULT_NAME = "default_catalog";

Review comment:
       nit: Following the convention of hive and hadoop catalog, the default name should probably be "jdbc"

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/JdbcCatalog.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.handlers.BeanListHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcCatalog extends BaseMetastoreCatalog implements Configurable, Closeable {
+
+  public static final String JDBC_CATALOG_DEFAULT_NAME = "default_catalog";
+  public static final String JDBC_CATALOG_TABLE_NAME = "iceberg_catalog";

Review comment:
       How do we incorporate the database concept in all SQL databases? Looks like we are just creating this table in the default database. My thought on this is we can have a database named `iceberg` (maybe configurable), and inside it there can be a `namespace` table to store namespace info, and this table (maybe with a different name like `tables`) to store all table information. Thoughts?

##########
File path: build.gradle
##########
@@ -226,6 +226,10 @@ project(':iceberg-core') {
     compile "com.fasterxml.jackson.core:jackson-databind"
     compile "com.fasterxml.jackson.core:jackson-core"
     compile "com.github.ben-manes.caffeine:caffeine"
+    compile 'commons-dbutils:commons-dbutils:1.7'
+    compile "org.hsqldb:hsqldb:2.5.1"
+    compile "mysql:mysql-connector-java:8.0.21"
+    compile "org.postgresql:postgresql:42.2.18"

Review comment:
       1. I only see hsqldb used for testing; what is the use of mysql and postgres?
   2. should mark as `testCompile` instead
   3. versions should go to `versions.props`
   4. have you checked if the license is okay for hsqldb?

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/JdbcCatalog.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.handlers.BeanListHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcCatalog extends BaseMetastoreCatalog implements Configurable, Closeable {
+
+  public static final String JDBC_CATALOG_DEFAULT_NAME = "default_catalog";
+  public static final String JDBC_CATALOG_TABLE_NAME = "iceberg_catalog";
+  public static final String JDB_CATALOG_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " +
+          JDBC_CATALOG_TABLE_NAME + " ( " +
+          "catalogName VARCHAR(1255) NOT NULL," +
+          "tableNamespace VARCHAR(1255)," +
+          "tableName VARCHAR(1255) NOT NULL," +
+          "metadataLocation VARCHAR(66255)," +
+          "previousMetadataLocation VARCHAR(66255)," +
+          "PRIMARY KEY (catalogName,tableNamespace,tableName)  " +
+          ")";
+  public static final String JDBC_NAMESPACE_TABLE_LIST = "SELECT catalogName, tableNamespace, tableName, " +
+          "metadataLocation, previousMetadataLocation FROM " + JDBC_CATALOG_TABLE_NAME + " " +
+          "WHERE catalogName = ? AND tableNamespace = ?";
+  public static final String JDBC_TABLE_RENAME = "UPDATE " + JDBC_CATALOG_TABLE_NAME + " SET tableNamespace = ? , " +
+          "tableName = ? WHERE catalogName = ? AND tableNamespace = ? AND tableName = ? ";
+  public static final String JDBC_TABLE_DROP = "DELETE FROM " + JDBC_CATALOG_TABLE_NAME + " WHERE catalogName = ? " +
+          "AND tableNamespace = ? AND tableName = ? ";
+  public static final String JDBC_TABLE_SELECT = "SELECT catalogName, tableNamespace, tableName, metadataLocation, " +

Review comment:
       nit: why not just select *?

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/JdbcCatalog.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.handlers.BeanListHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcCatalog extends BaseMetastoreCatalog implements Configurable, Closeable {
+
+  public static final String JDBC_CATALOG_DEFAULT_NAME = "default_catalog";
+  public static final String JDBC_CATALOG_TABLE_NAME = "iceberg_catalog";
+  public static final String JDB_CATALOG_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " +
+          JDBC_CATALOG_TABLE_NAME + " ( " +
+          "catalogName VARCHAR(1255) NOT NULL," +
+          "tableNamespace VARCHAR(1255)," +
+          "tableName VARCHAR(1255) NOT NULL," +
+          "metadataLocation VARCHAR(66255)," +
+          "previousMetadataLocation VARCHAR(66255)," +
+          "PRIMARY KEY (catalogName,tableNamespace,tableName)  " +
+          ")";
+  public static final String JDBC_NAMESPACE_TABLE_LIST = "SELECT catalogName, tableNamespace, tableName, " +
+          "metadataLocation, previousMetadataLocation FROM " + JDBC_CATALOG_TABLE_NAME + " " +
+          "WHERE catalogName = ? AND tableNamespace = ?";
+  public static final String JDBC_TABLE_RENAME = "UPDATE " + JDBC_CATALOG_TABLE_NAME + " SET tableNamespace = ? , " +
+          "tableName = ? WHERE catalogName = ? AND tableNamespace = ? AND tableName = ? ";
+  public static final String JDBC_TABLE_DROP = "DELETE FROM " + JDBC_CATALOG_TABLE_NAME + " WHERE catalogName = ? " +
+          "AND tableNamespace = ? AND tableName = ? ";
+  public static final String JDBC_TABLE_SELECT = "SELECT catalogName, tableNamespace, tableName, metadataLocation, " +
+          "previousMetadataLocation FROM " + JDBC_CATALOG_TABLE_NAME + " WHERE catalogName = ? " +
+          "AND tableNamespace = ? AND tableName = ? ";
+  public static final String JDBC_TABLE_INSERT = "INSERT INTO " + JDBC_CATALOG_TABLE_NAME + " (catalogName, " +
+          "tableNamespace, tableName, metadataLocation, previousMetadataLocation) VALUES (?,?,?,?,?)";
+  public static final String JDBC_TABLE_UPDATE_METADATA = "UPDATE " + JDBC_CATALOG_TABLE_NAME + " " +
+          "SET metadataLocation = ? , previousMetadataLocation = ? WHERE catalogName = ? AND tableNamespace = ? " +
+          "AND tableName = ? ";
+
+  public static final String JDBC_CATALOG_JDBC_DRIVER = "jdbccatalog.jdbcdriver";
+  public static final String JDBC_CATALOG_DBURL = "jdbccatalog.dburl";
+  public static final String JDBC_CATALOG_USER = "jdbccatalog.user";
+  public static final String JDBC_CATALOG_PASSWORD = "jdbccatalog.password";
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
+  private final QueryRunner queryRunner = new QueryRunner();
+  private FileIO fileIO;
+  private String catalogName = JDBC_CATALOG_DEFAULT_NAME;
+  private String warehouseLocation;
+  private Configuration hadoopConf;
+  private Connection jdbcConnection;
+
+
+  public JdbcCatalog() {
+  }
+
+  private void createCatalogIfNotExists() throws SQLException {
+    queryRunner.execute(jdbcConnection, JDB_CATALOG_TABLE_DDL);
+    LOG.debug("Created Jdbc table '{}' to store iceberg catalog!", JDBC_CATALOG_TABLE_NAME);
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.get(CatalogProperties.WAREHOUSE_LOCATION) != null &&
+                    !properties.get(CatalogProperties.WAREHOUSE_LOCATION).equals(""),
+            "no location provided for warehouse");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_JDBC_DRIVER, "").equals(""),
+            "no jdbc driver classname provided!");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_DBURL, "").equals(""),
+            "no jdbc connection url provided!");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_USER, "").equals(""),
+            "no jdbc database user provided!");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_PASSWORD, "").equals(""),
+            "no jdbc database user password provided!");
+    String catalogJdbcDriver = properties.get(JDBC_CATALOG_JDBC_DRIVER);
+    String catalogDburl = properties.get(JDBC_CATALOG_DBURL);
+    String catalogUser = properties.get(JDBC_CATALOG_USER);
+    String catalogDbpassword = properties.get(JDBC_CATALOG_PASSWORD);
+    this.catalogName = name;
+    this.warehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION).replaceAll("/*$", "");
+
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    this.fileIO = fileIOImpl == null ? new HadoopFileIO(hadoopConf) : CatalogUtil.loadFileIO(fileIOImpl, properties,
+            hadoopConf);
+
+    DbUtils.loadDriver(catalogJdbcDriver);
+    LOG.debug("Connecting to Jdbc database {}.", catalogDburl);
+    try {
+      jdbcConnection = DriverManager.getConnection(catalogDburl, catalogUser, catalogDbpassword);
+      createCatalogIfNotExists();
+    } catch (SQLException throwables) {
+      throw new RuntimeIOException("Failed to initialize Jdbc Catalog!\n %s %s", throwables.getErrorCode(),
+              throwables.getMessage());
+    }
+
+  }
+
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new JdbcTableOperations(jdbcConnection, fileIO, catalogName, tableIdentifier);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    TableOperations ops = newTableOps(identifier);
+    TableMetadata lastMetadata = ops.current();
+    try {
+      int insertedRecords = 0;
+      insertedRecords = queryRunner.update(jdbcConnection, JDBC_TABLE_DROP, catalogName,

Review comment:
       nit: the variable should be `deletedRecords`

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/JdbcCatalog.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.handlers.BeanListHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcCatalog extends BaseMetastoreCatalog implements Configurable, Closeable {

Review comment:
       We should be able to implement `SupportsNamespaces`; do you plan to do that in another PR?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org