You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/21 22:28:34 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #3164: Core: Add PartitioningWriter

aokolnychyi opened a new pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164


   This PR adds the `PartitioningWriter` interface and two implementations:
   - `ClusteredWriter`
   - `FanoutWriter`
   
   Benchmarks are coming in a separate PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715169093



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {

Review comment:
       Okay, got it. That sounds fine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714966254



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());

Review comment:
       Maybe, it is the right time to add a longer error message that will clarify what happened. I'll look into that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714590188



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);
+
+    if (writer == null) {
+      // copy the partition key as the key object may be reused
+      StructLike copiedPartition = StructCopy.copy(partition);
+      writer = newWriter(spec, copiedPartition);
+      specWriters.put(copiedPartition, writer);
+    }
+
+    return writer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeWriters();
+      this.closed = true;
+    }
+  }
+
+  private void closeWriters() throws IOException {
+    for (Map<StructLike, FileWriter<T, R>> specWriters : writers.values()) {
+      for (FileWriter<T, R> writer : specWriters.values()) {
+        writer.close();
+        addResult(writer.result());
+      }
+
+      specWriters.clear();
+    }
+
+    writers.clear();
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+
+  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);

Review comment:
       Nit:  similar pre-check to this [comment](https://github.com/apache/iceberg/pull/3164/files#r714561637) ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714123188



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {

Review comment:
       Compared to the old implementation, we have an extra `computeIfAbsent` call and using `StructLikeMap` instead of a regular map. The performance hit seems negligible according to benchmark results.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714251189



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and partition.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter();
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) {
+        String path = spec.partitionToPath(partition);
+        throw new IllegalStateException("Already closed files for partition: " + path);

Review comment:
       Good idea. I'll update.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714213087



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and partition.

Review comment:
       This note is on several of the apis, not sure if it is needed but is there a way to say that we need the data to be clustered on partition spec, and within that by partition? Maybe that is obvious to other folks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714257797



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and partition.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter();
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) {
+        String path = spec.partitionToPath(partition);
+        throw new IllegalStateException("Already closed files for partition: " + path);

Review comment:
       Fixed.

##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and partition.

Review comment:
       Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715166505



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    writer.close();
+
+    DataWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Already closed files for partition",
+        () -> {
+          try {
+            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+          } catch (IOException e) {

Review comment:
       I don't think that `write` should throw `IOException`. We always wrap `IOException` in `UncheckedIOException` so it makes no sense for us to throw it from the writer interface.
   
   I think I missed this when reviewing the `FileWriter` interfaces.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715112110



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);

Review comment:
       I did more benchmarks for 2.5 million records and 1000k partitions. I was using get/put methods heavily.
   
   ```
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714564157



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An equality delete writer capable of writing to multiple specs and partitions that requires
+ * the incoming delete records to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final FileFormat fileFormat;
+  private final long targetFileSizeInBytes;
+  private final List<DeleteFile> deleteFiles;
+
+  public ClusteredEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                       FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.fileFormat = fileFormat;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.deleteFiles = Lists.newArrayList();
+  }
+
+  @Override
+  protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+    // TODO: support ORC rolling writers

Review comment:
       Filed a separate issue for this: https://github.com/apache/iceberg/issues/3169




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714285979



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();

Review comment:
       Since we only are allowing StructLikeMap in the value of this, perhaps we should have that in the type signature?
   
   Map<StructLikeMap<FileWriter<T, R>> 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#issuecomment-926235367


   Thanks for reviewing, @RussellSpitzer @openinx @rdblue!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715280777



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);

Review comment:
       Thanks for the detail explanation and performance report, @aokolnychyi !   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715161155



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);

Review comment:
       This may be because we cache the hash value in `StructLikeWrapper`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714534008



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {

Review comment:
       It will throw NullPointerException if the `partition` is null because the `partitionComparator` cannot compare null values , right ?  I remember we will use null value for `partition` for unifying the partitioned writer and unpartitioned writer code path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#issuecomment-925538346


   Thanks @aokolnychyi for pinging me, let me take a look today !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714304303



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();

Review comment:
       Why do we close the writer twice?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715137890



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter();
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) {
+        String path = spec.partitionToPath(partition);
+        String errMsg = String.format("Already closed files for partition '%s' in spec %d", path, spec.specId());
+        throw new IllegalStateException(errMsg);

Review comment:
       Nit: do we need a variable for `errMsg`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714315864



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    writer.close();
+
+    DataWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Already closed files for partition",
+        () -> {
+          try {
+            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+
+    writer.close();
+  }
+
+  @Test
+  public void testClusteredEqualityDeleteWriterNoRecords() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+    ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+    // add an unpartitioned data file
+    ImmutableList<T> rows1 = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(11, "aaa")
+    );
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+    table.newFastAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // partition by bucket
+    table.updateSpec()
+        .addField(Expressions.bucket("data", 16))
+        .commit();
+
+    // add a data file partitioned by bucket
+    ImmutableList<T> rows2 = ImmutableList.of(
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(12, "bbb")
+    );
+    DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb"));
+    table.newFastAppend()
+        .appendFile(dataFile2)
+        .commit();
+
+    // partition by data
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file partitioned by data
+    ImmutableList<T> rows3 = ImmutableList.of(
+        toRow(5, "ccc"),
+        toRow(13, "ccc")
+    );
+    DataFile dataFile3 = writeData(writerFactory, fileFactory, rows3, table.spec(), partitionKey(table.spec(), "ccc"));
+    table.newFastAppend()
+        .appendFile(dataFile3)
+        .commit();
+
+    ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec bucketSpec = table.specs().get(1);
+    PartitionSpec identitySpec = table.specs().get(2);
+
+    writer.write(toRow(1, "aaa"), unpartitionedSpec, null);
+    writer.write(toRow(2, "aaa"), unpartitionedSpec, null);
+    writer.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    writer.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    writer.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size());
+    Assert.assertEquals("Must not reference data files", 0, writer.result().referencedDataFiles().size());

Review comment:
       What's the difference between this check and the one beneath it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715112110



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);

Review comment:
       I did more benchmarks for 2.5 million records and 1000k partitions. I was using get/put methods heavily.
   
   ```
   StructLikeMap<String> map = StructLikeMap.create(SPEC.partitionType());
   
   PartitionKey partitionKey = new PartitionKey(SPEC, SCHEMA);
   StructType dataSparkType = SparkSchemaUtil.convert(SCHEMA);
   InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);
   
   for (InternalRow row : rows) {
     partitionKey.partition(internalRowWrapper.wrap(row));
     String res = map.get(partitionKey);
     if (res == null) {
       map.put(StructCopy.copy(partitionKey), "XXX");
    }
   }
   
   blackhole.consume(map);
   ```
   
   Performance numbers came very close both time and memory-wise.
   
   ```
   Benchmark                          Mode  Cnt  Score   Error  Units
   MapBenchmark.hashMap                 ss    5  0.274 ± 0.066   s/op
   MapBenchmark.structLikeMap           ss    5  0.358 ± 0.056   s/op
   ```
   
   Given such a minor difference for 2.5 million records, I'd say we should be good without any optimizations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714125785



##########
File path: build.gradle
##########
@@ -115,10 +115,6 @@ subprojects {
     options.encoding = 'UTF-8'
   }
 
-  ext {
-    jmhVersion = '1.21'

Review comment:
       Somehow, this did not seem to have any effect. I had to move it to the jmh block.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715118116



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {

Review comment:
       You are right the comparator will throw an NPE but I think `partition != currentPartition` prevents us from calling the comparator whenever at least one value is null. Partition can be null only for unpartitioned specs. As long as we are writing unpartitioned records, `partition != currentPartition` will be false.
   
   Whenever `partition != currentPartition` and at least one of them is null, it means we are changing the spec. If so, it will be handled by the if block above and we won't call the comparator at all.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715135501



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());

Review comment:
       Also better to use a string representation of the spec rather than the spec ID.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715166594



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An equality delete writer capable of writing to multiple specs and partitions that requires
+ * the incoming delete records to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final FileFormat fileFormat;
+  private final long targetFileSizeInBytes;
+  private final List<DeleteFile> deleteFiles;
+
+  public ClusteredEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                       FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.fileFormat = fileFormat;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.deleteFiles = Lists.newArrayList();
+  }
+
+  @Override
+  protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+    // TODO: support ORC rolling writers

Review comment:
       Thank you!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715161799



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {

Review comment:
       @rdblue, in the use case you mention, this if branch won't be invoked as the one above it will work. This if branch is only tested when we wrote at least a record and the new record belongs to the same spec as the previous record. That means if one partition is null, the second must be too, so `partition != currentPartition` is false and the comparator is not used.
   
   This is something that will be invoked for every row so I would like to avoid any extra checks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714316341



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();

Review comment:
       To make sure it is idempotent. Spark may call `close` multiple times.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#issuecomment-925099414


   I went ahead and added benchmarks to this PR.
   
   ```
   Benchmark                                                                Mode  Cnt   Score   Error  Units
   ParquetWritersBenchmark.writePartitionedClusteredDataWriter                ss    5  10.076 ± 0.261   s/op
   ParquetWritersBenchmark.writePartitionedLegacyDataWriter                   ss    5  10.124 ± 0.500   s/op
   
   ParquetWritersBenchmark.writePartitionedFanoutDataWriter                   ss    5  10.082 ± 0.371   s/op
   ParquetWritersBenchmark.writePartitionedLegacyFanoutDataWriter             ss    5   9.971 ± 0.322   s/op
   
   ParquetWritersBenchmark.writeUnpartitionedClusteredDataWriter              ss    5   9.075 ± 0.458   s/op
   ParquetWritersBenchmark.writeUnpartitionedLegacyDataWriter                 ss    5   8.981 ± 0.292   s/op
   
   ParquetWritersBenchmark.writePartitionedClusteredEqualityDeleteWriter      ss    5  10.136 ± 0.389   s/op
   ParquetWritersBenchmark.writeUnpartitionedClusteredPositionDeleteWriter    ss    5   7.462 ± 0.690   s/op
   ```
   ```
   Benchmark                                                             Mode  Cnt   Score   Error  Units
   AvroWritersBenchmark.writePartitionedClusteredDataWriter                ss    5  11.114 ± 0.108   s/op
   AvroWritersBenchmark.writePartitionedLegacyDataWriter                   ss    5  11.094 ± 0.422   s/op
   
   AvroWritersBenchmark.writePartitionedFanoutDataWriter                   ss    5  11.223 ± 0.316   s/op
   AvroWritersBenchmark.writePartitionedLegacyFanoutDataWriter             ss    5  11.029 ± 0.283   s/op
   
   AvroWritersBenchmark.writeUnpartitionedClusteredDataWriter              ss    5  10.716 ± 0.295   s/op
   AvroWritersBenchmark.writeUnpartitionedLegacyDataWriter                 ss    5  10.602 ± 0.509   s/op
   
   AvroWritersBenchmark.writePartitionedClusteredEqualityDeleteWriter      ss    5  10.115 ± 0.215   s/op
   AvroWritersBenchmark.writeUnpartitionedClusteredPositionDeleteWriter    ss    5   7.447 ± 0.526   s/op
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714602793



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(

Review comment:
       I think it's okay now,  don't have to address that comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715139837



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {

Review comment:
       I agree. If `currentPartition` is null (as it is initialized) and a non-null partition is passed in, then the first check is true and the second check runs, which will pass both to the comparator. If we don't think that the comparator can handle null then we should update this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715167332



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter();
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) {
+        String path = spec.partitionToPath(partition);
+        String errMsg = String.format("Already closed files for partition '%s' in spec %d", path, spec.specId());
+        throw new IllegalStateException(errMsg);

Review comment:
       I am not a big fan of splitting lines so I added an extra variable. This place changed a little bit. Let me know what you currently think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715165493



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter();
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) {
+        String path = spec.partitionToPath(partition);
+        String errMsg = String.format("Already closed files for partition '%s' in spec %d", path, spec.specId());
+        throw new IllegalStateException(errMsg);
+      }
+
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+    }
+
+    currentWriter.write(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeCurrentWriter();
+      this.closed = true;
+    }
+  }
+
+  private void closeCurrentWriter() throws IOException {
+    if (currentWriter != null) {
+      currentWriter.close();
+
+      addResult(currentWriter.result());
+
+      this.currentWriter = null;
+    }
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+
+  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);

Review comment:
       Added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715114350



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    writer.close();
+
+    DataWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Already closed files for partition",
+        () -> {
+          try {
+            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+
+    writer.close();
+  }
+
+  @Test
+  public void testClusteredEqualityDeleteWriterNoRecords() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+    ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+    // add an unpartitioned data file
+    ImmutableList<T> rows1 = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(11, "aaa")
+    );
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+    table.newFastAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // partition by bucket
+    table.updateSpec()
+        .addField(Expressions.bucket("data", 16))
+        .commit();
+
+    // add a data file partitioned by bucket
+    ImmutableList<T> rows2 = ImmutableList.of(
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(12, "bbb")
+    );
+    DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb"));
+    table.newFastAppend()
+        .appendFile(dataFile2)
+        .commit();
+
+    // partition by data
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file partitioned by data
+    ImmutableList<T> rows3 = ImmutableList.of(
+        toRow(5, "ccc"),
+        toRow(13, "ccc")
+    );
+    DataFile dataFile3 = writeData(writerFactory, fileFactory, rows3, table.spec(), partitionKey(table.spec(), "ccc"));
+    table.newFastAppend()
+        .appendFile(dataFile3)
+        .commit();
+
+    ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec bucketSpec = table.specs().get(1);
+    PartitionSpec identitySpec = table.specs().get(2);
+
+    writer.write(toRow(1, "aaa"), unpartitionedSpec, null);
+    writer.write(toRow(2, "aaa"), unpartitionedSpec, null);
+    writer.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    writer.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    writer.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size());
+    Assert.assertEquals("Must not reference data files", 0, writer.result().referencedDataFiles().size());

