You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/10 20:05:59 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request #1906: Migrate Procedures

RussellSpitzer opened a new pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906


   Adds Procedures for Migrate and Snapshot Actions


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540857984



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("source_table", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("properties", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("imported_datafiles_count", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotTableProcedure>() {
+      @Override
+      protected SnapshotTableProcedure doBuild() {
+        return new SnapshotTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    CatalogAndIdentifier sourceIdent = toCatalogAdnIdentifier(source, PARAMETERS[0].name(), tableCatalog());
+
+    String dest = args.getString(1);
+    CatalogAndIdentifier destIdent = toCatalogAdnIdentifier(dest, PARAMETERS[1].name(), tableCatalog());
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = Maps.newHashMap();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    Preconditions.checkArgument(sourceIdent != destIdent || sourceIdent.catalog() != destIdent.catalog(),
+        "Cannot create a snapshot with the same name as the source of the snapshot.");
+    SnapshotAction action =  new Spark3SnapshotAction(spark(), sourceIdent.catalog(), sourceIdent.identifier(),

Review comment:
       nit: extra space




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540801791



##########
File path: spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
##########
@@ -122,6 +122,8 @@ public Long execute() {
     properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
     properties.put("migrated", "true");
     properties.putAll(additionalProperties());
+    properties.putIfAbsent(LOCATION, sourceTableLocation());
+

Review comment:
       I'd actually add some empty lines to separate logical blocks similar to what you have in snapshot.
   
   ```
       Map<String, String> properties = Maps.newHashMap();
   
       properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
       EXCLUDED_PROPERTIES.forEach(properties::remove);
   
       properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
       properties.put("migrated", "true");
       properties.putAll(additionalProperties());
       properties.putIfAbsent(LOCATION, sourceTableLocation());
   
       return properties;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540536060



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotProcedure>() {
+      @Override
+      protected SnapshotProcedure doBuild() {
+        return new SnapshotProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    String dest = args.getString(1);
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = Maps.newHashMap();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    CatalogAndIdentifier sourceIdent = toCatalogAndIdentifer(source, PARAMETERS[0].name(), tableCatalog());
+    CatalogAndIdentifier destIdent = toCatalogAndIdentifer(dest, PARAMETERS[1].name(), tableCatalog());
+
+    Preconditions.checkArgument(sourceIdent != destIdent || sourceIdent.catalog() != destIdent.catalog(),
+        "Cannot create a snapshot with the same name as the source of the snapshot.");
+    SnapshotAction action =  new Spark3SnapshotAction(spark(), sourceIdent.catalog(), sourceIdent.identifier(),
+        destIdent.catalog(), destIdent.identifier());
+
+    long numFiles;
+    if (snapshotLocation != null) {
+      numFiles = action.withLocation(snapshotLocation).withProperties(options).execute();
+    } else {
+      numFiles = action.withProperties(options).execute();
+    }
+
+    return new InternalRow[] {newInternalRow(numFiles)};
+  }
+
+  @Override
+  public String description() {
+    return "Creates an Iceberg table from a Spark Table. The Created table will be isolated from the original table" +

Review comment:
       same here. `SnapshotTableProcedure`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540567281



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);

Review comment:
       "migrated" -> "added"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540524023



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {

Review comment:
       nit: `MigrateTableProcedure`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540466555



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final DataType MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotProcedure>() {
+      @Override
+      protected SnapshotProcedure doBuild() {
+        return new SnapshotProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    String dest = args.getString(1);
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,

Review comment:
       Calling scala from java




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540796780



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object result = scalarSql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('foo','bar'))",
+        catalogName, sourceName, tableName);
+
+    Assert.assertEquals("Should have added one file", 1L, result);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    Map<String, String> props = createdTable.properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(

Review comment:
       nit: `scalarSql`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540530457



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)

Review comment:
       Should it be `table_properties` or just `properties`? There will be a difference between options and properties in Spark 3.1.0.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540566620



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);

Review comment:
       Since the location is set, should this validate that the migrated table's location matches the one passed here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540535224



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())

Review comment:
       `imported_data_files_count`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540846166



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());
+
+    long numFiles = action.withProperties(options).execute();
+    return new InternalRow[] {newInternalRow(numFiles)};
+  }
+
+  @Override
+  public String description() {

Review comment:
       Sounds like something we can address while adding support for describing procedures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540606060



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final DataType MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotProcedure>() {
+      @Override
+      protected SnapshotProcedure doBuild() {
+        return new SnapshotProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    String dest = args.getString(1);
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,

Review comment:
       Sounds reasonable to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540462972



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -70,21 +78,23 @@ protected BaseProcedure(TableCatalog tableCatalog) {
   }
 
   protected Identifier toIdentifier(String identifierAsString, String argName) {
-    Preconditions.checkArgument(identifierAsString != null && !identifierAsString.isEmpty(),
-        "Cannot handle an empty identifier for argument %s", argName);
-
-    CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
-        "identifier for arg " + argName, spark, identifierAsString, tableCatalog);
-
-    CatalogPlugin catalog = catalogAndIdentifier.catalog();
-    Identifier identifier = catalogAndIdentifier.identifier();
+    CatalogAndIdentifier catalogAndIdentifier = toCatalogAndIdentifer(identifierAsString, argName, tableCatalog);
 
     Preconditions.checkArgument(
-        catalog.equals(tableCatalog),
+        catalogAndIdentifier.catalog().equals(tableCatalog),
         "Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
-        tableCatalog.name(), identifierAsString, catalog.name());
+        tableCatalog.name(), identifierAsString, catalogAndIdentifier.catalog().name());
+
+    return catalogAndIdentifier.identifier();
+  }
+
+  protected CatalogAndIdentifier toCatalogAndIdentifer(String identifierAsString, String argName,
+                                                       CatalogPlugin catalog) {
+    Preconditions.checkArgument(identifierAsString != null && !identifierAsString.isEmpty(),
+        "Cannot handle an empty identifier for argument %s", argName);
 
-    return identifier;
+    return Spark3Util.catalogAndIdentifier(

Review comment:
       I'll one line this




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540971794



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -70,21 +82,22 @@ protected BaseProcedure(TableCatalog tableCatalog) {
   }
 
   protected Identifier toIdentifier(String identifierAsString, String argName) {
-    Preconditions.checkArgument(identifierAsString != null && !identifierAsString.isEmpty(),
-        "Cannot handle an empty identifier for argument %s", argName);
-
-    CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
-        "identifier for arg " + argName, spark, identifierAsString, tableCatalog);
-
-    CatalogPlugin catalog = catalogAndIdentifier.catalog();
-    Identifier identifier = catalogAndIdentifier.identifier();
+    CatalogAndIdentifier catalogAndIdentifier = toCatalogAdnIdentifier(identifierAsString, argName, tableCatalog);
 
     Preconditions.checkArgument(
-        catalog.equals(tableCatalog),
+        catalogAndIdentifier.catalog().equals(tableCatalog),
         "Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
-        tableCatalog.name(), identifierAsString, catalog.name());
+        tableCatalog.name(), identifierAsString, catalogAndIdentifier.catalog().name());
+
+    return catalogAndIdentifier.identifier();
+  }
+
+  protected CatalogAndIdentifier toCatalogAdnIdentifier(String identifierAsString, String argName,

Review comment:
       fix!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540523010



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {

Review comment:
       nit: let's import `ProcedureBuilder` directly to match other procedures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540464072



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final DataType MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);

Review comment:
       To keep the following to 1 line per entry




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540574539



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)

Review comment:
       I like simpler, as long as it is clear. And I agree that we should not use `options` because of the specific meaning with datasource tables. `properties` is fine with me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540571557



##########
File path: spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
##########
@@ -76,6 +76,7 @@ callArgument
 
 expression
     : constant
+    | stringMap

Review comment:
       I considered this, but we really wanted to only support String maps and I was falling back on the expression level to tell us if it's a bad Map (unequal key values) but we can do that here if you like.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540854866



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("properties", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("migrated_files_count", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateTableProcedure>() {
+      @Override
+      protected MigrateTableProcedure doBuild() {
+        return new MigrateTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String tableName = args.getString(0);

Review comment:
       I find naming here a bit inconsistent with other procedures. What about this?
   
   ```
   String identAsString = args.getString(0);
   CatalogAndIdentifier catalogAndIdent = toCatalogAndIdentifer(identAsString, PARAMETERS[0].name(), tableCatalog());
   
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540534892



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),

Review comment:
       We may add `dest` prefix if we want to but I am not sure.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540571450



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_options => map('foo','bar'))",
+        catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.snapshot('n', table => 't')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.snapshot('n', 't')", catalogName));

Review comment:
       Similarly, this isn't the procedure name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540513734



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -37,9 +37,13 @@
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.execution.CacheManager;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
 import scala.Option;
 
 abstract class BaseProcedure implements Procedure {
+  protected static final DataType STRING_MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);

Review comment:
       nit: how common is it to be in `BaseProcedure`? Not a strong opinion, just asking.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540632508



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,

Review comment:
       ah yeah I only had that because I ran a test and canceled while it was running, I should have removed it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540535683



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotProcedure>() {
+      @Override
+      protected SnapshotProcedure doBuild() {
+        return new SnapshotProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    String dest = args.getString(1);
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = Maps.newHashMap();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    CatalogAndIdentifier sourceIdent = toCatalogAndIdentifer(source, PARAMETERS[0].name(), tableCatalog());
+    CatalogAndIdentifier destIdent = toCatalogAndIdentifer(dest, PARAMETERS[1].name(), tableCatalog());
+
+    Preconditions.checkArgument(sourceIdent != destIdent || sourceIdent.catalog() != destIdent.catalog(),
+        "Cannot create a snapshot with the same name as the source of the snapshot.");
+    SnapshotAction action =  new Spark3SnapshotAction(spark(), sourceIdent.catalog(), sourceIdent.identifier(),
+        destIdent.catalog(), destIdent.identifier());
+
+    long numFiles;

Review comment:
       same comments as for migrate.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540578545



##########
File path: spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
##########
@@ -46,11 +46,11 @@
  * previously referred to a non-iceberg table will refer to the newly migrated iceberg
  * table.
  */
-class Spark3MigrateAction extends Spark3CreateAction {
+public class Spark3MigrateAction extends Spark3CreateAction {

Review comment:
       Hm. I would prefer not to make these public, but I see that this needs the identifier that has already been parsed to do the custom catalog validation. Should be fine for now, but we should keep this in mind for when we fix the `Actions` API.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540535865



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {

Review comment:
       `SnapshotTableProcedure`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540794964



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);

Review comment:
       nit: `scalarSql` method should be used




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540535512



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotProcedure>() {
+      @Override
+      protected SnapshotProcedure doBuild() {
+        return new SnapshotProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    String dest = args.getString(1);
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = Maps.newHashMap();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    CatalogAndIdentifier sourceIdent = toCatalogAndIdentifer(source, PARAMETERS[0].name(), tableCatalog());

Review comment:
       Same comments as for migrate.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540533715



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());
+
+    long numFiles = action.withProperties(options).execute();

Review comment:
       nit: `numMigratedFiles`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540576443



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());
+
+    long numFiles = action.withProperties(options).execute();
+    return new InternalRow[] {newInternalRow(numFiles)};
+  }
+
+  @Override
+  public String description() {

Review comment:
       The SQL plan uses `description`? I would expect description to be documentation and for the plan to use a method like `planString`.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());
+
+    long numFiles = action.withProperties(options).execute();
+    return new InternalRow[] {newInternalRow(numFiles)};
+  }
+
+  @Override
+  public String description() {

Review comment:
       The SQL plan uses `description`? I would expect description to be documentation and for the plan to use a method like `planString`. Not something we need to fix here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540597328



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final DataType MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotProcedure>() {
+      @Override
+      protected SnapshotProcedure doBuild() {
+        return new SnapshotProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    String dest = args.getString(1);
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,

Review comment:
       This is a little difficult because it's the Scala method on InternalRow which doesn't really return a map, it returns "catalyst.util.MapData", so it's not a real map and the method we are using is
   
   ```scala
     def foreach(keyType: DataType, valueType: DataType, f: (Any, Any) => Unit): Unit = {
   ```
   
   I thought about writing another method to convert MapData to Java maps but that also seems brittle. I think the safest thing to do here is use the Scala Method for MapData.foreach




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540860126



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("source_table", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("properties", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("imported_datafiles_count", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotTableProcedure>() {
+      @Override
+      protected SnapshotTableProcedure doBuild() {
+        return new SnapshotTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    CatalogAndIdentifier sourceIdent = toCatalogAdnIdentifier(source, PARAMETERS[0].name(), tableCatalog());
+
+    String dest = args.getString(1);
+    CatalogAndIdentifier destIdent = toCatalogAdnIdentifier(dest, PARAMETERS[1].name(), tableCatalog());
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = Maps.newHashMap();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    Preconditions.checkArgument(sourceIdent != destIdent || sourceIdent.catalog() != destIdent.catalog(),
+        "Cannot create a snapshot with the same name as the source of the snapshot.");
+    SnapshotAction action =  new Spark3SnapshotAction(spark(), sourceIdent.catalog(), sourceIdent.identifier(),
+        destIdent.catalog(), destIdent.identifier());
+
+    long importedDataFiles;

Review comment:
       I'd separate assigning the snapshot location like this:
   
   ```
       if (snapshotLocation != null) {
         action.withLocation(snapshotLocation);
       }
   
       long numImportedFiles = action.withProperties(tableProps).execute();
       return new InternalRow[] {newInternalRow(numImportedFiles)};
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540856414



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("source_table", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),

Review comment:
       Should it be just `location` as we just use `properties`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540798906



##########
File path: spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
##########
@@ -122,6 +122,8 @@ public Long execute() {
     properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
     properties.put("migrated", "true");
     properties.putAll(additionalProperties());
+    properties.putIfAbsent(LOCATION, sourceTableLocation());

Review comment:
       +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540802041



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -37,9 +37,13 @@
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.execution.CacheManager;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
 import scala.Option;
 
 abstract class BaseProcedure implements Procedure {
+  protected static final DataType STRING_MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);

Review comment:
       Let's keep it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540571883



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_options => map('foo','bar'))",
+        catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.snapshot('n', table => 't')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.snapshot('n', 't')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.snapshot('foo')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with invalid arg types",
+        AnalysisException.class, "Wrong arg type",
+        () -> sql("CALL %s.system.snapshot('n', 't', map('foo', 'bar'))", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.snapshot('', '')", catalogName));

Review comment:
       This can only validate one case, where either source or dest is empty. I think this should be split into empty source and empty dest cases.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540792636



##########
File path: spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
##########
@@ -46,11 +46,11 @@
  * previously referred to a non-iceberg table will refer to the newly migrated iceberg
  * table.
  */
-class Spark3MigrateAction extends Spark3CreateAction {
+public class Spark3MigrateAction extends Spark3CreateAction {

Review comment:
       This is another point to fix in actions before we release 0.11.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540566853



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,

Review comment:
       Other tests drop the table if exists rather than using `IF NOT EXISTS`. I think that's a better pattern because if the table already exists, it probably violates the assumptions of this test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540562816



##########
File path: spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
##########
@@ -76,6 +76,7 @@ callArgument
 
 expression
     : constant
+    | stringMap

Review comment:
       Actually, we may want to remove `keyValue` and just use `constant` so that a missing value doesn't result in a parse error. Instead we can give a better error message in the AST builder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540529365



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();

Review comment:
       nit: `Maps.newHashMap()`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540857722



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("source_table", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("properties", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("imported_datafiles_count", DataTypes.LongType, false, Metadata.empty())

Review comment:
       Think the name should match whatever we do in migrate. What about `imported_files_count`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540524351



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());

Review comment:
       nit: extra space




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540532421



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());
+
+    long numFiles = action.withProperties(options).execute();
+    return new InternalRow[] {newInternalRow(numFiles)};
+  }
+
+  @Override
+  public String description() {

Review comment:
       ah yeah, sorry




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r541013686



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object result = scalarSql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('foo','bar'))",
+        catalogName, sourceName, tableName);
+
+    Assert.assertEquals("Should have added one file", 1L, result);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    Map<String, String> props = createdTable.properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.snapshot('foo')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with invalid arg types",
+        AnalysisException.class, "Wrong arg type",
+        () -> sql("CALL %s.system.snapshot('n', 't', map('foo', 'bar'))", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.snapshot('', 'dest')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",

Review comment:
       Let's ignore this for now then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540859105



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("source_table", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("properties", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("imported_datafiles_count", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotTableProcedure>() {
+      @Override
+      protected SnapshotTableProcedure doBuild() {
+        return new SnapshotTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    CatalogAndIdentifier sourceIdent = toCatalogAdnIdentifier(source, PARAMETERS[0].name(), tableCatalog());
+
+    String dest = args.getString(1);
+    CatalogAndIdentifier destIdent = toCatalogAdnIdentifier(dest, PARAMETERS[1].name(), tableCatalog());
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = Maps.newHashMap();

Review comment:
       nit: `tableProps`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540794664



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);

Review comment:
       nit: this could fit on one line, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540564803



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);

Review comment:
       There's a `scalarSql` method for when a SQL command produces one row with one value. That will assert that there is only one row and one column, which may be easier.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540795665



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);

Review comment:
       nit: this should fit on one line




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540532737



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -70,21 +82,22 @@ protected BaseProcedure(TableCatalog tableCatalog) {
   }
 
   protected Identifier toIdentifier(String identifierAsString, String argName) {
-    Preconditions.checkArgument(identifierAsString != null && !identifierAsString.isEmpty(),
-        "Cannot handle an empty identifier for argument %s", argName);
-
-    CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
-        "identifier for arg " + argName, spark, identifierAsString, tableCatalog);
-
-    CatalogPlugin catalog = catalogAndIdentifier.catalog();
-    Identifier identifier = catalogAndIdentifier.identifier();
+    CatalogAndIdentifier catalogAndIdentifier = toCatalogAndIdentifer(identifierAsString, argName, tableCatalog);
 
     Preconditions.checkArgument(
-        catalog.equals(tableCatalog),
+        catalogAndIdentifier.catalog().equals(tableCatalog),
         "Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
-        tableCatalog.name(), identifierAsString, catalog.name());
+        tableCatalog.name(), identifierAsString, catalogAndIdentifier.catalog().name());
+
+    return catalogAndIdentifier.identifier();
+  }
+
+  protected CatalogAndIdentifier toCatalogAndIdentifer(String identifierAsString, String argName,

Review comment:
       ah yeah, let me fix that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540970468



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object result = scalarSql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('foo','bar'))",
+        catalogName, sourceName, tableName);
+
+    Assert.assertEquals("Should have added one file", 1L, result);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    Map<String, String> props = createdTable.properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.snapshot('foo')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with invalid arg types",
+        AnalysisException.class, "Wrong arg type",
+        () -> sql("CALL %s.system.snapshot('n', 't', map('foo', 'bar'))", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.snapshot('', 'dest')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",

Review comment:
       Almost everything apparently validly casts to a string, at least everything our parser allows here. This would be easier I think if we changed the parser to accept all possible expressions. I think for now the best I can do is an unbalanced map?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540524124



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());
+
+    long numFiles = action.withProperties(options).execute();
+    return new InternalRow[] {newInternalRow(numFiles)};
+  }
+
+  @Override
+  public String description() {

Review comment:
       I am afraid this description will not show properly in the SQL plan. Let's make it `MigrateTableProcedure` as in other procedures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540856964



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("source_table", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),

Review comment:
       I am okay without `dest` prefix but if anyone feels strongly, I am ok to add.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540522595



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())

Review comment:
       Should we make it `migrated_files_count` to match other procedures?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540575865



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());

Review comment:
       What is the rationale for creating actions directly instead of going through `Actions`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#issuecomment-743124337


   Just minor comments, should be good to go after that.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540797805



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object result = scalarSql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('foo','bar'))",
+        catalogName, sourceName, tableName);
+
+    Assert.assertEquals("Should have added one file", 1L, result);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    Map<String, String> props = createdTable.properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.snapshot('foo')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with invalid arg types",
+        AnalysisException.class, "Wrong arg type",
+        () -> sql("CALL %s.system.snapshot('n', 't', map('foo', 'bar'))", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.snapshot('', 'dest')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",

Review comment:
       Should we also add a test where we get a map where keys or values are not strings?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540576906



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),

Review comment:
       +1 to those argument names. I'd be okay with just `properties` and `location`, but I'm fine with the `table_` prefix as well. (It should also match what we do for migrate.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540606464



##########
File path: spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
##########
@@ -76,6 +76,7 @@ callArgument
 
 expression
     : constant
+    | stringMap

Review comment:
       Yeah, I think the list works. The main thing is whether we should allow passing non-string values.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540798713



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -70,21 +82,22 @@ protected BaseProcedure(TableCatalog tableCatalog) {
   }
 
   protected Identifier toIdentifier(String identifierAsString, String argName) {
-    Preconditions.checkArgument(identifierAsString != null && !identifierAsString.isEmpty(),
-        "Cannot handle an empty identifier for argument %s", argName);
-
-    CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
-        "identifier for arg " + argName, spark, identifierAsString, tableCatalog);
-
-    CatalogPlugin catalog = catalogAndIdentifier.catalog();
-    Identifier identifier = catalogAndIdentifier.identifier();
+    CatalogAndIdentifier catalogAndIdentifier = toCatalogAdnIdentifier(identifierAsString, argName, tableCatalog);
 
     Preconditions.checkArgument(
-        catalog.equals(tableCatalog),
+        catalogAndIdentifier.catalog().equals(tableCatalog),
         "Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
-        tableCatalog.name(), identifierAsString, catalog.name());
+        tableCatalog.name(), identifierAsString, catalogAndIdentifier.catalog().name());
+
+    return catalogAndIdentifier.identifier();
+  }
+
+  protected CatalogAndIdentifier toCatalogAdnIdentifier(String identifierAsString, String argName,

Review comment:
       I think there is a typo: `toCatalogAdnIdentifier` ->  `toCatalogAndIdentifier`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540797009



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object result = scalarSql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('foo','bar'))",
+        catalogName, sourceName, tableName);
+
+    Assert.assertEquals("Should have added one file", 1L, result);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    Map<String, String> props = createdTable.properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,

Review comment:
       nit: should fit on one line




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540663057



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);
+    CatalogAndIdentifier tableIdent = toCatalogAndIdentifer(tableName, PARAMETERS[0].name(), tableCatalog());
+    CreateAction action =  new Spark3MigrateAction(spark(), tableIdent.catalog(), tableIdent.identifier());

Review comment:
       We have to do this for the default catalog switch. The Actions api uses our normal catalogAndIdentifier method which defaults to using the CatalogManager currentCatalog for the default catalog




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540605474



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),

Review comment:
       I think @RussellSpitzer and I commented at the same time so I didn't see his comment. I think it would be fine to use `source_table` and `dest_table`.
   
   I just don't think that including the procedure name in the argument names is helpful. So rather than `snapshot_source` I would use `source_table`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540577407



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final DataType MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotProcedure>() {
+      @Override
+      protected SnapshotProcedure doBuild() {
+        return new SnapshotProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    String dest = args.getString(1);
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,

Review comment:
       Yeah, maybe we should rely on Scala conversions and then do all the logic in Java? I'm not sure what impact this may have on compatibility, but I'd rather not break because Scala changes something.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540533546



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();
+    if (!args.isNullAt(1)) {
+      args.getMap(1).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    String tableName = args.getString(0);

Review comment:
       I find it a bit misleading that we don't process arguments in the order they are defined.
   How about this?
   
   ```
       String identAsString = args.getString(0);
       CatalogAndIdentifier catalogAndIdent = toCatalogAndIdentifer(identAsString, PARAMETERS[0].name(), tableCatalog());
   
       Map<String, String> tableProps = Maps.newHashMap();
       MapData providedProps = args.getMap(1);
       if (providedProps != null) {
         providedProps.foreach(DataTypes.StringType, DataTypes.StringType,
             (k, v) -> {
               tableProps.put(k.toString(), v.toString());
               return BoxedUnit.UNIT;
             });
       }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540561987



##########
File path: spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
##########
@@ -76,6 +76,7 @@ callArgument
 
 expression
     : constant
+    | stringMap

Review comment:
       Looks like this is intended to mimic the [`map` function](https://spark.apache.org/docs/latest/api/sql/#map)?
   
   If so, should this allow any constant for keys and values? I would expect something like this:
   
   ```
   mapLiteral
       : MAP '(' pairs+=keyValue, (',' pairs+=keyValue)* ')'
       ;
   
   keyValue
       : key=constant ',' value=constant
       ;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540657199



##########
File path: spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
##########
@@ -46,11 +46,11 @@
  * previously referred to a non-iceberg table will refer to the newly migrated iceberg
  * table.
  */
-class Spark3MigrateAction extends Spark3CreateAction {
+public class Spark3MigrateAction extends Spark3CreateAction {

Review comment:
       I think we should really have all of these spark implementations of actions in a "spark" package so we don't have to public everything.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540463297



##########
File path: spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
##########
@@ -76,6 +76,7 @@ callArgument
 
 expression
     : constant
+    | stringMap

Review comment:
       This is required to allow Map(string, string, *) expressions 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540658626



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_options => map('foo','bar'))",
+        catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.snapshot('n', table => 't')", catalogName));

Review comment:
       Sure I can remove these




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540529048



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -70,21 +82,22 @@ protected BaseProcedure(TableCatalog tableCatalog) {
   }
 
   protected Identifier toIdentifier(String identifierAsString, String argName) {
-    Preconditions.checkArgument(identifierAsString != null && !identifierAsString.isEmpty(),
-        "Cannot handle an empty identifier for argument %s", argName);
-
-    CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
-        "identifier for arg " + argName, spark, identifierAsString, tableCatalog);
-
-    CatalogPlugin catalog = catalogAndIdentifier.catalog();
-    Identifier identifier = catalogAndIdentifier.identifier();
+    CatalogAndIdentifier catalogAndIdentifier = toCatalogAndIdentifer(identifierAsString, argName, tableCatalog);
 
     Preconditions.checkArgument(
-        catalog.equals(tableCatalog),
+        catalogAndIdentifier.catalog().equals(tableCatalog),
         "Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
-        tableCatalog.name(), identifierAsString, catalog.name());
+        tableCatalog.name(), identifierAsString, catalogAndIdentifier.catalog().name());
+
+    return catalogAndIdentifier.identifier();
+  }
+
+  protected CatalogAndIdentifier toCatalogAndIdentifer(String identifierAsString, String argName,

Review comment:
       typo? `toCatalogAndIdentifer` -> `toCatalogAndIdentifier`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540533952



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/MigrateProcedure.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.actions.CreateAction;
+import org.apache.iceberg.actions.Spark3MigrateAction;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class MigrateProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_options", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("num_datafiles_included", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private MigrateProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<MigrateProcedure>() {
+      @Override
+      protected MigrateProcedure doBuild() {
+        return new MigrateProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Map<String, String> options = new HashMap<>();

Review comment:
       nit: `options` -> `tableProps`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540534636



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),

Review comment:
       What about `source_table`, `table`, `table_location`, `table_properties`? cc @RussellSpitzer @rdblue




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540575079



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotProcedure.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("snapshot_source", DataTypes.StringType),

Review comment:
       I really like source and dest here :/




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540567645



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));

Review comment:
       Since the source table has a known location, I think this should validate that the snapshot table uses a different one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540523641



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
##########
@@ -37,9 +37,13 @@
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.execution.CacheManager;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
 import scala.Option;
 
 abstract class BaseProcedure implements Procedure {
+  protected static final DataType STRING_MAP = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);

Review comment:
       2 Procedures at the moment :shrug: so not that common, but more than once




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540795788



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {

Review comment:
       nit: `withProperties`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540860292



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.SnapshotAction;
+import org.apache.iceberg.actions.Spark3SnapshotAction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+class SnapshotTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("source_table", DataTypes.StringType),
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("table_location", DataTypes.StringType),
+      ProcedureParameter.optional("properties", STRING_MAP)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("imported_datafiles_count", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  private SnapshotTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<SnapshotTableProcedure>() {
+      @Override
+      protected SnapshotTableProcedure doBuild() {
+        return new SnapshotTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String source = args.getString(0);
+    CatalogAndIdentifier sourceIdent = toCatalogAdnIdentifier(source, PARAMETERS[0].name(), tableCatalog());
+
+    String dest = args.getString(1);
+    CatalogAndIdentifier destIdent = toCatalogAdnIdentifier(dest, PARAMETERS[1].name(), tableCatalog());
+
+    String snapshotLocation = args.isNullAt(2) ? null : args.getString(2);
+
+    Map<String, String> options = Maps.newHashMap();
+    if (!args.isNullAt(3)) {
+      args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            options.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    Preconditions.checkArgument(sourceIdent != destIdent || sourceIdent.catalog() != destIdent.catalog(),
+        "Cannot create a snapshot with the same name as the source of the snapshot.");
+    SnapshotAction action =  new Spark3SnapshotAction(spark(), sourceIdent.catalog(), sourceIdent.identifier(),
+        destIdent.catalog(), destIdent.identifier());
+
+    long importedDataFiles;

Review comment:
       I think this will match the other procedures more.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540572699



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {

Review comment:
       I like how the other procedures are tested in a suite named after the procedure, like `TestRemoveOrphanFilesProcedure`. I don't see much value in a suite for both migrate and snapshot together and it isn't obvious where these tests live. Could you split this into `TestMigrateProcedure` and `TestSnapshotProcedure`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540571239



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateProcedures.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCreateProcedures extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestCreateProcedures(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testMigrate() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s')", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMigrateWithOptions() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    Object[] result = sql("CALL %s.system.migrate('%s', map('foo', 'bar'))", catalogName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_options => map('foo','bar'))",
+        catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    Map<String, String> props = validationCatalog.loadTable(tableIdent).properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot( snapshot_source => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have migrated one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.snapshot('n', table => 't')", catalogName));

Review comment:
       Nit: I don't think we need to keep adding these checks since it tests the resolver, not the procedure.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1906: Migrate Procedures

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1906:
URL: https://github.com/apache/iceberg/pull/1906#discussion_r540799572



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableProcedure extends SparkExtensionsTestBase {
+  private static final String sourceName = "spark_catalog.default.source";
+  // Currently we can only Snapshot only out of the Spark Session Catalog
+
+  public TestSnapshotTableProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %S", sourceName);
+  }
+
+  @Test
+  public void testSnapshot() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql("CALL %s.system.snapshot('%s', '%s')", catalogName, sourceName, tableName).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithOptions() throws IOException {
+    String location = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object result = scalarSql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', properties => map('foo','bar'))",
+        catalogName, sourceName, tableName);
+
+    Assert.assertEquals("Should have added one file", 1L, result);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+
+    String tableLocation = createdTable.location();
+    Assert.assertNotEquals("Table should not have the original location", location, tableLocation);
+
+    Map<String, String> props = createdTable.properties();
+    Assert.assertEquals("Should have extra property set", "bar", props.get("foo"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testSnapshotWithAlternateLocation() throws IOException {
+    Assume.assumeTrue("No Snapshoting with Alternate locations with Hadoop Catalogs", !catalogName.contains("hadoop"));
+    String location = temp.newFolder().toString();
+    String snapshotLocation = temp.newFolder().toString();
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", sourceName,
+        location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    Object[] result = sql(
+        "CALL %s.system.snapshot(source_table => '%s', table => '%s', table_location => '%s')",
+        catalogName, sourceName, tableName, snapshotLocation).get(0);
+
+    Assert.assertEquals("Should have added one file", 1L, result[0]);
+
+    String storageLocation = validationCatalog.loadTable(tableIdent).location();
+    Assert.assertEquals("Snapshot should be made at specified location", snapshotLocation, storageLocation);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInvalidSnapshotsCases() {
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.snapshot('foo')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with invalid arg types",
+        AnalysisException.class, "Wrong arg type",
+        () -> sql("CALL %s.system.snapshot('n', 't', map('foo', 'bar'))", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.snapshot('', 'dest')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",

Review comment:
       If I understand correctly, this should be possible now?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org