You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/09 22:27:33 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #5740: Spark 3.3: Add SparkChangelogTable

aokolnychyi opened a new pull request, #5740:
URL: https://github.com/apache/iceberg/pull/5740

   This PR adds `SparkChangelogTable` for querying changelogs in Spark.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980547154


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -579,23 +565,68 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
     }
   }
 
-  private Pair<Table, Long> load(Identifier ident) {
+  private Table load(Identifier ident, String version) {
+    Table table = load(ident);
+
+    if (table instanceof SparkTable) {
+      SparkTable sparkTable = (SparkTable) table;
+
+      Preconditions.checkArgument(
+          sparkTable.snapshotId() == null,
+          "Cannot do time-travel based on both table identifier and AS OF");
+
+      return sparkTable.copyWithSnapshotId(Long.parseLong(version));
+
+    } else if (table instanceof SparkChangelogTable) {
+      throw new UnsupportedOperationException("AS OF is not supported for changelogs");

Review Comment:
   Spark supports both timestamp and version based syntax.
   
   ```
   temporalClause
       : FOR? (SYSTEM_VERSION | VERSION) AS OF version=(INTEGER_VALUE | STRING)
       | FOR? (SYSTEM_TIME | TIMESTAMP) AS OF timestamp=valueExpression
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989510163


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",

Review Comment:
   Renamed to `_change_type`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980549230


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);
+  }
+
+  @Override
+  public int hashCode() {
+    return scan.hashCode();
+  }
+
+  private static class ReaderFactory implements PartitionReaderFactory {
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition) {
+      if (partition instanceof SparkInputPartition) {
+        return new RowReader((SparkInputPartition) partition);
+      } else {
+        throw new UnsupportedOperationException("Incorrect input partition type: " + partition);

Review Comment:
   Agreed. I copied it from an existing place but will change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980539081


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.ChangelogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkChangelogTable implements Table, SupportsRead {
+
+  public static final String TABLE_NAME = "changelog";

Review Comment:
   Sounds good. I'll switch to `changes` then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] bryanck commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
bryanck commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967657781


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);

Review Comment:
   Yes, that was what I found, the equals call returned false and the filters weren't pushed down. I had a workaround for that, but IIRC I ran into some other issues. Unfortunately I didn't delve deeper at that point and I went with reverting the change. It could be that implementing equals resolves the issue. I could run a benchmark test to confirm if interested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980548720


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -633,13 +672,21 @@ private Pair<String, List<String>> parseLocationString(String location) {
     }
   }
 
-  private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
+  @SuppressWarnings("CyclomaticComplexity")
+  private Table loadFromPathIdentifier(PathIdentifier ident) {
     Pair<String, List<String>> parsed = parseLocationString(ident.location());
 
     String metadataTableName = null;
     Long asOfTimestamp = null;
     Long snapshotId = null;
+    boolean isChangelog = false;
+
     for (String meta : parsed.second()) {
+      if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {

Review Comment:
   This is for path-based tables, which have a bit weird identifiers like `location#meta1,meta2,meta3` so I am not sure whether changelog must be last. Let me think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989509756


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =

Review Comment:
   Kept as `MetadataColumns` for now. Resolving as no change.



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);

Review Comment:
   Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989511316


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -662,15 +709,22 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
         "Cannot specify both snapshot-id and as-of-timestamp: %s",
         ident.location());
 
-    Table table =
+    Preconditions.checkArgument(
+        !isChangelog || (snapshotId == null && asOfTimestamp == null),
+        "Cannot specify snapshot-id and as-of-timestamp for changelogs");
+
+    org.apache.iceberg.Table table =
         tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));
 
-    if (snapshotId != null) {
-      return Pair.of(table, snapshotId);
+    if (isChangelog) {
+      return new SparkChangelogTable(table, !cacheEnabled);
+    } else if (snapshotId != null) {
+      return new SparkTable(table, snapshotId, !cacheEnabled);
     } else if (asOfTimestamp != null) {
-      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+      return new SparkTable(
+          table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp), !cacheEnabled);
     } else {
-      return Pair.of(table, null);
+      return new SparkTable(table, null, !cacheEnabled);

Review Comment:
   I tried but it seemed like an overkill as it is just a single place where it makes sense. However, I did refactor this part a bit so it should be slightly better now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967707279


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();

Review Comment:
   nit: snapshot number is inconsistent with the previous test method. if following the previous method, this should be `snap2`. another option is to name it like `snapshotInsertB` and `snapshotOverwriteB`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967715826


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -69,19 +72,28 @@ public static String[] blockLocations(CombinedScanTask task, Configuration conf)
     return locationSets.toArray(new String[0]);
   }
 
