Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/18 15:49:58 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request, #4810: Spark: Provide Procedure for Catalog Register Procedure

RussellSpitzer opened a new pull request, #4810:
URL: https://github.com/apache/iceberg/pull/4810

   Adds the ability to invoke a Catalog's register method via a Spark
   procedure. This allows a user to create a catalog entry for a metadata.json
   file which already exists but does not have a corresponding catalog identifier.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#issuecomment-1132113677

   Thanks @flyrain and @kbendick. Please take another look when you have a chance.


[GitHub] [iceberg] RussellSpitzer commented on pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#issuecomment-1130193088

   @karuppayya Could you please take a look?
   @flyrain This is the API we were discussing


[GitHub] [iceberg] kbendick commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876155917


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+public interface HasIcebergCatalog extends TableCatalog {

Review Comment:
   👍 This is something I get asked about often enough, and I don't like having to ask end users to do multiple casts.



[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r883823217


##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRegisterTableProcedure extends SparkExtensionsTestBase {
+
+  private final String targetName;
+
+  public TestRegisterTableProcedure(
+      String catalogName,
+      String implementation,
+      Map<String, String> config) {
+    super(catalogName, implementation, config);
+    targetName = tableName("register_table");
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void dropTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", targetName);
+  }
+
+  @Test
+  public void testRegisterTable() throws NoSuchTableException, ParseException {
+    Assume.assumeTrue("Register only implemented on Hive Catalogs",
+        spark.conf().get("spark.sql.catalog." + catalogName + ".type").equals("hive"));
+
+    long numRows = 1000;
+
+    sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);
+    spark.range(0, numRows)
+        .withColumn("data", functions.col("id").cast(DataTypes.StringType))
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    long originalFileCount = (long) scalarSql("SELECT COUNT(*) from %s.files", tableName);
+    long currentSnapshotId = table.currentSnapshot().snapshotId();
+    String metadataJson = ((HiveTableOperations) (((HasTableOperations) table).operations())).currentMetadataLocation();
+
+    List<Object[]> result = sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metadataJson);
+    Assert.assertEquals("Current Snapshot is not correct", currentSnapshotId, result.get(0)[0]);
+
+    List<Object[]> original = sql("SELECT * FROM %s", tableName);
+    List<Object[]> registered = sql("SELECT * FROM %s", targetName);
+    assertEquals("Registered table rows should match original table rows", original, registered);
+    Assert.assertEquals("Should have the right row count in the procedure result",
+        numRows, result.get(0)[1]);
+    Assert.assertEquals("Should have the right datafile count the procedure result",

Review Comment:
   nit 
   ```suggestion
       Assert.assertEquals("Should have the right datafile count in the procedure result",
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("Current Snapshot", DataTypes.LongType, true, Metadata.empty()),

Review Comment:
   [minor] Should we call this `Current SnapshotId`?



[GitHub] [iceberg] flyrain commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876323338


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("Current Snapshot", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Manifests Registered", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Datafiles Registered", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private RegisterTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RegisterTableProcedure>() {
+      @Override
+      protected RegisterTableProcedure doBuild() {
+        return new RegisterTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
+    String metadataFile = args.getString(1);
+    Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
+        "Cannot handle an empty argument metadata_file");
+
+    Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
+    Table registeredTable = icebergCatalog.registerTable(tableName, metadataFile);
+    long currentSnapshotId = registeredTable.currentSnapshot().snapshotId();
+    long totalManifests = spark().table(tableCatalog().name() + "." + tableName + ".manifests").count();
+    long totalDataFiles = spark().table(tableCatalog().name() + "." + tableName + ".files").count();

Review Comment:
   I think this will scan 2 metadata tables, right? The concern is that the procedure's performance may not be good if the table has a lot of data/metadata files. Can we use the values from the snapshot summary instead, e.g. TOTAL_DATA_FILES_PROP?
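
A minimal sketch of the suggestion above, not the PR's code: it reads counts from a snapshot summary represented as a plain `Map<String, String>` instead of scanning the `manifests` and `files` metadata tables. The class `SummaryCounts` and its method are hypothetical, and the key strings are assumed to be the values behind `SnapshotSummary.TOTAL_DATA_FILES_PROP` and `SnapshotSummary.TOTAL_RECORDS_PROP`.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper for illustration only; it is not part of the PR.
final class SummaryCounts {
  // Assumed to match SnapshotSummary.TOTAL_DATA_FILES_PROP and TOTAL_RECORDS_PROP.
  static final String TOTAL_DATA_FILES = "total-data-files";
  static final String TOTAL_RECORDS = "total-records";

  private SummaryCounts() {}

  // Returns the count stored under the key, or empty if it is missing or not a number.
  static OptionalLong count(Map<String, String> summary, String key) {
    if (summary == null) {
      return OptionalLong.empty();
    }
    String value = summary.get(key);
    if (value == null) {
      return OptionalLong.empty();
    }
    try {
      return OptionalLong.of(Long.parseLong(value.trim()));
    } catch (NumberFormatException e) {
      return OptionalLong.empty();
    }
  }
}
```

In the procedure, the map would presumably come from the current snapshot's summary. An empty result means the property was not recorded, so the caller can decide whether to fall back to a metadata table scan.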




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r877456832


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -648,6 +650,21 @@ public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, Stri
     return toIcebergTable(sparkTable);
   }
 
+  /**
+   * Returns the underlying Iceberg Catalog object represented by a Spark Catalog
+   * @param spark SparkSession used for looking up catalog reference
+   * @param catalogName The name of the Spark Catalog being referenced
+   * @return the Iceberg catalog class being wrapped by the Spark Catalog
+   */
+  public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {

Review Comment:
   @kbendick I think you would be interested in this as well
   



[GitHub] [iceberg] kbendick commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r883894158


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -648,6 +650,21 @@ public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, Stri
     return toIcebergTable(sparkTable);
   }
 
+  /**
+   * Returns the underlying Iceberg Catalog object represented by a Spark Catalog
+   * @param spark SparkSession used for looking up catalog reference
+   * @param catalogName The name of the Spark Catalog being referenced
+   * @return the Iceberg catalog class being wrapped by the Spark Catalog
+   */
+  public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {

Review Comment:
   We'd probably want to add the `HasIcebergCatalog` interface to other catalogs as well, though that can be handled in a follow-up once the interface is in.
   
   If / when we merge this, let's open an issue for adding `HasIcebergCatalog` to other catalogs. I've got a newer way of tracking opened issues I've been using so that's one way to ensure that it is at least not forgotten about =)



[GitHub] [iceberg] kbendick commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r883892796


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -648,6 +650,21 @@ public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, Stri
     return toIcebergTable(sparkTable);
   }
 
+  /**
+   * Returns the underlying Iceberg Catalog object represented by a Spark Catalog
+   * @param spark SparkSession used for looking up catalog reference
+   * @param catalogName The name of the Spark Catalog being referenced
+   * @return the Iceberg catalog class being wrapped by the Spark Catalog
+   */
+  public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {

Review Comment:
   Thanks for tagging me @RussellSpitzer. I do think this would be really beneficial. I have a number of awkward workarounds that I've either suggested to people or implemented myself that this would get around. So emphatic +1.



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876373553


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+public interface HasIcebergCatalog extends TableCatalog {

Review Comment:
   As a note, the reason I put this here as a public interface is that I didn't want to expose BaseCatalog.



[GitHub] [iceberg] flyrain commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876329175


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -648,6 +650,22 @@ public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, Stri
     return toIcebergTable(sparkTable);
   }
 
+  /**
+   * Returns the underlying Iceberg Catalog object represented by a Spark Catalog
+   * @param spark SparkSession used for looking up catalog reference
+   * @param catalogName The name of the Spark Catalog being referenced
+   * @return the Iceberg catalog class being wrapped by the Spark Catalog
+   */
+  public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {
+    CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName);
+    Preconditions.checkArgument(catalogPlugin instanceof HasIcebergCatalog,
+        String.format("Cannot load Iceberg catalog from catalog %s because it is not a Spark Iceberg catalog. Actual " +

Review Comment:
   "because it is not a Spark Iceberg catalog" -> "because it doesn't contain an Iceberg catalog"?
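
For illustration, the check with the reworded message might look like the sketch below. To stay runnable without Spark or Iceberg on the classpath it uses simplified stand-ins: `PluginSketch`, `IcebergCatalogSketch`, `HasIcebergCatalogSketch` and `CatalogLookup` are all hypothetical names, and the message omits the part of the original string that is cut off in the quoted diff.

```java
// Simplified stand-ins for Spark's CatalogPlugin, Iceberg's Catalog and the PR's
// HasIcebergCatalog. All are hypothetical and exist only to keep the sketch runnable.
interface PluginSketch {
  String name();
}

interface IcebergCatalogSketch {
  String name();
}

interface HasIcebergCatalogSketch extends PluginSketch {
  IcebergCatalogSketch icebergCatalog();
}

final class CatalogLookup {
  private CatalogLookup() {}

  // Unwraps the backing catalog, or fails with a message naming the offending plugin.
  static IcebergCatalogSketch unwrap(PluginSketch plugin) {
    if (plugin == null) {
      throw new IllegalArgumentException("Cannot load Iceberg catalog from a null catalog");
    }
    if (!(plugin instanceof HasIcebergCatalogSketch)) {
      throw new IllegalArgumentException(String.format(
          "Cannot load Iceberg catalog from catalog %s because it does not contain an Iceberg catalog",
          plugin.name()));
    }
    return ((HasIcebergCatalogSketch) plugin).icebergCatalog();
  }
}
```

Checking against the interface rather than a concrete catalog class means any catalog that later implements it passes the same check, and callers need only a single cast.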



[GitHub] [iceberg] flyrain commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r877608214


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("Current Snapshot", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Rows", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Datafiles", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private RegisterTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RegisterTableProcedure>() {
+      @Override
+      protected RegisterTableProcedure doBuild() {
+        return new RegisterTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
+    String metadataFile = args.getString(1);
+    Preconditions.checkArgument(tableCatalog() instanceof HasIcebergCatalog,
+        "Cannot use Register Table in a non-Iceberg catalog");
+    Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
+        "Cannot handle an empty argument metadata_file");
+
+    Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
+    Table table = icebergCatalog.registerTable(tableName, metadataFile);
+    Snapshot currentSnapshot = table.currentSnapshot();

Review Comment:
   I think we need a null check here. There is an edge case where the current snapshot is null, for example when the table has no snapshots yet.
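
A minimal sketch of the null check discussed above. The `Snapshot` and `Table` classes here are hypothetical stand-ins for the Iceberg types so that the example compiles on its own, and `currentSnapshotIdOrNull` is an illustrative helper name, not something taken from the PR. It also assumes the "Current Snapshot" output column would be declared nullable, which the quoted `OUTPUT_TYPE` does not do.

```java
// Hypothetical stand-ins for org.apache.iceberg.Snapshot and Table (illustration only).
final class NullSnapshotSketch {
  static final class Snapshot {
    private final long id;

    Snapshot(long id) {
      this.id = id;
    }

    long snapshotId() {
      return id;
    }
  }

  static final class Table {
    private final Snapshot current; // null when the table has no snapshots

    Table(Snapshot current) {
      this.current = current;
    }

    Snapshot currentSnapshot() {
      return current;
    }
  }

  // Returns the current snapshot id, or null instead of throwing a NullPointerException.
  static Long currentSnapshotIdOrNull(Table table) {
    Snapshot snapshot = table.currentSnapshot();
    return snapshot == null ? null : snapshot.snapshotId();
  }
}
```

With a guard like this, the procedure could still return a row for a freshly created table rather than failing after the catalog entry has already been written.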





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876326354


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("Current Snapshot", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Manifests Registered", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Datafiles Registered", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private RegisterTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RegisterTableProcedure>() {
+      @Override
+      protected RegisterTableProcedure doBuild() {
+        return new RegisterTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
+    String metadataFile = args.getString(1);
+    Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
+        "Cannot handle an empty argument metadata_file");
+
+    Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
+    Table registeredTable = icebergCatalog.registerTable(tableName, metadataFile);
+    long currentSnapshotId = registeredTable.currentSnapshot().snapshotId();
+    long totalManifests = spark().table(tableCatalog().name() + "." + tableName + ".manifests").count();
+    long totalDataFiles = spark().table(tableCatalog().name() + "." + tableName + ".files").count();

Review Comment:
   I wanted a small sanity check here that the registered table actually works, which is why I had the full scans.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r885790871


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -648,6 +650,21 @@ public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, Stri
     return toIcebergTable(sparkTable);
   }
 
+  /**
+   * Returns the underlying Iceberg Catalog object represented by a Spark Catalog
+   * @param spark SparkSession used for looking up catalog reference
+   * @param catalogName The name of the Spark Catalog being referenced
+   * @return the Iceberg catalog class being wrapped by the Spark Catalog
+   */
+  public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {

Review Comment:
   One issue with that: in the current iteration, this is a class that implements Spark's `TableCatalog`. Should we change that?





[GitHub] [iceberg] RussellSpitzer merged pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer merged PR #4810:
URL: https://github.com/apache/iceberg/pull/4810




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876068277


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -648,6 +650,21 @@ public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, Stri
     return toIcebergTable(sparkTable);
   }
 
+  /**
+   * Returns the underlying Iceberg Catalog object represented by a Spark Catalog
+   * @param spark SparkSession used for looking up catalog reference
+   * @param catalogName The name of the Spark Catalog being referenced
+   * @return the Iceberg catalog class being wrapped by the Spark Catalog
+   */
+  public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {

Review Comment:
   This isn't strictly needed for this PR, but it is another API that this PR makes possible.





[GitHub] [iceberg] flyrain commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876323338


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("Current Snapshot", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Manifests Registered", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Datafiles Registered", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private RegisterTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RegisterTableProcedure>() {
+      @Override
+      protected RegisterTableProcedure doBuild() {
+        return new RegisterTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
+    String metadataFile = args.getString(1);
+    Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
+        "Cannot handle an empty argument metadata_file");
+
+    Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
+    Table registeredTable = icebergCatalog.registerTable(tableName, metadataFile);
+    long currentSnapshotId = registeredTable.currentSnapshot().snapshotId();
+    long totalManifests = spark().table(tableCatalog().name() + "." + tableName + ".manifests").count();
+    long totalDataFiles = spark().table(tableCatalog().name() + "." + tableName + ".files").count();

Review Comment:
   I think this will scan two metadata tables, right? The concern is that the procedure may perform poorly if the table has a lot of data or metadata files. Can we use the values from the snapshot summary instead, e.g. `TOTAL_DATA_FILES_PROP`?
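
A rough sketch of reading the counts from a snapshot summary instead of scanning metadata tables. The summary is modelled as a plain `Map<String, String>`, and the two key strings are assumed to match Iceberg's `SnapshotSummary.TOTAL_RECORDS_PROP` and `SnapshotSummary.TOTAL_DATA_FILES_PROP`. The `longProperty` helper is hypothetical and only illustrates falling back to a default when a writer did not record the entry.

```java
import java.util.Map;

final class SummaryCountsSketch {
  // Assumed to mirror the values of the corresponding SnapshotSummary constants.
  static final String TOTAL_RECORDS_PROP = "total-records";
  static final String TOTAL_DATA_FILES_PROP = "total-data-files";

  // Parses a numeric summary entry; returns defaultValue when the entry is missing.
  static long longProperty(Map<String, String> summary, String key, long defaultValue) {
    String value = summary.get(key);
    return value == null ? defaultValue : Long.parseLong(value);
  }
}
```

This reads only the metadata that was already loaded with the table, so the cost does not grow with the number of manifests or data files. The trade-off is that it no longer exercises a real scan of the registered table.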





[GitHub] [iceberg] flyrain commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876319461


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("Current Snapshot", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Manifests Registered", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Datafiles Registered", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private RegisterTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RegisterTableProcedure>() {
+      @Override
+      protected RegisterTableProcedure doBuild() {
+        return new RegisterTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
+    String metadataFile = args.getString(1);
+    Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
+        "Cannot handle an empty argument metadata_file");
+
+    Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
+    Table registeredTable = icebergCatalog.registerTable(tableName, metadataFile);
+    long currentSnapshotId = registeredTable.currentSnapshot().snapshotId();
+    long totalManifests = spark().table(tableCatalog().name() + "." + tableName + ".manifests").count();
+    long totalDataFiles = spark().table(tableCatalog().name() + "." + tableName + ".files").count();
+
+    return new InternalRow[] {newInternalRow(currentSnapshotId, totalManifests, totalDataFiles)};
+  }
+
+  @Override
+  public String description() {
+    return "MigrateTableProcedure";

Review Comment:
   MigrateTableProcedure -> RegisterTableProcedure ?





[GitHub] [iceberg] flyrain commented on a diff in pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#discussion_r876318204


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("Current Snapshot", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Manifests Registered", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("Datafiles Registered", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private RegisterTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RegisterTableProcedure>() {
+      @Override
+      protected RegisterTableProcedure doBuild() {
+        return new RegisterTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
+    String metadataFile = args.getString(1);
+    Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
+        "Cannot handle an empty argument metadata_file");
+
+    Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();

Review Comment:
   Should we check if the tableCatalog is an instance of `HasIcebergCatalog`?
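
A minimal sketch of the check being asked about, using hypothetical stand-in interfaces in place of Spark's `TableCatalog` and Iceberg's `HasIcebergCatalog` so it is self-contained. The helper `icebergCatalogOf` and the `IcebergBackedCatalog` class are illustrative names only. The real procedure would more likely use `Preconditions.checkArgument`, which also throws `IllegalArgumentException`.

```java
final class CatalogCheckSketch {
  // Stand-in for org.apache.spark.sql.connector.catalog.TableCatalog.
  interface TableCatalog {
    String name();
  }

  // Stand-in for org.apache.iceberg.spark.source.HasIcebergCatalog.
  interface HasIcebergCatalog {
    Object icebergCatalog();
  }

  // Example of a Spark catalog that wraps an Iceberg catalog.
  static final class IcebergBackedCatalog implements TableCatalog, HasIcebergCatalog {
    private final String name;
    private final Object wrapped;

    IcebergBackedCatalog(String name, Object wrapped) {
      this.name = name;
      this.wrapped = wrapped;
    }

    @Override
    public String name() {
      return name;
    }

    @Override
    public Object icebergCatalog() {
      return wrapped;
    }
  }

  // Fails with a clear message instead of a ClassCastException on the cast.
  static Object icebergCatalogOf(TableCatalog catalog) {
    if (!(catalog instanceof HasIcebergCatalog)) {
      throw new IllegalArgumentException("Cannot use Register Table in a non-Iceberg catalog");
    }
    return ((HasIcebergCatalog) catalog).icebergCatalog();
  }
}
```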





[GitHub] [iceberg] RussellSpitzer commented on pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#issuecomment-1140321165

   Yes, sorry. The merge alignment PR I just merged in was high priority, and this went on my back burner.




[GitHub] [iceberg] RussellSpitzer commented on pull request #4810: Spark: Provide Procedure for Catalog Register Procedure

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4810:
URL: https://github.com/apache/iceberg/pull/4810#issuecomment-1145948391

   Thanks @flyrain, @kbendick, and @singhpk234!

