Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/25 08:37:11 UTC

[GitHub] [iceberg] xloya opened a new pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

xloya opened a new pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977


   When writing to a partitioned table, if an executor needs to write a very large number of partitions and the size of the writers map is not limited, the job will eventually run out of memory (OOM). We have hit this problem in our production environment.  
   
   This PR changes the map that stores the writers to a Caffeine cache and adds two settings, cache size and expiration time, so that writers no longer in use are evicted.   
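
The idea of bounding the number of open writers can be sketched with the JDK alone. This is a minimal illustration, not the PR's Caffeine-based code: `BoundedWriterMap` and `SketchWriter` are hypothetical names, and an access-ordered `LinkedHashMap` stands in for the cache. Unlike a Caffeine removal listener, the evicted writer here is closed synchronously on the writing thread.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Size-bounded, access-ordered map that closes the least recently used writer on overflow.
final class BoundedWriterMap extends LinkedHashMap<String, SketchWriter> {
  private final int maxWriters;
  final List<String> evicted = new ArrayList<>();

  BoundedWriterMap(int maxWriters) {
    super(16, 0.75f, true); // true = access order, so get() refreshes recency
    this.maxWriters = maxWriters;
  }

  // Returns the open writer for a partition, creating one on a miss.
  SketchWriter writerFor(String partition) {
    SketchWriter writer = get(partition);
    if (writer == null) {
      writer = new SketchWriter(partition);
      put(partition, writer);
    }
    return writer;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<String, SketchWriter> eldest) {
    if (size() <= maxWriters) {
      return false;
    }
    try {
      eldest.getValue().close(); // closed synchronously on the writing thread
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close evicted writer", e);
    }
    evicted.add(eldest.getKey());
    return true;
  }

  public static void main(String[] args) {
    BoundedWriterMap writers = new BoundedWriterMap(2);
    writers.writerFor("dt=2022-01-01");
    writers.writerFor("dt=2022-01-02");
    writers.writerFor("dt=2022-01-01"); // refreshes recency of the first partition
    writers.writerFor("dt=2022-01-03"); // evicts dt=2022-01-02
    System.out.println(writers.evicted); // prints [dt=2022-01-02]
  }
}

// Hypothetical stand-in for a per-partition file writer; it only records that it was closed.
final class SketchWriter implements Closeable {
  final String partition;
  boolean closed = false;

  SketchWriter(String partition) {
    this.partition = partition;
  }

  @Override
  public void close() throws IOException {
    closed = true;
  }
}
```

With a capacity of 2, at most two writers hold buffers at any time, which is the memory bound the description is after.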


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] xloya commented on pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#issuecomment-1032135907


   > My concern with this approach is that it can potentially create more small files (with close and open for the same partition after cache eviction). @xloya can you share some of the results that you tried with this approach? I am sure it can help with memory usage. but does it create more small files?
   > 
   > I shared a design doc on shuffling support in Flink sink with the community a few months ago. That was a diff approach. https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit#heading=h.o4q8a61sahkq
   
   Yes, this will increase the number of small files, but with a reasonable configuration I think the trade-off can stay relatively balanced.  
   The approach you mentioned seems feasible to me. We will apply a `keyBy` operation in the Flink write path to solve this problem.
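
To make the small-file trade-off concrete, here is a rough sketch that counts how many files would be opened for a given record order and writer capacity. It assumes an LRU policy and that every cache miss opens a new file; `SmallFileEstimate` and `filesOpened` are hypothetical names, and real writers also roll files on size, which this ignores.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SmallFileEstimate {
  // Counts how many files get opened when writers live in an LRU map of the given capacity.
  // Every miss opens a new file, including re-opening a partition that was evicted earlier.
  static int filesOpened(int[] partitionPerRecord, final int capacity) {
    int opened = 0;
    Map<Integer, Boolean> openWriters = new LinkedHashMap<Integer, Boolean>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
        return size() > capacity; // evicting a writer finishes its current file
      }
    };
    for (int partition : partitionPerRecord) {
      if (openWriters.get(partition) == null) {
        opened++;
        openWriters.put(partition, Boolean.TRUE);
      }
    }
    return opened;
  }

  public static void main(String[] args) {
    int[] interleaved = {0, 1, 2, 3, 0, 1, 2, 3};
    int[] clustered = {0, 0, 1, 1, 2, 2, 3, 3};
    System.out.println(filesOpened(interleaved, 4)); // prints 4
    System.out.println(filesOpened(interleaved, 2)); // prints 8
    System.out.println(filesOpened(clustered, 2));   // prints 4
  }
}
```

In this toy input, the interleaved order with a capacity smaller than the number of partitions opens a file per record, while clustering the same records by partition (which is roughly what `keyBy` or `distribute by` aim for) keeps the count at one file per partition.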




[GitHub] [iceberg] xloya commented on pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#issuecomment-1025048506


   > @aokolnychyi, do you think this is a good idea?
   > 
   > I'm not sure about this. What will end up happening in Spark is that you'll create a lot of new data files. But in that case you should have used a better plan that clustered data instead of using the fanout writer. Maybe this is needed in Flink only?
   
   @rdblue Sorry for the late reply. I am on holiday for Chinese New Year and will read the comments later. Thanks again!




[GitHub] [iceberg] kbendick commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r796162454



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,
+                          long targetFileSize, Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {
+    if (writers == null) {
+      writers = Caffeine.newBuilder()
+          .maximumSize(writersCacheSize)
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .removalListener((key, value, cause) -> {
+            try {
+              ((RollingFileWriter) value).close();
+            } catch (IOException e) {
+              throw new UncheckedIOException("Failed to close rolling file writer", e);

Review comment:
       I believe any exceptions thrown inside the removal listener are logged and then swallowed.
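
If exceptions thrown from the listener are indeed swallowed, a close failure during eviction would never reach the task. One possible workaround, sketched here with the JDK only, is to record failures in the callback and surface them when the task finishes. `DeferredCloseFailures`, `onEvicted`, and `finish` are hypothetical names, and the single-thread executor merely stands in for whatever thread runs the listener.

```java
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DeferredCloseFailures {
  private final Queue<IOException> failures = new ConcurrentLinkedQueue<>();
  private final ExecutorService listenerThread = Executors.newSingleThreadExecutor();

  // Hypothetical callback shape: runs on a background thread when a writer is evicted.
  void onEvicted(final boolean closeFails) {
    listenerThread.execute(() -> {
      try {
        if (closeFails) {
          throw new IOException("simulated close failure");
        }
      } catch (IOException e) {
        failures.add(e); // remember the failure instead of throwing into the executor
      }
    });
  }

  // Called when the task finishes: waits for pending callbacks, then surfaces any failure.
  void finish() throws IOException, InterruptedException {
    listenerThread.shutdown();
    if (!listenerThread.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IOException("Timed out waiting for evicted writers to close");
    }
    IOException first = failures.poll();
    if (first != null) {
      for (IOException other : failures) {
        first.addSuppressed(other);
      }
      throw first;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DeferredCloseFailures tracker = new DeferredCloseFailures();
    tracker.onEvicted(false);
    tracker.onEvicted(true);
    try {
      tracker.finish();
      System.out.println("no failures");
    } catch (IOException e) {
      System.out.println("surfaced: " + e.getMessage()); // prints surfaced: simulated close failure
    }
  }
}
```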






[GitHub] [iceberg] rdblue commented on pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#issuecomment-1024586529


   @aokolnychyi, do you think this is a good idea?
   
   I'm not sure about this. What will end up happening in Spark is that you'll create a lot of new data files. But in that case you should have used a better plan that clustered data instead of using the fanout writer. Maybe this is needed in Flink only?




[GitHub] [iceberg] xloya commented on pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#issuecomment-1032133971


   
   > @aokolnychyi, do you think this is a good idea?
   > 
   > I'm not sure about this. What will end up happening in Spark is that you'll create a lot of new data files. But in that case you should have used a better plan that clustered data instead of using the fanout writer. Maybe this is needed in Flink only?
   
   In fact, this happens when a user writes to all partitions using Spark `insert overwrite`. We have tried several approaches; unless we use `distribute by` in SQL to break up the data, the OOM still occurs.




[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794847496



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,

Review comment:
       Can you fix indentation here, as long as these lines are changing?






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794151808



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -210,6 +211,20 @@ private TableProperties() {
   public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";
   public static final boolean SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
 
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_SIZE = "write.partition.fanout.writers-cache-size";
+  public static final int PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS =
+      "write.partition.fanout.writers-cache-eviction-ms";
+  public static final long PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5);

Review comment:
       Makes sense, I'll adjust the default value later






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801209738



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -60,11 +93,18 @@ public void write(T row) throws IOException {
 
   @Override
   public void close() throws IOException {
-    if (!writers.isEmpty()) {
-      for (PartitionKey key : writers.keySet()) {
-        writers.get(key).close();
+    ConcurrentMap<PartitionKey, RollingFileWriter> writersMap = writers.asMap();
+    if (writersMap.size() > 0) {
+      // close all remaining rolling file writers
+      try {
+        Tasks.foreach(writersMap.values())
+            .throwFailureWhenFinished()
+            .noRetry()
+            .run(RollingFileWriter::close, IOException.class);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close rolling file writer", e);
       }
-      writers.clear();
     }
+    writers.invalidateAll();

Review comment:
       In fact, this is mainly to clear the references in the map. I don't think it affects the second close of the writer, because `BaseTaskWriter` checks whether the writer is `null`, and after the first close the writer has already been set to `null`.






[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794850620



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
##########
@@ -50,17 +55,43 @@
                          Schema schema,
                          RowType flinkSchema,
                          List<Integer> equalityFieldIds,
-                         boolean upsert) {
+                         boolean upsert,
+                         Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
         upsert);
     this.partitionKey = new PartitionKey(spec, schema);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {

Review comment:
       Why is this synchronized? Isn't this only called from the constructor?






[GitHub] [iceberg] kbendick commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r796157999



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,
+                          long targetFileSize, Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {
+    if (writers == null) {
+      writers = Caffeine.newBuilder()
+          .maximumSize(writersCacheSize)
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .removalListener((key, value, cause) -> {

Review comment:
       This would instantiate a thread pool and evicted items would enter a queue to go in there.
   
   So I believe the removalListener would be run in a background thread instantiated by Caffeine.
   
   ```
   You may specify a removal listener for your cache to perform some operation when an entry is removed,
   via Caffeine.removalListener(RemovalListener).
   
   These operations are executed asynchronously using an Executor, where the default executor is
   ForkJoinPool.commonPool() and can be overridden via Caffeine.executor(Executor).
   ```
   
   But I'll take a further look into the details, especially with mutating the value this way.
   
   I also have concerns about `expireAfterAccess`, as the "access" in question has to come through the cache. It's not any access to the underlying value (for example, if Spark is already using the file writer, any operations it calls on it that don't go through the cache won't count towards resetting the time since last access).
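
The concern about access-based expiry can be illustrated with a toy model. This is not Caffeine; `AccessExpirySketch` is a hypothetical class with a manual clock, written only to show that using a value obtained earlier does not refresh its last-access time, while a lookup through the cache does.

```java
import java.util.HashMap;
import java.util.Map;

final class AccessExpirySketch {
  private final Map<String, StringBuilder> values = new HashMap<>();
  private final Map<String, Long> lastAccess = new HashMap<>();
  private final long ttl;
  private long now = 0; // manual clock so the example is deterministic

  AccessExpirySketch(long ttl) {
    this.ttl = ttl;
  }

  void advance(long ticks) {
    now += ticks;
  }

  // Lookup through the cache: the only operation that refreshes the last-access time.
  StringBuilder get(String key) {
    expire();
    StringBuilder value = values.get(key);
    if (value == null) {
      value = new StringBuilder();
      values.put(key, value);
    }
    lastAccess.put(key, now);
    return value;
  }

  boolean contains(String key) {
    expire();
    return values.containsKey(key);
  }

  // Drops every entry whose last lookup is at least ttl ticks old.
  private void expire() {
    lastAccess.entrySet().removeIf(e -> now - e.getValue() >= ttl);
    values.keySet().retainAll(lastAccess.keySet());
  }

  public static void main(String[] args) {
    AccessExpirySketch cache = new AccessExpirySketch(10);
    StringBuilder held = cache.get("p");
    cache.advance(6);
    held.append("row"); // direct use of the held value, invisible to the cache
    cache.advance(6);
    System.out.println(cache.contains("p")); // prints false
  }
}
```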






[GitHub] [iceberg] kbendick commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r796159827



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -60,11 +93,18 @@ public void write(T row) throws IOException {
 
   @Override
   public void close() throws IOException {
-    if (!writers.isEmpty()) {
-      for (PartitionKey key : writers.keySet()) {
-        writers.get(key).close();
+    ConcurrentMap<PartitionKey, RollingFileWriter> writersMap = writers.asMap();
+    if (writersMap.size() > 0) {
+      // close all remaining rolling file writers
+      try {
+        Tasks.foreach(writersMap.values())
+            .throwFailureWhenFinished()
+            .noRetry()
+            .run(RollingFileWriter::close, IOException.class);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close rolling file writer", e);
       }
-      writers.clear();
     }
+    writers.invalidateAll();

Review comment:
       Yes I believe it would. `Tasks.foreach` would call close on the values, but I don't think that Caffeine would know they were closed.
   
   And the removalListener is going to be called for sure on `invalidateAll`. https://github.com/ben-manes/caffeine/wiki/Removal
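
Whether or not the existing writer already tolerates a second close, one defensive option is to make `close()` idempotent, so that an explicit close followed by a listener-triggered close does the work only once. The sketch below uses the JDK only; `CloseOnceWriter` is a hypothetical class and the `flushes` counter stands in for flushing and completing a data file.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CloseOnceWriter implements Closeable {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  final AtomicInteger flushes = new AtomicInteger();

  @Override
  public void close() throws IOException {
    if (!closed.compareAndSet(false, true)) {
      return; // already closed through another path, nothing left to do
    }
    flushes.incrementAndGet(); // stands in for flushing and completing the data file
  }

  public static void main(String[] args) throws IOException {
    CloseOnceWriter writer = new CloseOnceWriter();
    writer.close(); // explicit close when the task finishes
    writer.close(); // second close, e.g. from a removal callback
    System.out.println(writer.flushes.get()); // prints 1
  }
}
```

The compare-and-set also covers the case where the two close calls arrive from different threads.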






[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794846901



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -210,6 +211,20 @@ private TableProperties() {
   public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";
   public static final boolean SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
 
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_SIZE = "write.partition.fanout.writers-cache-size";
+  public static final int PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS =
+      "write.partition.fanout.writers-cache-eviction-ms";
+  public static final long PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5);

Review comment:
       What is the purpose of a timeout? I don't see how that aligns with the need to reduce memory consumption. You could create so many files in 5 minutes that you run out of memory. If you could evict based on memory consumption of the writers, that seems valuable. But I think I would remove this.






[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794848646



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -60,11 +93,18 @@ public void write(T row) throws IOException {
 
   @Override
   public void close() throws IOException {
-    if (!writers.isEmpty()) {
-      for (PartitionKey key : writers.keySet()) {
-        writers.get(key).close();
+    ConcurrentMap<PartitionKey, RollingFileWriter> writersMap = writers.asMap();
+    if (writersMap.size() > 0) {

Review comment:
       Why not use `isEmpty` like before?






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801210020



##########
File path: flink/v1.13/build.gradle
##########
@@ -40,6 +40,8 @@ project(':iceberg-flink:iceberg-flink-1.13') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    implementation "com.github.ben-manes.caffeine:caffeine"

Review comment:
       Sure, I will only modify it for the latest version first






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801208794



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,

Review comment:
       Sure






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801208928



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -60,11 +93,18 @@ public void write(T row) throws IOException {
 
   @Override
   public void close() throws IOException {
-    if (!writers.isEmpty()) {
-      for (PartitionKey key : writers.keySet()) {
-        writers.get(key).close();
+    ConcurrentMap<PartitionKey, RollingFileWriter> writersMap = writers.asMap();
+    if (writersMap.size() > 0) {

Review comment:
       I'll fix it






[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794848060



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,
+                          long targetFileSize, Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {
+    if (writers == null) {
+      writers = Caffeine.newBuilder()
+          .maximumSize(writersCacheSize)
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .removalListener((key, value, cause) -> {

Review comment:
       @kbendick can you take a look at this? I think there are some threading concerns. Does this run in the current thread or in the background?






[GitHub] [iceberg] coolderli commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
coolderli commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r793234371



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -210,6 +211,20 @@ private TableProperties() {
   public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";
   public static final boolean SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
 
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_SIZE = "write.partition.fanout.writers-cache-size";

Review comment:
       I am not sure whether this configuration should be a table property. If the table has both a Spark and a Flink writer, a single size is hard to configure.






[GitHub] [iceberg] kbendick commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r796157999



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,
+                          long targetFileSize, Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {
+    if (writers == null) {
+      writers = Caffeine.newBuilder()
+          .maximumSize(writersCacheSize)
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .removalListener((key, value, cause) -> {

Review comment:
       This would instantiate a thread pool and evicted items would enter a queue to go in there.
   
   So I believe the removalListener would be run in a background thread instantiated by Caffeine.

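
To make the threading question concrete, here is a small stdlib-only sketch. It does not use Caffeine; `ListenerThreadDemo` and `notifyRemoval` are hypothetical names introduced only for illustration. It shows that a callback handed to an `Executor` runs on whatever thread that executor chooses, and that a same-thread executor such as `Runnable::run` keeps it on the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical illustration only: reports which thread runs a callback,
// depending on the Executor it is dispatched to.
class ListenerThreadDemo {

  // Dispatches a "listener" to the executor and returns the name of the thread that ran it.
  static String notifyRemoval(Executor executor) {
    CompletableFuture<String> ranOn = new CompletableFuture<>();
    executor.execute(() -> ranOn.complete(Thread.currentThread().getName()));
    return ranOn.join(); // wait until the listener has run
  }

  public static void main(String[] args) {
    String caller = Thread.currentThread().getName();

    // Same-thread executor: the listener runs inline on the calling thread.
    String inline = notifyRemoval(Runnable::run);

    // Background executor: the listener runs on a separate, named thread.
    String background = notifyRemoval(r -> new Thread(r, "listener-thread").start());

    System.out.println("caller=" + caller + ", inline=" + inline + ", background=" + background);
  }
}
```

If the listener closes a writer that the task thread may still be using, the background case is the one that needs care; the inline case trades that risk for doing the close work on the write path.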





[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801211464



##########
File path: spark/v2.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
##########
@@ -275,7 +275,7 @@ public void writePartitionedLegacyFanoutDataWriter(Blackhole blackhole) throws I
     TaskWriter<InternalRow> writer = new SparkPartitionedFanoutWriter(
         partitionedSpec, fileFormat(), appenders,
         fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
-        writeSchema, sparkWriteType);
+        writeSchema, sparkWriteType, table().properties());

Review comment:
       Sure






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794152267



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -210,6 +211,20 @@ private TableProperties() {
   public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";
   public static final boolean SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
 
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_SIZE = "write.partition.fanout.writers-cache-size";

Review comment:
       I am not sure about this, because the fanout writer belongs to the core module, but the delta writer belongs to the Flink module. To be honest, I am not very sure about where to put the configuration






[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794849236



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -60,11 +93,18 @@ public void write(T row) throws IOException {
 
   @Override
   public void close() throws IOException {
-    if (!writers.isEmpty()) {
-      for (PartitionKey key : writers.keySet()) {
-        writers.get(key).close();
+    ConcurrentMap<PartitionKey, RollingFileWriter> writersMap = writers.asMap();
+    if (writersMap.size() > 0) {
+      // close all remaining rolling file writers
+      try {
+        Tasks.foreach(writersMap.values())
+            .throwFailureWhenFinished()
+            .noRetry()
+            .run(RollingFileWriter::close, IOException.class);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close rolling file writer", e);
       }
-      writers.clear();
     }
+    writers.invalidateAll();

Review comment:
       Won't this close all of the writers a second time because of the removal listener?
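
One possible way to make a second close harmless is an idempotent guard around the writer. The sketch below is an assumption for illustration, not what the PR does: `CloseOnce` and `CloseOnceDemo` are hypothetical names, and a plain `Closeable` stands in for `RollingFileWriter`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: forwards close() to the delegate at most once.
class CloseOnce implements Closeable {
  private final Closeable delegate;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  CloseOnce(Closeable delegate) {
    this.delegate = delegate;
  }

  @Override
  public void close() throws IOException {
    // compareAndSet lets exactly one caller through, even if two threads race.
    if (closed.compareAndSet(false, true)) {
      delegate.close();
    }
  }
}

class CloseOnceDemo {
  // Closes the same wrapper twice and returns how often the delegate was really closed.
  static int closeTwice() {
    AtomicInteger underlyingCloses = new AtomicInteger();
    Closeable writer = new CloseOnce(underlyingCloses::incrementAndGet);
    try {
      writer.close(); // explicit close, as in close()
      writer.close(); // second close, as a removal listener would trigger
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return underlyingCloses.get();
  }

  public static void main(String[] args) {
    System.out.println("underlying close calls: " + closeTwice()); // prints 1
  }
}
```

The alternative is to let the removal listener be the only place that closes writers, so that `close()` only invalidates the cache; which of the two fits better depends on how failures from the listener are reported.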






[GitHub] [iceberg] coolderli commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
coolderli commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r793233413



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -210,6 +211,20 @@ private TableProperties() {
   public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";
   public static final boolean SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
 
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_SIZE = "write.partition.fanout.writers-cache-size";
+  public static final int PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS =
+      "write.partition.fanout.writers-cache-eviction-ms";
+  public static final long PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5);

Review comment:
       I think the default value should not be 5 minutes. To keep the behavior consistent with before, maybe the value should never expire by default.
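
A minimal sketch of a "never expire" default, assuming a non-positive sentinel means that expiry is disabled. The class name, the sentinel, and the `OptionalLong` return type are hypothetical; only the property key is taken from the diff above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper: resolves the eviction timeout, treating "unset" as "never expire".
class EvictionTimeoutConfig {
  static final String EVICT_MS = "write.partition.fanout.writers-cache-eviction-ms";

  // Assumed sentinel: any non-positive value disables time-based expiry.
  static final long NEVER_EXPIRE = -1L;

  static OptionalLong evictionTimeoutMs(Map<String, String> properties) {
    String raw = properties.get(EVICT_MS);
    long value = raw == null ? NEVER_EXPIRE : Long.parseLong(raw);
    return value > 0 ? OptionalLong.of(value) : OptionalLong.empty();
  }

  public static void main(String[] args) {
    // Unset property: no expiry is configured, matching the old HashMap behavior.
    System.out.println(evictionTimeoutMs(Collections.<String, String>emptyMap()).isPresent());

    // Explicit opt-in: five minutes.
    System.out.println(evictionTimeoutMs(Collections.singletonMap(EVICT_MS, "300000")).getAsLong());
  }
}
```

With this shape, the cache builder would only be given an expiry when the optional is present, so existing jobs keep their writers open until `close()` unless they opt in.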






[GitHub] [iceberg] stevenzwu commented on pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#issuecomment-1025382932


   My concern with this approach is that it can potentially create more small files (a writer is closed and reopened for the same partition after cache eviction). @xloya can you share some of the results you got with this approach? I am sure it can help with memory usage, but does it create more small files?
   
   I shared a design doc on shuffling support in the Flink sink with the community a few months ago. That was a different approach. https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit#heading=h.o4q8a61sahkq
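
The small-file concern can be illustrated with a toy model. The sketch below is an assumption-laden simulation, not a measurement: it uses a strict LRU built on `LinkedHashMap` (Caffeine's eviction policy is different, so real numbers would differ), rows arrive round-robin across partitions, and every cache miss counts as opening a new file. `EvictionChurnDemo` and `filesOpened` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model: counts file opens when at most `capacity` writers may stay open.
class EvictionChurnDemo {

  static int filesOpened(int partitions, int capacity, int rows) {
    // accessOrder=true gives LRU ordering; removeEldestEntry enforces the size bound.
    Map<Integer, Object> writers = new LinkedHashMap<Integer, Object>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Integer, Object> eldest) {
        return size() > capacity; // evicting a writer closes its current file
      }
    };

    int opened = 0;
    for (int row = 0; row < rows; row++) {
      int partition = row % partitions; // round-robin arrival
      if (writers.get(partition) == null) { // a hit refreshes the LRU position
        opened++; // a miss means a new file for this partition
        writers.put(partition, new Object());
      }
    }
    return opened;
  }

  public static void main(String[] args) {
    // All partitions fit: one file per partition.
    System.out.println(filesOpened(4, 4, 100)); // prints 4

    // One partition too many: every row misses and opens a new file.
    System.out.println(filesOpened(5, 4, 100)); // prints 100
  }
}
```

Round-robin input is the worst case for LRU; input that is clustered by partition would evict far less often, which is why results from a real workload would be useful here.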




[GitHub] [iceberg] kbendick commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r796157999



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,
+                          long targetFileSize, Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {
+    if (writers == null) {
+      writers = Caffeine.newBuilder()
+          .maximumSize(writersCacheSize)
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .removalListener((key, value, cause) -> {

Review comment:
       This would instantiate a thread pool and evicted items would enter a queue to go in there.
   
   So I believe the removalListener would be run in a background thread instantiated by Caffeine.
   
   ```
   You may specify a removal listener for your cache to perform some operation when an entry is removed,
   via Caffeine.removalListener(RemovalListener).
   
   These operations are executed asynchronously using an Executor, where the default executor is
   ForkJoinPool.commonPool() and can be overridden via Caffeine.executor(Executor).
   ```
   
   But I'll take a further look into the details, especially with mutating the value this way. I also have concerns about `expireAfterAccess`, as the "access" in question has to come through the cache. It's not any access to the underlying value (for example, if Spark is already using the file writer, any operations it calls on it that don't go through the cache won't count towards resetting the time since last access).
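
The access-time point can be shown with a toy cache. This is a hypothetical stdlib model, not Caffeine: the clock is passed in explicitly so the behavior is deterministic, and a `StringBuilder` stands in for a file writer.

```java
import java.util.HashMap;
import java.util.Map;

// Toy cache: only lookups through get() refresh an entry's access time.
class AccessExpiryDemo {
  private static final class Entry {
    final StringBuilder value;
    long lastAccessMs;

    Entry(StringBuilder value, long nowMs) {
      this.value = value;
      this.lastAccessMs = nowMs;
    }
  }

  private final Map<String, Entry> entries = new HashMap<>();
  private final long ttlMs;

  AccessExpiryDemo(long ttlMs) {
    this.ttlMs = ttlMs;
  }

  void put(String key, StringBuilder value, long nowMs) {
    entries.put(key, new Entry(value, nowMs));
  }

  // Returns the value and refreshes its access time, or null if it has expired.
  StringBuilder get(String key, long nowMs) {
    Entry entry = entries.get(key);
    if (entry == null) {
      return null;
    }
    if (nowMs - entry.lastAccessMs >= ttlMs) {
      entries.remove(key);
      return null;
    }
    entry.lastAccessMs = nowMs;
    return entry.value;
  }

  // The caller keeps its own reference and never goes back through the cache.
  static boolean survivesWhenUsedDirectly() {
    AccessExpiryDemo cache = new AccessExpiryDemo(100);
    StringBuilder writer = new StringBuilder();
    cache.put("p1", writer, 0);
    writer.append("row"); // imagine t=50: the cache is not informed
    writer.append("row"); // imagine t=90: still not informed
    return cache.get("p1", 120) != null; // 120 - 0 >= 100, so the entry is gone
  }

  // The caller looks the writer up through the cache for every row.
  static boolean survivesWhenLookedUp() {
    AccessExpiryDemo cache = new AccessExpiryDemo(100);
    cache.put("p1", new StringBuilder(), 0);
    cache.get("p1", 50).append("row");
    cache.get("p1", 90).append("row");
    return cache.get("p1", 120) != null; // 120 - 90 < 100, so the entry is still live
  }

  public static void main(String[] args) {
    System.out.println(survivesWhenUsedDirectly()); // prints false
    System.out.println(survivesWhenLookedUp()); // prints true
  }
}
```

Whether this matters in practice depends on whether any caller holds on to a writer reference between rows instead of fetching it from the cache each time.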






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801204803



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -210,6 +211,20 @@ private TableProperties() {
   public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";
   public static final boolean SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
 
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_SIZE = "write.partition.fanout.writers-cache-size";
+  public static final int PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+  public static final String PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS =
+      "write.partition.fanout.writers-cache-eviction-ms";
+  public static final long PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5);

Review comment:
       @rdblue The reason for adding the timeout option is that it is often difficult to determine the memory consumption based on the number of writers






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801211375



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
##########
@@ -50,17 +55,43 @@
                          Schema schema,
                          RowType flinkSchema,
                          List<Integer> equalityFieldIds,
-                         boolean upsert) {
+                         boolean upsert,
+                         Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
         upsert);
     this.partitionKey = new PartitionKey(spec, schema);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_DELTA_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {

Review comment:
       Yes, this is a leftover from an earlier revision. I think it can be removed.






[GitHub] [iceberg] xloya commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
xloya commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r801211740



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,
+                          long targetFileSize, Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {
+    if (writers == null) {
+      writers = Caffeine.newBuilder()
+          .maximumSize(writersCacheSize)
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .removalListener((key, value, cause) -> {
+            try {
+              ((RollingFileWriter) value).close();
+            } catch (IOException e) {
+              throw new UncheckedIOException("Failed to close rolling file writer", e);

Review comment:
       Yes, but according to our previous implementation of Writers' close, it will also be swallowed






[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794851693



##########
File path: spark/v2.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
##########
@@ -275,7 +275,7 @@ public void writePartitionedLegacyFanoutDataWriter(Blackhole blackhole) throws I
     TaskWriter<InternalRow> writer = new SparkPartitionedFanoutWriter(
         partitionedSpec, fileFormat(), appenders,
         fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
-        writeSchema, sparkWriteType);
+        writeSchema, sparkWriteType, table().properties());

Review comment:
       Can you add a default so that you don't need to change so many test and benchmark files?
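
One way to read this suggestion is an overloaded constructor that delegates with an empty property map, so existing call sites compile unchanged. The sketch below is a hypothetical stand-in (`FanoutWriterSketch` is not an Iceberg class); the property key and the `Integer.MAX_VALUE` default are taken from the diff in this thread.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for a writer class; names are illustrative only.
class FanoutWriterSketch {
  static final String CACHE_SIZE = "write.partition.fanout.writers-cache-size";

  private final long targetFileSize;
  private final Map<String, String> properties;

  // Old signature: existing tests and benchmarks keep calling this one.
  FanoutWriterSketch(long targetFileSize) {
    this(targetFileSize, Collections.<String, String>emptyMap());
  }

  // New signature: callers that have table properties pass them in.
  FanoutWriterSketch(long targetFileSize, Map<String, String> properties) {
    this.targetFileSize = targetFileSize;
    this.properties = properties;
  }

  long targetFileSize() {
    return targetFileSize;
  }

  // Falls back to an effectively unbounded cache when the property is not set.
  int cacheSize() {
    String raw = properties.get(CACHE_SIZE);
    return raw == null ? Integer.MAX_VALUE : Integer.parseInt(raw);
  }

  public static void main(String[] args) {
    System.out.println(new FanoutWriterSketch(1024L).cacheSize() == Integer.MAX_VALUE); // prints true
    System.out.println(
        new FanoutWriterSketch(1024L, Collections.singletonMap(CACHE_SIZE, "100")).cacheSize()); // prints 100
  }
}
```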






[GitHub] [iceberg] rdblue commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r794850008



##########
File path: flink/v1.13/build.gradle
##########
@@ -40,6 +40,8 @@ project(':iceberg-flink:iceberg-flink-1.13') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    implementation "com.github.ben-manes.caffeine:caffeine"

Review comment:
       Please change just the latest Flink version. We'll backport the changes after they are reviewed and committed. That avoids needing to review the same code 3 times.






[GitHub] [iceberg] kbendick commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r796157999



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -19,19 +19,52 @@
 
 package org.apache.iceberg.io;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 
 public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
-  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+  private Cache<PartitionKey, RollingFileWriter> writers;
 
   protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
-                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+                          OutputFileFactory fileFactory, FileIO io,
+                          long targetFileSize, Map<String, String> properties) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    int writersCacheSize = PropertyUtil.propertyAsInt(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_SIZE_DEFAULT);
+    long evictionTimeout = PropertyUtil.propertyAsLong(
+        properties,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS,
+        TableProperties.PARTITIONED_FANOUT_WRITERS_CACHE_EVICT_MS_DEFAULT);
+    initWritersCache(writersCacheSize, evictionTimeout);
+  }
+
+  private synchronized void initWritersCache(int writersCacheSize, long evictionTimeout) {
+    if (writers == null) {
+      writers = Caffeine.newBuilder()
+          .maximumSize(writersCacheSize)
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .removalListener((key, value, cause) -> {

Review comment:
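       This would hand evicted entries to an executor, where they wait in a queue until a pool thread picks them up.
   
   So I believe the removalListener would run on a background thread (by default one from `ForkJoinPool.commonPool()`, not a pool that Caffeine creates) rather than on the thread doing the write. From the Caffeine documentation: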
   
   ```
   You may specify a removal listener for your cache to perform some operation when an entry is removed,
   via Caffeine.removalListener(RemovalListener).
   
   These operations are executed asynchronously using an Executor, where the default executor is
   ForkJoinPool.commonPool() and can be overridden via Caffeine.executor(Executor).
   ```
   
   But I'll take a closer look at the details, especially with the listener mutating the value this way.
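
To make the threading point concrete, here is a minimal sketch that uses only the JDK, not Caffeine itself. `RemovalDispatchSketch` and `threadThatRanListener` are hypothetical names for illustration; the helper mimics a cache handing its removal callback to an `Executor` and reports which thread ran it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a cache that hands removal callbacks to an Executor. */
final class RemovalDispatchSketch {
  private RemovalDispatchSketch() {
  }

  /** Runs a fake "close the evicted writer" callback on the executor and returns the name of the thread that ran it. */
  static String threadThatRanListener(Executor executor) {
    AtomicReference<String> threadName = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);

    // The callback only records where it ran; a real listener would close the writer here.
    executor.execute(() -> {
      threadName.set(Thread.currentThread().getName());
      done.countDown();
    });

    try {
      done.await(); // wait until the callback has finished
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the listener", e);
    }
    return threadName.get();
  }
}
```

Passing `ForkJoinPool.commonPool()` reports a pool worker thread, while passing `Runnable::run` (a same-thread executor) reports the caller's own thread. If evicted writers must be closed on the writing thread, supplying such a same-thread executor through `Caffeine.executor(Executor)` may be worth testing; that is a suggestion to verify, not something this PR does.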






[GitHub] [iceberg] kbendick commented on a change in pull request #3977: [Core][Spark][Flink] Change partitioned fanout/delta writers map to caffine cache

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3977:
URL: https://github.com/apache/iceberg/pull/3977#discussion_r796159827



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -60,11 +93,18 @@ public void write(T row) throws IOException {
 
   @Override
   public void close() throws IOException {
-    if (!writers.isEmpty()) {
-      for (PartitionKey key : writers.keySet()) {
-        writers.get(key).close();
+    ConcurrentMap<PartitionKey, RollingFileWriter> writersMap = writers.asMap();
+    if (writersMap.size() > 0) {
+      // close all remaining rolling file writers
+      try {
+        Tasks.foreach(writersMap.values())
+            .throwFailureWhenFinished()
+            .noRetry()
+            .run(RollingFileWriter::close, IOException.class);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close rolling file writer", e);
       }
-      writers.clear();
     }
+    writers.invalidateAll();

Review comment:
       Yes, I believe it would.
   
   1) `Tasks.foreach` would call `close` on the values, but I don't think Caffeine would know they had been closed, at least in some corner cases.
   2) The removalListener is going to be called on `invalidateAll`, which would presumably close each writer a second time. https://github.com/ben-manes/caffeine/wiki/Removal
   
   If it's ok to call `close` twice, then technically it might be ok (although a bit confusing to read).
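
If the double `close` turns out to be a concern, one option is to make it harmless. A minimal sketch, assuming the writer can be wrapped in a `java.io.Closeable`; `CloseOnce` is a hypothetical helper for illustration, not part of Iceberg or this PR:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical wrapper that forwards close() to the delegate at most once. */
final class CloseOnce implements Closeable {
  private final Closeable delegate;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  CloseOnce(Closeable delegate) {
    this.delegate = delegate;
  }

  @Override
  public void close() throws IOException {
    // compareAndSet lets exactly one caller through, even if the explicit
    // close and a removal listener race on different threads.
    if (closed.compareAndSet(false, true)) {
      delegate.close();
    }
  }
}
```

`java.io.Closeable` already documents that closing an already-closed stream has no effect, so a guard like this only matters where the underlying `close` does not honor that contract.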