-  public static String[] blockLocations(FileIO io, CombinedScanTask task) {
+  public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {

Review Comment:
   Correct. Only `iceberg-api` has the API / ABI compatibility guarantees.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967706858


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);

Review Comment:
   nit: maybe set format version and partition column in the CREATE statement in one shot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967526904


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);

Review Comment:
   I don't think it is very clean to implement both `Scan` and `Batch` in one class. I understand we had a performance regression but I think it was because our `Batch` implementation did not implement `equals` and `hashCode`.
   
   Here is the code in Spark `BatchScanExec`.
   
   ```
   override def equals(other: Any): Boolean = other match {
     case other: BatchScanExec =>
       this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
     case _ =>
       false
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#issuecomment-1271856577

   Thanks for reviewing, @stevenzwu @flyrain @kbendick!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980541298


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap2.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap2.snapshotId())),
+        changelogRecords(snap1, snap2));
+  }
+
+  @Test
+  public void testMetadataDeletes() {

Review Comment:
   Yep, @kbendick is spot on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980536380


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =

Review Comment:
   At this point, `MetadataColumns` references both metadata columns with IDs from `Integer.MAX_VALUE - (1-100)` and reserved columns with IDs `Integer.MAX_VALUE - (101-200)`. To me, changelog-related columns are more like reserved columns as it is not something we can request on demand. That's why I did not add underscore in their names.



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =

Review Comment:
   At this point, `MetadataColumns` references both metadata columns with IDs from `Integer.MAX_VALUE - (1-100)` and reserved columns with IDs `Integer.MAX_VALUE - (101-200)`.
   
   To me, changelog-related columns are more like reserved columns as it is not something we can request on demand. That's why I did not add underscore in their names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980536380


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =

Review Comment:
   At this point, `MetadataColumns` references both metadata columns with IDs from `Integer.MAX_VALUE - (1-100)` and reserved columns with IDs `Integer.MAX_VALUE - (101-200)`.
   
   To me, changelog-related columns are more like reserved columns as it is not something we can request on demand. That's why I did not add underscore in their names. I am still debating this, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980549230


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);
+  }
+
+  @Override
+  public int hashCode() {
+    return scan.hashCode();
+  }
+
+  private static class ReaderFactory implements PartitionReaderFactory {
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition) {
+      if (partition instanceof SparkInputPartition) {
+        return new RowReader((SparkInputPartition) partition);
+      } else {
+        throw new UnsupportedOperationException("Incorrect input partition type: " + partition);

Review Comment:
   Agreed. I copied it from the existing place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980548928


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -662,15 +709,22 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
         "Cannot specify both snapshot-id and as-of-timestamp: %s",
         ident.location());
 
-    Table table =
+    Preconditions.checkArgument(
+        !isChangelog || (snapshotId == null && asOfTimestamp == null),
+        "Cannot specify snapshot-id and as-of-timestamp for changelogs");
+
+    org.apache.iceberg.Table table =
         tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));
 
-    if (snapshotId != null) {
-      return Pair.of(table, snapshotId);
+    if (isChangelog) {
+      return new SparkChangelogTable(table, !cacheEnabled);
+    } else if (snapshotId != null) {
+      return new SparkTable(table, snapshotId, !cacheEnabled);
     } else if (asOfTimestamp != null) {
-      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+      return new SparkTable(
+          table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp), !cacheEnabled);
     } else {
-      return Pair.of(table, null);
+      return new SparkTable(table, null, !cacheEnabled);

Review Comment:
   I'll try it out if we decide to make changes in `SparkCatalog`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967524811


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -579,23 +565,68 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
     }
   }
 
-  private Pair<Table, Long> load(Identifier ident) {
+  private Table load(Identifier ident, String version) {

Review Comment:
   I am using existing options for configuring boundaries. This means we cannot use SQL right now. Only the DF API. Hopefully, we will have support for options in Spark 3.4.
   
   An alternative option is to add a stored procedure to generate a changelog and register it as a view. We will need the procedure in any case to generate pre and pos images. I am reluctant to use table identifiers as it makes the logic tricky.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989509955


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980542887


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();

Review Comment:
   Oops, I'll fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980540747


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -69,19 +72,28 @@ public static String[] blockLocations(CombinedScanTask task, Configuration conf)
     return locationSets.toArray(new String[0]);
   }
 
-  public static String[] blockLocations(FileIO io, CombinedScanTask task) {
+  public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {

Review Comment:
   Yep, but this is compatible as `CombinedScanTask`  implements `ScanTaskGroup`. Existing user code should continue to work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980537486


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",

Review Comment:
   I was matching the existing `ChangelogOperation` enum but `_change_type` or `change_type` sounds good to me too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980540747


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -69,19 +72,28 @@ public static String[] blockLocations(CombinedScanTask task, Configuration conf)
     return locationSets.toArray(new String[0]);
   }
 
-  public static String[] blockLocations(FileIO io, CombinedScanTask task) {
+  public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {

Review Comment:
   Yep but this is compatible as `CombinedScanTask`  implements `ScanTaskGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r968764084


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.ChangelogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkChangelogTable implements Table, SupportsRead {
+
+  public static final String TABLE_NAME = "changelog";

Review Comment:
   I favor `changes`, `changelog` sounds a bit too techy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989554986


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.types.StructType;
+
+class SparkChangelogScan implements Scan, SupportsReportStatistics {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final IncrementalChangelogScan scan;
+  private final SparkReadConf readConf;
+  private final Schema expectedSchema;
+  private final List<Expression> filters;
+  private final Long startSnapshotId;
+  private final Long endSnapshotId;
+  private final boolean readTimestampWithoutZone;
+
+  // lazy variables
+  private List<ScanTaskGroup<ChangelogScanTask>> taskGroups;
+  private StructType readSchema;
+
+  SparkChangelogScan(
+      SparkSession spark,
+      Table table,
+      IncrementalChangelogScan scan,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      List<Expression> filters) {
+
+    SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+
+    this.spark = spark;
+    this.table = table;
+    this.scan = scan;
+    this.readConf = readConf;
+    this.expectedSchema = expectedSchema;
+    this.filters = filters != null ? filters : Collections.emptyList();
+    this.startSnapshotId = readConf.startSnapshotId();
+    this.endSnapshotId = readConf.endSnapshotId();
+    this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+  }
+
+  @Override
+  public Statistics estimateStatistics() {
+    long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
+    long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
+    return new Stats(sizeInBytes, rowsCount);
+  }

Review Comment:
   I double checked and we have the same logic in our regular scans. I think it will be fairly cheap to call this method multiple times because `taskGroups()` caches the result and we will simply iterate over it in memory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989552917


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -605,19 +636,27 @@ private Pair<Table, Long> load(Identifier ident) {
       }
 
       // loading the namespace as a table worked, check the name to see if it is a valid selector
+      // or if the name points to the changelog
+

Review Comment:
   I've actually added it on purpose because the comment applies to the whole block below, not just the line under it.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -605,19 +636,27 @@ private Pair<Table, Long> load(Identifier ident) {
       }
 
       // loading the namespace as a table worked, check the name to see if it is a valid selector
+      // or if the name points to the changelog
+

Review Comment:
   I actually added it on purpose as the comment applies to the whole block below, not just the line under it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989584315


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -605,19 +620,27 @@ private Pair<Table, Long> load(Identifier ident) {
       }
 
       // loading the namespace as a table worked, check the name to see if it is a valid selector
+      // or if the name points to the changelog
+
+      if (ident.name().equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
+        return new SparkChangelogTable(table, !cacheEnabled);
+      }
+
       Matcher at = AT_TIMESTAMP.matcher(ident.name());
       if (at.matches()) {
         long asOfTimestamp = Long.parseLong(at.group(1));
-        return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+        long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }
 
       Matcher id = SNAPSHOT_ID.matcher(ident.name());
       if (id.matches()) {
         long snapshotId = Long.parseLong(id.group(1));
-        return Pair.of(table, snapshotId);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }

Review Comment:
   Not a blocker. It'd be more readable if we wrap the code within the catch clause. like this:
   ```
    try {
         org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
         return new SparkTable(table, !cacheEnabled);
       } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
           Table table = loadAlternativeTable(ident, e);
           if (table != null) {
             return table;
           } else {
             throw e;
           }
      }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989584315


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -605,19 +620,27 @@ private Pair<Table, Long> load(Identifier ident) {
       }
 
       // loading the namespace as a table worked, check the name to see if it is a valid selector
+      // or if the name points to the changelog
+
+      if (ident.name().equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
+        return new SparkChangelogTable(table, !cacheEnabled);
+      }
+
       Matcher at = AT_TIMESTAMP.matcher(ident.name());
       if (at.matches()) {
         long asOfTimestamp = Long.parseLong(at.group(1));
-        return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+        long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }
 
       Matcher id = SNAPSHOT_ID.matcher(ident.name());
       if (id.matches()) {
         long snapshotId = Long.parseLong(id.group(1));
-        return Pair.of(table, snapshotId);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }

Review Comment:
   Not a blocker. It'd be more readable if we wrap the code within the catch clause. like this:
   ```
    try {
         org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
         return new SparkTable(table, !cacheEnabled);
       } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
           Table table = loadAlternativeTable(ident);
           if (table != null) {
             return table;
           } else {
             throw e;
           }
      }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967520687


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.ChangelogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkChangelogTable implements Table, SupportsRead {
+
+  public static final String TABLE_NAME = "changelog";

Review Comment:
   Could be called `changes`. I followed the naming we used in other places so far.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967527179


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);