Review comment:
       Just checking `referencesDataFiles` is consistent with the number of referenced data files reported.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#issuecomment-924435749


   cc @openinx @stevenzwu @RussellSpitzer @rdblue @kbendick @karuppayya @flyrain @pvary @jackye1995 @yyanyy @szehon-ho @rymurr @jun-he


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715133810



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());

Review comment:
       Yeah, I agree with @openinx here. This is a good opportunity to improve that error message. Now that this is the clustered writer, we can say that incoming records need to be clustered by partition. You can use `PartitionSet` for this so it's really easy to track.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715168388



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    writer.close();
+
+    DataWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Already closed files for partition",
+        () -> {
+          try {
+            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+          } catch (IOException e) {

Review comment:
       Somehow, I assumed our delete writers throw one. I'll update `FileWriter` and `PartitioningWriter` interfaces.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715166709



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714962375



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);

Review comment:
       If I am not mistaken, we only use the fanout writer for partitioned tables. Even in the old implementation.
   
   You are right about this being the place where we need attention. Like I mentioned [here](https://github.com/apache/iceberg/pull/3164#discussion_r714123188), we have an extra `computeIfAbsent` call and using `StructLikeMap` instead of a regular map with `PartitionKey`. While the performance hit seems to be negligible according to benchmark results I posted, I'd up to optimize this as much as possible.
   
   One thing to consider is the performance of `equals` and `hashCode` in `StructLikeWrapper` vs `PartitionKey`. It is relatively simple and efficient in `PartitionKey` where we compare/iterate through object array. In the wrapper, these methods are more involved but don't seem drastically expensive.
   
   One optimization idea is to introduce a cache of Comparators and JavaHash objects we use in the wrapper. At this point, we will create a comparator and a java hash for every partition we add to `StructLikeMap`. Even if we write to 1k partitions, I am not sure the difference is noticeable.
   
   Another optimization idea can be to introduce a new interface to indicate when a StructLike is backed by an array of values. If two structs implement that interface, we can just compare the arrays in `StructLikeWrapper`.
   
   I am going to do a separate benchmark for `HashMap` with `PartitionKey` and `StructLikeMap` with `PartitionKey`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714315650



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();

Review comment:
       Sounds safer. I'll change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi merged pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715169858



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    writer.close();
+
+    DataWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Already closed files for partition",
+        () -> {
+          try {
+            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+          } catch (IOException e) {

Review comment:
       Okay, I think I remember now. Classes like `PartitioningWriter` close other writers and `close` throws an exception. I'll need to wrap such places and rethrow `UncheckedIOException`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715165241



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());

Review comment:
       The new exception looks like this:
   
   ```
   java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
   Encountered records that belong to already closed files:
   partition 'data=aaa' in spec [
     1000: data: identity(2)
   ]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714214274



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and partition.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter();
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) {
+        String path = spec.partitionToPath(partition);
+        throw new IllegalStateException("Already closed files for partition: " + path);

Review comment:
       Not sure if this will come up, but now we probably want to say "Already closed files for partition X in spec Y"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715166505



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    writer.close();
+
+    DataWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Already closed files for partition",
+        () -> {
+          try {
+            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+          } catch (IOException e) {

Review comment:
       I don't think that `write` should throw `IOException`. We always wrap `IOException` in `UncheckedIOException` so it makes no sense for us to throw it from the writer interface.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715166067



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);
+
+    if (writer == null) {
+      // copy the partition key as the key object may be reused
+      StructLike copiedPartition = StructCopy.copy(partition);
+      writer = newWriter(spec, copiedPartition);
+      specWriters.put(copiedPartition, writer);
+    }
+
+    return writer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeWriters();
+      this.closed = true;
+    }
+  }
+
+  private void closeWriters() throws IOException {
+    for (Map<StructLike, FileWriter<T, R>> specWriters : writers.values()) {
+      for (FileWriter<T, R> writer : specWriters.values()) {
+        writer.close();
+        addResult(writer.result());
+      }
+
+      specWriters.clear();
+    }
+
+    writers.clear();
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+
+  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715164511



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714962375



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);

Review comment:
       If I am not mistaken, we only use the fanout writer for partitioned tables. Even in the old implementation.
   
   You are right about this being the place where we need attention. Like I said [here](https://github.com/apache/iceberg/pull/3164#discussion_r714123188), we have an extra `computeIfAbsent` call and using `StructLikeMap` instead of a regular map with `PartitionKey`. While the performance hit seems to be negligible according to benchmark results I posted, I'd up to optimize this as much as possible.
   
   One thing to consider is the performance of `equals` and `hashCode` in `StructLikeWrapper` vs `PartitionKey`. It is relatively simple and efficient in `PartitionKey` where we compare/iterate through object array. In the wrapper, these methods are more involved but don't seem drastically expensive.
   
   One optimization idea is to introduce a cache of Comparators and JavaHash objects we use in the wrapper. At this point, we will create a comparator and a java hash for every partition we add to `StructLikeMap`. Even if we write to 1k partitions, I am not sure the difference is noticeable.
   
   Another optimization idea can be to introduce a new interface to indicate when a StructLike is backed by an array of values. If two structs implement that interface, we can just compare the arrays in `StructLikeWrapper`.
   
   I am going to do a separate benchmark for `HashMap` with `PartitionKey` and `StructLikeMap` with `PartitionKey`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715164414



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(

Review comment:
       Yeah, `PartitionMap` could a be a solution here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714561637



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;
+      partitionComparator = Comparators.forType(partitionType);
+      completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter();
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) {
+        String path = spec.partitionToPath(partition);
+        String errMsg = String.format("Already closed files for partition '%s' in spec %d", path, spec.specId());
+        throw new IllegalStateException(errMsg);
+      }
+
+      // copy the partition key as the key object may be reused
+      currentPartition = StructCopy.copy(partition);
+      currentWriter = newWriter(currentSpec, currentPartition);
+    }
+
+    currentWriter.write(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeCurrentWriter();
+      this.closed = true;
+    }
+  }
+
+  private void closeCurrentWriter() throws IOException {
+    if (currentWriter != null) {
+      currentWriter.close();
+
+      addResult(currentWriter.result());
+
+      this.currentWriter = null;
+    }
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+
+  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);

Review comment:
       Should we add pre-check for the unpartitioned spec with a non-null partition data ? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715134245



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      currentSpec = spec;

Review comment:
       Style: prefer using `this.currentSpec` so that it is obvious that the assignment is to a field and not a local variable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714251044



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and partition.

Review comment:
       I can surely make that more clear. I'll update.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714531202



##########
File path: core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null;
+  private Set<StructLike> completedPartitions = null;
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) {
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) {
+        throw new IllegalStateException("Already closed files for spec: " + spec.specId());

Review comment:
       I will prefer to add the partitionSpec and the specId together (rather than only the specId) in the IllegalStateException message, because I've seen many users publish questions about what's wrong about the message `Already closed files for partition ...`,  it just a sort issue. What I am trying to say is:  it's quite easy for the iceberg beginners to get the meaning of `Already closed files for spec: 3`  if we keep the current message.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715159578



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(

Review comment:
       We could make a `PartitionMap` class that works like `PartitionSet` for this. No need to do it right now though. I agree that we should move forward with this implementation and update it later if needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r715171906



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    writer.close();
+
+    DataWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    table.updateSpec()
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec spec = table.spec();
+
+    writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+    writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+    writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Already closed files for partition",
+        () -> {
+          try {
+            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+          } catch (IOException e) {

Review comment:
       I'll do that in a follow-up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r713464668



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(

Review comment:
       @openinx, I've tried to address [this](https://github.com/apache/iceberg/pull/2945/files#r700177068) comment. However, this would require to maintain a map of StructLike wrappers by spec. I am not sure that will be cleaner.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3164: Core: Add PartitioningWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3164:
URL: https://github.com/apache/iceberg/pull/3164#discussion_r714601749



##########
File path: core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, Map<StructLike, FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition);
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(),
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition);

Review comment:
       For fanout write path,  this line is the one of the hottest line because it will need to compare the partition field values for every row.  For unpartitioned table,  we also need to get the null key from the `specWriters` map.  In the old implementation,  we don't need to get the writer from the map for unpartitioned table.  Is there any performance regression when comparing the two ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org