Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/05/02 06:45:00 UTC

[jira] [Work logged] (HIVE-26183) Create delete writer for the UPDATE statements

     [ https://issues.apache.org/jira/browse/HIVE-26183?focusedWorklogId=764850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-764850 ]

ASF GitHub Bot logged work on HIVE-26183:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 06:44
            Start Date: 02/May/22 06:44
    Worklog Time Spent: 10m 
      Work Description: szlta commented on code in PR #3251:
URL: https://github.com/apache/hive/pull/3251#discussion_r862619509


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public abstract class HiveIcebergWriterBase implements FileSinkOperator.RecordWriter,
+    org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>>, HiveIcebergWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriterBase.class);
+
+  private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
+
+  static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
+    return writers.remove(taskAttemptID);
+  }
+
+  static Map<String, HiveIcebergWriter> getWriters(TaskAttemptID taskAttemptID) {
+    return writers.get(taskAttemptID);
+  }
+
+  protected final FileIO io;
+  protected final InternalRecordWrapper wrapper;
+  protected final Map<Integer, PartitionSpec> specs;
+  protected final Map<Integer, PartitionKey> partitionKeys;
+  protected final PartitioningWriter writer;
+
+  protected HiveIcebergWriterBase(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
+      String tableName, PartitioningWriter writer, boolean wrapped) {
+    this.io = io;
+    this.wrapper = new InternalRecordWrapper(schema.asStruct());
+    this.specs = specs;
+    this.partitionKeys = Maps.newHashMapWithExpectedSize(specs.size());
+    this.writer = writer;
+    if (!wrapped) {
+      writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
+      writers.get(attemptID).put(tableName, this);
+    }
+  }
+
+  @Override
+  public void write(NullWritable key, Container value) throws IOException {
+    write(value);
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    close(false);
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    writer.close();
+    FilesForCommit result = files();
+
+    // If abort then remove the unnecessary files
+    if (abort) {
+      Tasks.foreach(result.allFiles())
+          .retry(3)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exception) -> LOG.debug("Failed to remove file {} on abort", file, exception))

Review Comment:
   Do we expect this to happen frequently? If not, I'd suggest logging it at info level at least.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java:
##########
@@ -0,0 +1,172 @@
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out of order records.
+ * We need to keep the incoming records in memory until they are written out. To keep the memory consumption minimal
+ * we only write out {@link PositionDelete} files where the row data is omitted, so only the filenames and the rowIds
+ * have to be in the memory.
+ */
+public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergBufferedDeleteWriter.class);
+
+  private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
+  private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
+
+  // Storing deleted data in a map Partition -> FileName -> BitMap
+  private final Map<PartitionKey, Map<String, Roaring64Bitmap>> buffer = Maps.newHashMap();
+  private final Map<Integer, PartitionSpec> specs;
+  private final Map<PartitionKey, PartitionSpec> keyToSpec = Maps.newHashMap();
+  private final FileFormat format;
+  private final FileWriterFactory<Record> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+  private final Configuration configuration;
+  private final Record record;
+  private final InternalRecordWrapper wrapper;
+  private FilesForCommit filesForCommit;
+
+  HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat format,
+      FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+      Configuration configuration) {
+    this.specs = specs;
+    this.format = format;
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+    this.configuration = configuration;
+    this.wrapper = new InternalRecordWrapper(schema.asStruct());
+    this.record = GenericRecord.create(schema);
+  }
+
+  @Override
+  public void write(Writable row) throws IOException {
+    Record rec = ((Container<Record>) row).get();
+    IcebergAcidUtil.getOriginalFromUpdatedRecord(rec, record);
+    String filePath = (String) rec.getField(MetadataColumns.FILE_PATH.name());
+    int specId = IcebergAcidUtil.parseSpecId(rec);
+
+    Map<String, Roaring64Bitmap> deleteMap =
+        buffer.computeIfAbsent(partition(record, specId), key -> {
+          keyToSpec.put(key, specs.get(specId));
+          return Maps.newHashMap();
+        });
+    Roaring64Bitmap deletes = deleteMap.computeIfAbsent(filePath, unused -> new Roaring64Bitmap());
+    deletes.add((Long) rec.getField(MetadataColumns.ROW_POSITION.name()));
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    long startTime = System.currentTimeMillis();
+    Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
+    if (!abort) {
+      LOG.info("Delete file flush is started");
+      Tasks.foreach(buffer.keySet())
+          .retry(3)
+          .executeWith(fileExecutor(configuration, buffer.size()))
+          .onFailure((partition, exception) -> LOG.debug("Failed to write delete file {}", partition, exception))
+          .run(partition -> {
+            PositionDelete<Record> positionDelete = PositionDelete.create();
+            PartitioningWriter writerForData =
+                new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+            try (PartitioningWriter writer = writerForData) {
+              writerForData = writer;
+              for (String filePath : new TreeSet<>(buffer.get(partition).keySet())) {
+                Roaring64Bitmap deletes = buffer.get(partition).get(filePath);
+                deletes.forEach(position -> {
+                  positionDelete.set(filePath, position, null);
+                  writer.write(positionDelete, keyToSpec.get(partition), partition);
+                });
+              }
+            }
+            deleteFiles.addAll(((DeleteWriteResult) writerForData.result()).deleteFiles());
+          }, IOException.class);
+    }
+
+    LOG.info("HiveIcebergBufferedDeleteWriter is closed with abort={}. Created {} delete files and it took {} ms.",
+        abort, deleteFiles.size(), System.currentTimeMillis() - startTime);
+    LOG.debug("Delete files written {}", deleteFiles);
+
+    this.filesForCommit = FilesForCommit.onlyDelete(deleteFiles);
+  }
+
+  @Override
+  public FilesForCommit files() {
+    return filesForCommit;
+  }
+
+  protected PartitionKey partition(Record row, int specId) {
+    PartitionKey partitionKey = new PartitionKey(specs.get(specId), specs.get(specId).schema());
+    partitionKey.partition(wrapper.wrap(row));
+    return partitionKey;
+  }
+
+  /**
+   * Executor service for parallel writing of delete files.
+   * @param conf The configuration containing the pool size
+   * @return The generated executor service
+   */
+  private static ExecutorService fileExecutor(Configuration conf, int maxSize) {

Review Comment:
   Question: do we ever need to close this executor service?
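
   For context on this question, here is a minimal sketch of the usual JDK pattern: a pool created for one batch of work is shut down in a `finally` block once the batch is done. The class and method names are hypothetical and not taken from the PR, and it uses only `java.util.concurrent`, not the Iceberg `Tasks` utility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration, not code from the PR.
final class ExecutorShutdownSketch {

  /** Doubles every value on a short-lived pool and returns the sum. */
  static int doubleAndSum(List<Integer> values, int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Future<Integer>> futures = new ArrayList<>();
      for (Integer value : values) {
        futures.add(pool.submit(() -> value * 2));
      }
      int sum = 0;
      for (Future<Integer> future : futures) {
        sum += future.get();
      }
      return sum;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for tasks", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Task failed", e);
    } finally {
      // Without this call the non-daemon worker threads stay alive after the batch.
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(doubleAndSum(java.util.Arrays.asList(1, 2, 3), 2));
  }
}
```

   Whether the PR needs the same treatment depends on how the executor is created and whether its threads are daemon threads, which this excerpt does not show.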



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java:
##########
@@ -0,0 +1,172 @@
+  @Override
+  public void close(boolean abort) throws IOException {
+    long startTime = System.currentTimeMillis();
+    Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
+    if (!abort) {
+      LOG.info("Delete file flush is started");
+      Tasks.foreach(buffer.keySet())
+          .retry(3)
+          .executeWith(fileExecutor(configuration, buffer.size()))
+          .onFailure((partition, exception) -> LOG.debug("Failed to write delete file {}", partition, exception))
+          .run(partition -> {
+            PositionDelete<Record> positionDelete = PositionDelete.create();
+            PartitioningWriter writerForData =
+                new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+            try (PartitioningWriter writer = writerForData) {
+              writerForData = writer;
+              for (String filePath : new TreeSet<>(buffer.get(partition).keySet())) {
+                Roaring64Bitmap deletes = buffer.get(partition).get(filePath);

Review Comment:
   Maybe refactor so that we don't do the `buffer.get(partition)` lookup twice?
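
   A minimal sketch of what such a refactor could look like: fetch the inner map once, then iterate its entries in sorted key order. It uses plain JDK collections, with `List<Long>` standing in for the bitmap, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not code from the PR.
final class SingleLookupSketch {

  /** Lists "file:position" pairs for one partition, ordered by file path. */
  static List<String> positionsFor(Map<String, Map<String, List<Long>>> buffer, String partition) {
    // Single lookup of the per-partition map; an unknown partition yields an empty result.
    Map<String, List<Long>> byFile =
        buffer.getOrDefault(partition, Collections.<String, List<Long>>emptyMap());
    List<String> out = new ArrayList<>();
    // Copying into a TreeMap sorts by file path and gives key and value together.
    for (Map.Entry<String, List<Long>> entry : new TreeMap<>(byFile).entrySet()) {
      for (Long position : entry.getValue()) {
        out.add(entry.getKey() + ":" + position);
      }
    }
    return out;
  }
}
```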



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java:
##########
@@ -0,0 +1,172 @@
+  @Override
+  public void close(boolean abort) throws IOException {
+    long startTime = System.currentTimeMillis();
+    Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
+    if (!abort) {
+      LOG.info("Delete file flush is started");
+      Tasks.foreach(buffer.keySet())
+          .retry(3)
+          .executeWith(fileExecutor(configuration, buffer.size()))
+          .onFailure((partition, exception) -> LOG.debug("Failed to write delete file {}", partition, exception))
+          .run(partition -> {
+            PositionDelete<Record> positionDelete = PositionDelete.create();
+            PartitioningWriter writerForData =
+                new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+            try (PartitioningWriter writer = writerForData) {
+              writerForData = writer;

Review Comment:
   Why was this line necessary?
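
   One possible shape that avoids the reassignment, sketched with a hypothetical `ResultWriter` type standing in for the PR's writer: keep a final reference outside the `try`, let try-with-resources close it, and read the result afterwards. This is only an illustration of the language feature; it does not claim to explain why the line was added in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not code from the PR.
final class CloseThenResultSketch {

  /** Stand-in writer whose result is only available after close(). */
  static final class ResultWriter implements AutoCloseable {
    private final List<String> pending = new ArrayList<>();
    private List<String> result;

    void write(String value) {
      pending.add(value);
    }

    @Override
    public void close() {
      result = new ArrayList<>(pending);
    }

    List<String> result() {
      if (result == null) {
        throw new IllegalStateException("Writer is not closed yet");
      }
      return result;
    }
  }

  static List<String> writeAll(List<String> values) {
    final ResultWriter writer = new ResultWriter();
    // The resource variable is a second name for the same object; no reassignment is needed.
    try (ResultWriter open = writer) {
      for (String value : values) {
        open.write(value);
      }
    }
    // The writer is closed here, so the result can be read through the outer reference.
    return writer.result();
  }

  public static void main(String[] args) {
    System.out.println(writeAll(Arrays.asList("a", "b")));
  }
}
```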





Issue Time Tracking
-------------------

    Worklog Id:     (was: 764850)
    Time Spent: 1h  (was: 50m)

> Create delete writer for the UPDATE statements
> ----------------------------------------------
>
>                 Key: HIVE-26183
>                 URL: https://issues.apache.org/jira/browse/HIVE-26183
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Peter Vary
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> While investigating updates of partitioned tables we ran into the following issue:
> - Iceberg inserts need to be sorted by the new partition keys
> - Iceberg deletes need to be sorted by the old partition keys and filenames
> These two requirements can contradict each other. OTOH, a Hive update creates a single query and writes out an insert and a delete record for every row. This would mean plenty of open writers.
> We might want to create something like https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java, but we do not want to keep whole rows in memory.
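
The idea in the description can be sketched roughly as follows: buffer only the partition, the data file path and the row position of each delete, and emit them in sorted order at the end, so out-of-order input does not require many open writers. This is a simplified illustration using JDK collections only. The class name is hypothetical, partitions are plain strings, and a `TreeSet<Long>` stands in for the `Roaring64Bitmap` used in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration, not code from the PR.
final class PositionBufferSketch {
  // partition -> data file path -> sorted row positions; no row data is kept.
  private final Map<String, Map<String, TreeSet<Long>>> buffer = new TreeMap<>();

  /** Records one deleted row; calls may arrive in any order. */
  void add(String partition, String filePath, long position) {
    buffer.computeIfAbsent(partition, unused -> new TreeMap<>())
        .computeIfAbsent(filePath, unused -> new TreeSet<>())
        .add(position);
  }

  /** Returns all buffered deletes ordered by partition, file path and position, then clears the buffer. */
  List<String> drain() {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Map<String, TreeSet<Long>>> byPartition : buffer.entrySet()) {
      for (Map.Entry<String, TreeSet<Long>> byFile : byPartition.getValue().entrySet()) {
        for (long position : byFile.getValue()) {
          out.add(byPartition.getKey() + "/" + byFile.getKey() + "@" + position);
        }
      }
    }
    buffer.clear();
    return out;
  }
}
```

Memory use in this sketch grows with the number of deleted positions rather than with row width, which is the trade-off the description is after.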



--
This message was sent by Atlassian Jira
(v8.20.7#820007)