Review Comment:
   @bryanck, do you remember the details on that issue? Do you think my assumption is reasonable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r968764084


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.ChangelogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkChangelogTable implements Table, SupportsRead {
+
+  public static final String TABLE_NAME = "changelog";

Review Comment:
   I favor `changes`. Two names have exactly same meaning, but  `changelog` sounds a bit too techy as a user interface. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r968965129


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -605,19 +636,27 @@ private Pair<Table, Long> load(Identifier ident) {
       }
 
       // loading the namespace as a table worked, check the name to see if it is a valid selector
+      // or if the name points to the changelog
+

Review Comment:
   Can we remove the empty line?



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",
+          Types.StringType.get(),
+          "Record type in changelog");
+  public static final NestedField CHANGE_ORDINAL =
+      NestedField.optional(
+          Integer.MAX_VALUE - 105, "change_ordinal", Types.IntegerType.get(), "Change ordinal");
+  public static final NestedField COMMIT_SNAPSHOT_ID =
+      NestedField.optional(
+          Integer.MAX_VALUE - 106,
+          "commit_snapshot_id",

Review Comment:
   To `_commit_snapshot_id`?



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =

Review Comment:
   I'm OK to keep them here. I'd consider change log columns as metadata columns as well.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);
+  }
+
+  @Override
+  public int hashCode() {
+    return scan.hashCode();
+  }
+
+  private static class ReaderFactory implements PartitionReaderFactory {
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition) {
+      if (partition instanceof SparkInputPartition) {
+        return new RowReader((SparkInputPartition) partition);
+      } else {
+        throw new UnsupportedOperationException("Incorrect input partition type: " + partition);

Review Comment:
   I think we normally use Preconditions like this
   ```
   Preconditions.checkArgument(partition instanceof SparkInputPartition, message);
   ```



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",

Review Comment:
   To `_change_type` or `_changelog_type`?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+
+class SparkInputPartition implements InputPartition, Serializable {

Review Comment:
   +1 for this refactor.



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",
+          Types.StringType.get(),
+          "Record type in changelog");
+  public static final NestedField CHANGE_ORDINAL =
+      NestedField.optional(
+          Integer.MAX_VALUE - 105, "change_ordinal", Types.IntegerType.get(), "Change ordinal");

Review Comment:
   To `_change_ordinal`? I'm not a native speaker, but does `ordinal` imply that the value would be `1st, 2nd, 3rd, ...`. We actually use `0, 1, 2, ...`. A name like `_change_seq_num` is more suitable?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -662,15 +709,22 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
         "Cannot specify both snapshot-id and as-of-timestamp: %s",
         ident.location());
 
-    Table table =
+    Preconditions.checkArgument(
+        !isChangelog || (snapshotId == null && asOfTimestamp == null),
+        "Cannot specify snapshot-id and as-of-timestamp for changelogs");
+
+    org.apache.iceberg.Table table =
         tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));
 
