You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/03 23:50:05 UTC

[GitHub] [iceberg] ismailsimsek commented on a change in pull request #1870: Iceberg Jdbc Catalog Implementation

ismailsimsek commented on a change in pull request #1870:
URL: https://github.com/apache/iceberg/pull/1870#discussion_r535731000



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/JdbcCatalog.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.handlers.BeanListHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcCatalog extends BaseMetastoreCatalog implements Configurable, Closeable {
+
+  /** Initial value of the {@code catalogName} field; {@code initialize(...)} overwrites it with the given name. */
+  public static final String JDBC_CATALOG_DEFAULT_NAME = "default_catalog";
+  /** Name of the JDBC table in which this catalog records its tables. */
+  public static final String JDBC_CATALOG_TABLE_NAME = "iceberg_catalog";
+  /**
+   * DDL for the catalog table: one row per (catalogName, tableNamespace, tableName) holding the metadataLocation
+   * and previousMetadataLocation columns. Executed with IF NOT EXISTS, so it is a no-op when the table exists.
+   * NOTE(review): the constant name has a typo ("JDB_" instead of "JDBC_"); it is public, so renaming it would
+   * break callers — consider adding a correctly-named alias instead.
+   * NOTE(review): VARCHAR(66255) is larger than the VARCHAR maximum of some databases (e.g. MySQL's 65535) —
+   * confirm this DDL against every target database.
+   * NOTE(review): tableNamespace is declared nullable but is part of the PRIMARY KEY and is matched with
+   * "tableNamespace = ?" in the statements below; a NULL namespace would never match "= ?" — confirm namespaces
+   * are never stored as NULL.
+   */
+  public static final String JDB_CATALOG_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " +
+          JDBC_CATALOG_TABLE_NAME + " ( " +
+          "catalogName VARCHAR(1255) NOT NULL," +
+          "tableNamespace VARCHAR(1255)," +
+          "tableName VARCHAR(1255) NOT NULL," +
+          "metadataLocation VARCHAR(66255)," +
+          "previousMetadataLocation VARCHAR(66255)," +
+          "PRIMARY KEY (catalogName,tableNamespace,tableName)  " +
+          ")";
+  /** Selects all rows of one namespace. Parameters: catalogName, tableNamespace. */
+  public static final String JDBC_NAMESPACE_TABLE_LIST = "SELECT catalogName, tableNamespace, tableName, " +
+          "metadataLocation, previousMetadataLocation FROM " + JDBC_CATALOG_TABLE_NAME + " " +
+          "WHERE catalogName = ? AND tableNamespace = ?";
+  /**
+   * Moves a row to a new namespace/name. Parameters: new tableNamespace, new tableName, catalogName,
+   * old tableNamespace, old tableName.
+   */
+  public static final String JDBC_TABLE_RENAME = "UPDATE " + JDBC_CATALOG_TABLE_NAME + " SET tableNamespace = ? , " +
+          "tableName = ? WHERE catalogName = ? AND tableNamespace = ? AND tableName = ? ";
+  /** Deletes one row. Parameters: catalogName, tableNamespace, tableName. */
+  public static final String JDBC_TABLE_DROP = "DELETE FROM " + JDBC_CATALOG_TABLE_NAME + " WHERE catalogName = ? " +
+          "AND tableNamespace = ? AND tableName = ? ";
+  /** Selects one row. Parameters: catalogName, tableNamespace, tableName. */
+  public static final String JDBC_TABLE_SELECT = "SELECT catalogName, tableNamespace, tableName, metadataLocation, " +
+          "previousMetadataLocation FROM " + JDBC_CATALOG_TABLE_NAME + " WHERE catalogName = ? " +
+          "AND tableNamespace = ? AND tableName = ? ";
+  /**
+   * Inserts one row. Parameters: catalogName, tableNamespace, tableName, metadataLocation,
+   * previousMetadataLocation.
+   */
+  public static final String JDBC_TABLE_INSERT = "INSERT INTO " + JDBC_CATALOG_TABLE_NAME + " (catalogName, " +
+          "tableNamespace, tableName, metadataLocation, previousMetadataLocation) VALUES (?,?,?,?,?)";
+  /**
+   * Updates the metadata pointers of one row. Parameters: metadataLocation, previousMetadataLocation,
+   * catalogName, tableNamespace, tableName.
+   */
+  public static final String JDBC_TABLE_UPDATE_METADATA = "UPDATE " + JDBC_CATALOG_TABLE_NAME + " " +
+          "SET metadataLocation = ? , previousMetadataLocation = ? WHERE catalogName = ? AND tableNamespace = ? " +
+          "AND tableName = ? ";
+
+  // Catalog property keys read by initialize(...); each must be present and non-empty.
+  public static final String JDBC_CATALOG_JDBC_DRIVER = "jdbccatalog.jdbcdriver";
+  public static final String JDBC_CATALOG_DBURL = "jdbccatalog.dburl";
+  public static final String JDBC_CATALOG_USER = "jdbccatalog.user";
+  public static final String JDBC_CATALOG_PASSWORD = "jdbccatalog.password";
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
+  // Executes the catalog's SQL statements against jdbcConnection.
+  private final QueryRunner queryRunner = new QueryRunner();
+  // HadoopFileIO unless CatalogProperties.FILE_IO_IMPL names another implementation; set in initialize(...).
+  private FileIO fileIO;
+  // Value written to / matched against the catalogName column.
+  private String catalogName = JDBC_CATALOG_DEFAULT_NAME;
+  // Warehouse root with trailing slashes removed (see initialize(...)).
+  private String warehouseLocation;
+  // Hadoop configuration passed to the FileIO; presumably set via Configurable#setConf (not visible here).
+  private Configuration hadoopConf;
+  // Connection opened in initialize(...) and handed to every JdbcTableOperations.
+  private Connection jdbcConnection;
+
+
+  /** No-arg constructor; the catalog is configured afterwards through {@code initialize(name, properties)}. */
+  public JdbcCatalog() {
+  }
+
+  /**
+   * Ensures the catalog table {@link #JDBC_CATALOG_TABLE_NAME} exists by running
+   * {@link #JDB_CATALOG_TABLE_DDL} on the catalog connection.
+   *
+   * @throws SQLException if executing the DDL fails
+   */
+  private void createCatalogIfNotExists() throws SQLException {
+    queryRunner.execute(jdbcConnection, JDB_CATALOG_TABLE_DDL);
+    // The DDL is CREATE TABLE IF NOT EXISTS, so the table may already have existed; don't claim it was created.
+    LOG.debug("Ensured Jdbc table '{}' exists to store iceberg catalog", JDBC_CATALOG_TABLE_NAME);
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.get(CatalogProperties.WAREHOUSE_LOCATION) != null &&
+                    !properties.get(CatalogProperties.WAREHOUSE_LOCATION).equals(""),
+            "no location provided for warehouse");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_JDBC_DRIVER, "").equals(""),
+            "no jdbc driver classname provided!");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_DBURL, "").equals(""),
+            "no jdbc connection url provided!");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_USER, "").equals(""),
+            "no jdbc database user provided!");
+    Preconditions.checkArgument(!properties.getOrDefault(JDBC_CATALOG_PASSWORD, "").equals(""),
+            "no jdbc database user password provided!");
+    String catalogJdbcDriver = properties.get(JDBC_CATALOG_JDBC_DRIVER);
+    String catalogDburl = properties.get(JDBC_CATALOG_DBURL);
+    String catalogUser = properties.get(JDBC_CATALOG_USER);
+    String catalogDbpassword = properties.get(JDBC_CATALOG_PASSWORD);
+    this.catalogName = name;
+    this.warehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION).replaceAll("/*$", "");
+
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    this.fileIO = fileIOImpl == null ? new HadoopFileIO(hadoopConf) : CatalogUtil.loadFileIO(fileIOImpl, properties,
+            hadoopConf);
+
+    DbUtils.loadDriver(catalogJdbcDriver);
+    LOG.debug("Connecting to Jdbc database {}.", catalogDburl);
+    try {
+      jdbcConnection = DriverManager.getConnection(catalogDburl, catalogUser, catalogDbpassword);
+      createCatalogIfNotExists();
+    } catch (SQLException throwables) {
+      throw new RuntimeIOException("Failed to initialize Jdbc Catalog!\n %s %s", throwables.getErrorCode(),
+              throwables.getMessage());
+    }
+
+  }
+
+
+  /**
+   * Creates the table operations for one table of this catalog.
+   *
+   * @param tableIdentifier the table to operate on
+   * @return a new JdbcTableOperations bound to this catalog's JDBC connection, FileIO and catalog name
+   */
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    TableOperations operations =
+        new JdbcTableOperations(jdbcConnection, fileIO, catalogName, tableIdentifier);
+    return operations;
+  }
+
+  /**
+   * Builds the default location of a table: the warehouse location, then every namespace level, then the table
+   * name, joined with '/'. With an empty namespace the result is {@code <warehouse>/<table>}.
+   *
+   * @param tableIdentifier the table whose location is computed
+   * @return the '/'-separated default table location
+   */
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    List<String> pathParts = Lists.newArrayList(tableIdentifier.namespace().levels());
+    pathParts.add(0, warehouseLocation);
+    pathParts.add(tableIdentifier.name());
+    return String.join("/", pathParts);
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    TableOperations ops = newTableOps(identifier);
+    TableMetadata lastMetadata = ops.current();
+    try {
+      int insertedRecords = 0;
+      insertedRecords = queryRunner.update(jdbcConnection, JDBC_TABLE_DROP, catalogName,

Review comment:
       updated




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org