Posted to issues@iceberg.apache.org by "ConeyLiu (via GitHub)" <gi...@apache.org> on 2023/06/07 09:42:16 UTC

[GitHub] [iceberg] ConeyLiu opened a new pull request, #7791: Core: Fixes memory leak caused by Avro decoder caching

ConeyLiu opened a new pull request, #7791:
URL: https://github.com/apache/iceberg/pull/7791

   Closes #5652
   
   This memory leak happens especially in Flink sink jobs, where many schemas and decoders can accumulate in the cache and eventually cause an OOM, as in the following heap dumps:
   
   ![heap dump](https://github.com/apache/iceberg/assets/12733256/5c13e5a3-1bce-41bc-9967-b13bb1ec90ff)
   
   
   There are two problems here:
   1. The Guava map compares keys by identity when weak keys are used. Details: https://github.com/google/guava/blob/master/guava/src/com/google/common/collect/MapMaker.java#LL58C39-L58C39.
   2. Copied from #5652 (a sketch of the pattern follows the quote):
   
   > The DecoderResolver holds a ThreadLocal variable of a two-layer map. The outer map has a weak key while the inner map has a strong one. As the inner map holds a reference to a Schema object, the outer map holding the same weak reference to the Schema object will not release the weak key. That leads to the OOM.
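   
   A minimal sketch of that pattern (simplified, not the verbatim Iceberg source): the outer map references the `Schema` weakly, but its value, the inner map, references the same `Schema` strongly, so GC can never clear the weak key:
   
   ```java
   // Sketch only: an outer weak-key map whose value strongly references its own key.
   Map<Schema, Map<Schema, ResolvingDecoder>> cache =
       new MapMaker().weakKeys().makeMap();
   
   Map<Schema, ResolvingDecoder> inner =
       cache.computeIfAbsent(readSchema, k -> Maps.newHashMap()); // strong inner keys
   // When readSchema == fileSchema, the inner map's strong key is the very object
   // the outer map holds weakly, so the outer entry is never reclaimed.
   inner.put(fileSchema, resolvingDecoder);
   ```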
   
   Here, we replace the Guava weak-key map (backed by a ConcurrentHashMap) with java.util.WeakHashMap for the following reasons:
   1. The map is already held in a ThreadLocal, so there is no need for a ConcurrentHashMap.
   2. java.util.WeakHashMap compares keys by identity as well as equality (see the demo after this list): https://docs.oracle.com/javase/8/docs/api/java/util/WeakHashMap.html
   3. Most cached entries we observed are the schemas of `ManifestEntry` or `ManifestFile`, both of which change infrequently.
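   
   A runnable demonstration of the key-comparison difference (a standalone sketch; plain `String` keys stand in for `Schema`):
   
   ```java
   import java.util.Map;
   import java.util.WeakHashMap;
   import com.google.common.collect.MapMaker;
   
   public class WeakKeyDemo {
     public static void main(String[] args) {
       String key1 = new String("schema"); // equal but distinct instances
       String key2 = new String("schema");
   
       Map<String, String> guavaWeak = new MapMaker().weakKeys().makeMap();
       guavaWeak.put(key1, "decoder");
       // Identity comparison: an equal-but-different instance misses the cache,
       // so every new Schema object would create another entry.
       System.out.println(guavaWeak.containsKey(key2)); // false
   
       Map<String, String> weakMap = new WeakHashMap<>();
       weakMap.put(key1, "decoder");
       // equals()-based comparison: equal keys share a single entry.
       System.out.println(weakMap.containsKey(key2)); // true
     }
   }
   ```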
   
   




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#discussion_r1240396863


##########
core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java:
##########
@@ -0,0 +1,167 @@
+public class TestDecoderResolver {

Review Comment:
   nice test



##########
core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java:
##########
@@ -0,0 +1,167 @@
+  private void checkCached(Schema readSchema, Schema fileSchema) {
+    assertThat(DecoderResolver.DECODER_CACHES.get().containsKey(readSchema)).isTrue();

Review Comment:
   nit: should we change to style like `assertThat(DecoderResolver.DECODER_CACHES.get()).containsKey(readSchema)`
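   
   For context, the two styles side by side (the first line is the current test code, the second the suggested form):
   
   ```java
   // boolean-style assertion: on failure it only reports "expected true"
   assertThat(DecoderResolver.DECODER_CACHES.get().containsKey(readSchema)).isTrue();
   // map-style assertion: AssertJ reports the map contents on failure
   assertThat(DecoderResolver.DECODER_CACHES.get()).containsKey(readSchema);
   ```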



##########
core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.avro;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDecoderResolver {
+
+  @Before
+  public void before() {
+    DecoderResolver.DECODER_CACHES.get().clear();
+  }
+
+  @Test
+  public void testDecoderCachingReadSchemaSameAsFileSchema() throws Exception {
+    Decoder dummyDecoder = DecoderFactory.get().binaryDecoder(new byte[] {}, null);
+    Schema fileSchema = avroSchema();
+    ResolvingDecoder resolvingDecoder =
+        DecoderResolver.resolve(dummyDecoder, fileSchema, fileSchema);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema).size()).isEqualTo(1);
+    checkCached(fileSchema, fileSchema);
+
+    // Equal but new one
+    Schema fileSchema1 = avroSchema();
+    assertThat(fileSchema1).isEqualTo(fileSchema);
+    ResolvingDecoder resolvingDecoder1 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema1, fileSchema1);
+    assertThat(resolvingDecoder1).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(2);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema1).size()).isEqualTo(1);
+    checkCached(fileSchema1, fileSchema1);
+
+    // New one
+    Schema fileSchema2 = avroSchema("manifest_path", "manifest_length");
+    ResolvingDecoder resolvingDecoder2 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema2, fileSchema2);
+    assertThat(resolvingDecoder2).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(3);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema2).size()).isEqualTo(1);
+    checkCached(fileSchema2, fileSchema2);
+
+    checkCachedSize(3);
+
+    fileSchema = null;
+    checkCachedSize(2);
+
+    fileSchema1 = null;
+    checkCachedSize(1);
+
+    fileSchema2 = null;
+    checkCachedSize(0);
+  }
+
+  @Test
+  public void testDecoderCachingReadSchemaNotSameAsFileSchema() throws Exception {
+    Decoder dummyDecoder = DecoderFactory.get().binaryDecoder(new byte[] {}, null);
+    Schema fileSchema = avroSchema();
+    Schema readSchema = avroSchema("manifest_path", "manifest_length", "partition_spec_id");
+    ResolvingDecoder resolvingDecoder =
+        DecoderResolver.resolve(dummyDecoder, readSchema, fileSchema);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema).size()).isEqualTo(1);
+    checkCached(readSchema, fileSchema);
+
+    // Equal but new one
+    Schema fileSchema1 = avroSchema();
+    Schema readSchema1 = avroSchema("manifest_path", "manifest_length", "partition_spec_id");
+    assertThat(fileSchema1).isEqualTo(fileSchema);
+    assertThat(readSchema1).isEqualTo(readSchema);
+    ResolvingDecoder resolvingDecoder1 =
+        DecoderResolver.resolve(dummyDecoder, readSchema1, fileSchema1);
+    assertThat(resolvingDecoder1).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(2);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema1).size()).isEqualTo(1);
+    checkCached(readSchema1, fileSchema1);
+
+    // New read schema
+    Schema readSchema2 = avroSchema("manifest_path", "manifest_length");
+    ResolvingDecoder resolvingDecoder2 =
+        DecoderResolver.resolve(dummyDecoder, readSchema2, fileSchema);
+    assertThat(resolvingDecoder2).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(3);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema2).size()).isEqualTo(1);
+    checkCached(readSchema2, fileSchema);
+
+    checkCachedSize(3);
+
+    readSchema = null;
+    checkCachedSize(2);
+
+    readSchema1 = null;
+    checkCachedSize(1);
+
+    readSchema2 = null;
+    checkCachedSize(0);
+  }
+
+  private Schema avroSchema(String... columns) {
+    if (columns.length == 0) {
+      return AvroSchemaUtil.convert(ManifestFile.schema(), "manifest_file");
+    } else {
+      return AvroSchemaUtil.convert(ManifestFile.schema().select(columns), "manifest_file");
+    }
+  }
+
+  private void checkCached(Schema readSchema, Schema fileSchema) {
+    assertThat(DecoderResolver.DECODER_CACHES.get().containsKey(readSchema)).isTrue();
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema).containsKey(fileSchema))
+        .isTrue();
+  }
+
+  private int getActualSize() {
+    // The key set may still include keys that have been GCed but not yet expunged
+    Set<Schema> keys = DecoderResolver.DECODER_CACHES.get().keySet();
+    Set<Schema> identityKeys = Sets.newIdentityHashSet();
+    identityKeys.addAll(keys);
+    return identityKeys.size();
+  }
+
+  private void checkCachedSize(int expected) {

Review Comment:
   wondering if this can be flaky on CI builds where servers are quite often overloaded. we can keep an eye on this. not a concern right now.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#discussion_r1222530986


##########
core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java:
##########
@@ -49,11 +50,12 @@ public static <T> T resolveAndRead(
     return value;
   }
 
-  private static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema)
+  @VisibleForTesting
+  static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema)
       throws IOException {
     Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
     Map<Schema, ResolvingDecoder> fileSchemaToResolver =
-        cache.computeIfAbsent(readSchema, k -> Maps.newHashMap());
+        cache.computeIfAbsent(readSchema, k -> new WeakHashMap<>());

Review Comment:
   Changed to `WeakHashMap` to solve the problem described in point 2.





[GitHub] [iceberg] Fokko commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1580554936

   Thanks @ConeyLiu for raising this, and sorry for the long wait. Did you confirm with the new `WeakHashMap` that the problem has been resolved?
   
   At Avro, we had a similar issue: https://lists.apache.org/thread/8q3g304thhjgsfk7d6l62w706y365616 And the fix: https://github.com/apache/avro/pull/2090/




[GitHub] [iceberg] pvary commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1584161447

   @ConeyLiu: One last question: Did we run any performance tests to check how the changed caching affects performance?




[GitHub] [iceberg] Fokko commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1580559081

   Hmm, that looks unrelated, since it caches the fields that are singletons, not the schema itself.




[GitHub] [iceberg] pvary commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1607069401

   Merged the change as there are 3 approvals, and no comments for quite a while.
   
   Thanks @ConeyLiu for the PR and @Fokko and @stevenzwu for the reviews!




[GitHub] [iceberg] pvary merged pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary merged PR #7791:
URL: https://github.com/apache/iceberg/pull/7791




[GitHub] [iceberg] ConeyLiu commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1607223843

   Thanks @pvary for merging this, and also thanks @Fokko @stevenzwu @pvary for reviewing.




[GitHub] [iceberg] ConeyLiu commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1580323565

   Hi @stevenzwu @nastra @rdblue @Fokko @aokolnychyi, could you help to review this when you are free? Thanks in advance.




[GitHub] [iceberg] ConeyLiu commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1581980893

   Thanks @Fokko for the review. 
   
   > Did you confirm with the new WeakHashMap that the problem has been resolved?
   
   It takes several weeks or months of running to trigger the problem. I tested locally and the caching works as expected.
   
   > Do you think it is possible to add a check to see if the HashMap actually removes objects that are garbage collected? It would be nice to check its behavior now, but also we make sure that we keep this behavior in the future (it was broken in Avro along the way).
   
   Added new UTs to cover this. Please take another look when you are free.




[GitHub] [iceberg] pvary commented on a diff in pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#discussion_r1223455414


##########
core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java:
##########
@@ -0,0 +1,155 @@
+  @Test
+  public void testDecoderCachingReadSchemaSameAsFileSchema() throws Exception {
+    Decoder dummyDecoder = DecoderFactory.get().binaryDecoder(new byte[] {}, null);
+    Schema fileSchema = avroSchema();
+    ResolvingDecoder resolvingDecoder =
+        DecoderResolver.resolve(dummyDecoder, fileSchema, fileSchema);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema).size()).isEqualTo(1);
+    checkCached(fileSchema, fileSchema);
+
+    // Equal but new one
+    Schema fileSchema1 = avroSchema();
+    assertThat(fileSchema1).isEqualTo(fileSchema);
+    ResolvingDecoder resolvingDecoder1 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema1, fileSchema1);
+    assertThat(resolvingDecoder1).isSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema1).size()).isEqualTo(1);
+    checkCached(fileSchema1, fileSchema1);
+
+    // New one
+    Schema fileSchema2 = avroSchema("manifest_path", "manifest_length");
+    ResolvingDecoder resolvingDecoder2 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema2, fileSchema2);
+    assertThat(resolvingDecoder2).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(2);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema2).size()).isEqualTo(1);
+    checkCached(fileSchema2, fileSchema2);
+
+    fileSchema = null;
+    System.gc();
+    // Wait until the weak reference keys are GCed
+    Thread.sleep(1000);

Review Comment:
   nit: Do we want to use Awaitility, as mentioned in #7773?
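   
   A sketch of what that could look like (assumed Awaitility usage, not the exact code from #7773):
   
   ```java
   // Poll until the collector has actually cleared the weak-key entries,
   // instead of sleeping a fixed second and hoping it was long enough.
   Awaitility.await()
       .atMost(5, TimeUnit.SECONDS)
       .untilAsserted(
           () -> {
             System.gc(); // nudge the collector on each poll
             assertThat(DecoderResolver.DECODER_CACHES.get()).isEmpty();
           });
   ```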





[GitHub] [iceberg] ConeyLiu commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1585492658

   @pvary Thanks for pointing that out. The performance check was really needed, and it uncovered a problem.
   
   Previously, we used `java.util.WeakHashMap` to replace `MapMaker().weakKeys().makeMap()`, but equality-based key comparison is much slower than identity comparison. The results are as follows:
   ```
   // java.util.WeakHashMap
   Benchmark                               Mode  Cnt   Score   Error  Units
   ManifestReadBenchmark.readManifestFile    ss    5  12.896 ± 1.155   s/op
   
   // MapMaker().weakKeys().makeMap()
   Benchmark                               Mode  Cnt  Score   Error  Units
   ManifestReadBenchmark.readManifestFile    ss    5  6.180 ± 0.067   s/op
   ```
   
   Because [DecoderResolver.resolveAndRead](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java#L72) is called for every Avro record, the outer lookup is on a hot path. So I keep the top-level map as `MapMaker().weakKeys().makeMap()` while the inner map is changed to `java.util.WeakHashMap`.
   Here are the benchmarks:
   ```
   // Master
   Benchmark                               Mode  Cnt   Score   Error  Units
   ManifestReadBenchmark.readManifestFile    ss    5  12.896 ± 1.155   s/op
   
   // This change
   Benchmark                               Mode  Cnt  Score   Error  Units
   ManifestReadBenchmark.readManifestFile    ss    5  6.190 ± 0.069   s/op
   ```
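   
   The resulting layout is roughly the following (a sketch based on the diff above, not the verbatim merged source):
   
   ```java
   // Outer map: Guava weak keys, compared by identity, fast on the per-record hot path.
   private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>>
       DECODER_CACHES = ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
   
   static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema)
       throws IOException {
     Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
     // Inner map: weak keys compared by equals(), so an equal-but-distinct fileSchema
     // reuses the entry and its key no longer pins the weak outer key.
     Map<Schema, ResolvingDecoder> fileSchemaToResolver =
         cache.computeIfAbsent(readSchema, k -> new WeakHashMap<>());
     ResolvingDecoder resolver = fileSchemaToResolver.get(fileSchema);
     if (resolver == null) {
       resolver = DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, null);
       fileSchemaToResolver.put(fileSchema, resolver);
     }
     resolver.configure(decoder);
     return resolver;
   }
   ```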
   




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#discussion_r1223797626


##########
core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java:
##########
@@ -0,0 +1,155 @@
+    fileSchema = null;
+    System.gc();
+    // Wait until the weak reference keys are GCed
+    Thread.sleep(1000);

Review Comment:
   Thanks @pvary for reviewing. Replaced with Awaitility.





[GitHub] [iceberg] ConeyLiu commented on pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#issuecomment-1585501651

   Actually, I am not sure why we need to call [DecoderResolver.resolveAndRead](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java#L72) for every record read. Maybe we could create one `ResolvingDecoder` per AvroReader instance, because for each instance the [readSchema](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java#L35) and [fileSchema](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java#L55) never change. A rough sketch:
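   
   (Hypothetical shape only, not what this PR implements; `reader`, `readSchema`, and `fileSchema` are assumed to be the existing reader fields.)
   
   ```java
   private ResolvingDecoder resolver; // created once, since both schemas are fixed
   
   public T read(Decoder decoder, Object reuse) throws IOException {
     if (resolver == null) {
       resolver = DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, null);
     }
     resolver.configure(decoder); // re-point the cached resolver at the new input
     T value = reader.read(resolver, reuse);
     resolver.drain();
     return value;
   }
   ```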




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7791: Core: Fixes OOM caused by Avro decoder caching

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7791:
URL: https://github.com/apache/iceberg/pull/7791#discussion_r1240584082


##########
core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java:
##########
@@ -0,0 +1,167 @@
+  private void checkCached(Schema readSchema, Schema fileSchema) {
+    assertThat(DecoderResolver.DECODER_CACHES.get().containsKey(readSchema)).isTrue();

Review Comment:
   Updated