-    if (snapshotId != null) {
-      return Pair.of(table, snapshotId);
+    if (isChangelog) {
+      return new SparkChangelogTable(table, !cacheEnabled);
+    } else if (snapshotId != null) {
+      return new SparkTable(table, snapshotId, !cacheEnabled);
     } else if (asOfTimestamp != null) {
-      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+      return new SparkTable(
+          table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp), !cacheEnabled);
     } else {
-      return Pair.of(table, null);
+      return new SparkTable(table, null, !cacheEnabled);

Review Comment:
   A refactor suggestion: we may use a builder here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980583943


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.types.StructType;
+
+class SparkChangelogScan implements Scan, SupportsReportStatistics {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final IncrementalChangelogScan scan;
+  private final SparkReadConf readConf;
+  private final Schema expectedSchema;
+  private final List<Expression> filters;
+  private final Long startSnapshotId;
+  private final Long endSnapshotId;
+  private final boolean readTimestampWithoutZone;
+
+  // lazy variables
+  private List<ScanTaskGroup<ChangelogScanTask>> taskGroups;
+  private StructType readSchema;
+
+  SparkChangelogScan(
+      SparkSession spark,
+      Table table,
+      IncrementalChangelogScan scan,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      List<Expression> filters) {
+
+    SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+
+    this.spark = spark;
+    this.table = table;
+    this.scan = scan;
+    this.readConf = readConf;
+    this.expectedSchema = expectedSchema;
+    this.filters = filters != null ? filters : Collections.emptyList();
+    this.startSnapshotId = readConf.startSnapshotId();
+    this.endSnapshotId = readConf.endSnapshotId();
+    this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+  }
+
+  @Override
+  public Statistics estimateStatistics() {
+    long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
+    long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
+    return new Stats(sizeInBytes, rowsCount);
+  }

Review Comment:
   Hm, I haven't heard about that. @bryanck @kbendick, do you have more context?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980538571


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",
+          Types.StringType.get(),
+          "Record type in changelog");
+  public static final NestedField CHANGE_ORDINAL =
+      NestedField.optional(
+          Integer.MAX_VALUE - 105, "change_ordinal", Types.IntegerType.get(), "Change ordinal");

Review Comment:
   I am following the name in `ChangelogScanTask`.
   Let's discuss whether these columns are metadata columns or reserved columns to see if we need to start with an underscore.



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",
+          Types.StringType.get(),
+          "Record type in changelog");
+  public static final NestedField CHANGE_ORDINAL =
+      NestedField.optional(
+          Integer.MAX_VALUE - 105, "change_ordinal", Types.IntegerType.get(), "Change ordinal");

Review Comment:
   I am following the name in `ChangelogScanTask`. Let's discuss whether these columns are metadata columns or reserved columns to see if we need to start with an underscore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967706858


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);

Review Comment:
   nit: maybe set format version and partition column in the CREATE statement in one shot. might speed up the tests a little bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967716214


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap2.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap2.snapshotId())),
+        changelogRecords(snap1, snap2));
+  }
+
+  @Test
+  public void testMetadataDeletes() {

Review Comment:
   I believe this is because the actual delete operation is issues against an entire partition, and thus the delete uses an optimized / metadata-only operation (the delete doesn't need to read any data files).
   
   Thata's always been my understanding of "metadata deletes". That they are deletes which only require updating metadata, without having to inspect data files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967968651


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -579,23 +565,68 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
     }
   }
 
-  private Pair<Table, Long> load(Identifier ident) {
+  private Table load(Identifier ident, String version) {
+    Table table = load(ident);
+
+    if (table instanceof SparkTable) {
+      SparkTable sparkTable = (SparkTable) table;
+
+      Preconditions.checkArgument(
+          sparkTable.snapshotId() == null,
+          "Cannot do time-travel based on both table identifier and AS OF");
+
+      return sparkTable.copyWithSnapshotId(Long.parseLong(version));
+
+    } else if (table instanceof SparkChangelogTable) {
+      throw new UnsupportedOperationException("AS OF is not supported for changelogs");

Review Comment:
   nit: maybe complete `AS OF` as `AsOfTime`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967525775


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =

Review Comment:
   I wonder whether we should add `ReservedColumns` as a separate class. There are a few columns that are not metadata columns as such.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#issuecomment-1242540238

   cc @rdblue @flyrain @stevenzwu


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980540250


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);

Review Comment:
   Great, I can submit a separate PR and it would be awesome if you could re-run the benchmark. I'll ping you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980542575


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);

Review Comment:
   Yeah, I can do that. I did it mainly because each call fits on one line but I don't mind switching this to a single create statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989510561


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -75,6 +75,21 @@ private MetadataColumns() {}
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final NestedField CHANGELOG_OPERATION =
+      NestedField.required(
+          Integer.MAX_VALUE - 104,
+          "changelog_operation",
+          Types.StringType.get(),
+          "Record type in changelog");
+  public static final NestedField CHANGE_ORDINAL =
+      NestedField.optional(
+          Integer.MAX_VALUE - 105, "change_ordinal", Types.IntegerType.get(), "Change ordinal");

Review Comment:
   I added `_` to the name but still call it as `_change_ordinal` to match what we named in the task API.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+
+public class SparkChangelogBatch implements Batch {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+  private final boolean localityEnabled;
+  private final SparkChangelogScan scan;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      SparkChangelogScan scan) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = readConf.caseSensitive();
+    this.localityEnabled = readConf.localityEnabled();
+    this.scan = scan;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    List<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.taskGroups();
+
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
+
+    Tasks.range(partitions.length)
+        .stopOnFailure()
+        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
+        .run(
+            index ->
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
+                        tableBroadcast,
+                        expectedSchemaString,
+                        caseSensitive,
+                        localityEnabled));
+
+    return partitions;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return scan.equals(that.scan);
+  }
+
+  @Override
+  public int hashCode() {
+    return scan.hashCode();
+  }
+
+  private static class ReaderFactory implements PartitionReaderFactory {
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition) {
+      if (partition instanceof SparkInputPartition) {
+        return new RowReader((SparkInputPartition) partition);
+      } else {
+        throw new UnsupportedOperationException("Incorrect input partition type: " + partition);

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989553492


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -633,13 +672,21 @@ private Pair<String, List<String>> parseLocationString(String location) {
     }
   }
 
-  private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
+  @SuppressWarnings("CyclomaticComplexity")
+  private Table loadFromPathIdentifier(PathIdentifier ident) {
     Pair<String, List<String>> parsed = parseLocationString(ident.location());
 
     String metadataTableName = null;
     Long asOfTimestamp = null;
     Long snapshotId = null;
+    boolean isChangelog = false;
+
     for (String meta : parsed.second()) {
+      if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {

Review Comment:
   I double checked this and I think we should follow the existing logic for path-based tables where the order of parts in a selector does not matter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r989509525


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.ChangelogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkChangelogTable implements Table, SupportsRead {
+
+  public static final String TABLE_NAME = "changelog";

Review Comment:
   Renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi merged pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged PR #5740:
URL: https://github.com/apache/iceberg/pull/5740


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r990213812


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -605,19 +620,27 @@ private Pair<Table, Long> load(Identifier ident) {
       }
 
       // loading the namespace as a table worked, check the name to see if it is a valid selector
+      // or if the name points to the changelog
+
+      if (ident.name().equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
+        return new SparkChangelogTable(table, !cacheEnabled);
+      }
+
       Matcher at = AT_TIMESTAMP.matcher(ident.name());
       if (at.matches()) {
         long asOfTimestamp = Long.parseLong(at.group(1));
-        return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+        long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }
 
       Matcher id = SNAPSHOT_ID.matcher(ident.name());
       if (id.matches()) {
         long snapshotId = Long.parseLong(id.group(1));
-        return Pair.of(table, snapshotId);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }

Review Comment:
   That's a good idea. Let me do that in a separate PR after this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r980540747


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -69,19 +72,28 @@ public static String[] blockLocations(CombinedScanTask task, Configuration conf)
     return locationSets.toArray(new String[0]);
   }
 
-  public static String[] blockLocations(FileIO io, CombinedScanTask task) {
+  public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {

Review Comment:
   Yep but this is compatible as `CombinedScanTask`  implements `ScanTaskGroup`. Existing user code should continue to work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967707762


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap2.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap2.snapshotId())),
+        changelogRecords(snap1, snap2));
+  }
+
+  @Test
+  public void testMetadataDeletes() {

Review Comment:
   why is this called metadata delete? is it because of the assertion of `DataOperations.DELETE`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967706737


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -69,19 +72,28 @@ public static String[] blockLocations(CombinedScanTask task, Configuration conf)
     return locationSets.toArray(new String[0]);
   }
 
-  public static String[] blockLocations(FileIO io, CombinedScanTask task) {
+  public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {

Review Comment:
   probably more of a question for my understanding. Iceberg only guarantee compatibility for classes from iceberg-api module, correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967716214


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap2.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap2.snapshotId())),
+        changelogRecords(snap1, snap2));
+  }
+
+  @Test
+  public void testMetadataDeletes() {

Review Comment:
   I believe this is because the actual delete operation is issues against an entire partition, the partition of `data = 'a'`. This delete operation uses an optimized / "metadata only" operation; no data files need to be read or rewritten to perform the delete.
   
   Thata's always been my understanding of "metadata deletes". That they are deletes which only require updating metadata, without having to inspect data files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967716214


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        1,
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2,
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion;
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changelog WHERE id = 3 ORDER BY change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, formatVersion);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap2.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap2.snapshotId())),
+        changelogRecords(snap1, snap2));
+  }
+
+  @Test
+  public void testMetadataDeletes() {

Review Comment:
   I believe this is because the actual delete operation is issues against an entire partition, and thus the delete uses an optimized / metadata-only operation (the delete doesn't need to read any data files).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967717263


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.types.StructType;
+
+class SparkChangelogScan implements Scan, SupportsReportStatistics {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final IncrementalChangelogScan scan;
+  private final SparkReadConf readConf;
+  private final Schema expectedSchema;
+  private final List<Expression> filters;
+  private final Long startSnapshotId;
+  private final Long endSnapshotId;
+  private final boolean readTimestampWithoutZone;
+
+  // lazy variables
+  private List<ScanTaskGroup<ChangelogScanTask>> taskGroups;
+  private StructType readSchema;
+
+  SparkChangelogScan(
+      SparkSession spark,
+      Table table,
+      IncrementalChangelogScan scan,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      List<Expression> filters) {
+
+    SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+
+    this.spark = spark;
+    this.table = table;
+    this.scan = scan;
+    this.readConf = readConf;
+    this.expectedSchema = expectedSchema;
+    this.filters = filters != null ? filters : Collections.emptyList();
+    this.startSnapshotId = readConf.startSnapshotId();
+    this.endSnapshotId = readConf.endSnapshotId();
+    this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+  }
+
+  @Override
+  public Statistics estimateStatistics() {
+    long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
+    long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
+    return new Stats(sizeInBytes, rowsCount);
+  }

Review Comment:
   If I remember correctly, statistics were calculated multiple times during the same query in some other scenarios.
   
   Would there be any benefit to caching this result? It was @bryanck I believe who found that we were spending extra time in statistics calculation before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5740: Spark 3.3: Add SparkChangelogTable

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5740:
URL: https://github.com/apache/iceberg/pull/5740#discussion_r967967680


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -633,13 +672,21 @@ private Pair<String, List<String>> parseLocationString(String location) {
     }
   }
 
-  private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
+  @SuppressWarnings("CyclomaticComplexity")
+  private Table loadFromPathIdentifier(PathIdentifier ident) {
     Pair<String, List<String>> parsed = parseLocationString(ident.location());
 
     String metadataTableName = null;
     Long asOfTimestamp = null;
     Long snapshotId = null;
+    boolean isChangelog = false;
+
     for (String meta : parsed.second()) {
+      if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {

Review Comment:
   `changelog` should be the last element of the list, right? this may have false match.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org