Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/03/13 16:35:56 UTC

[iceberg] branch master updated: Core: Fix NPE after HadoopMetricsContext deserialization (#4309)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8f8cb  Core: Fix NPE after HadoopMetricsContext deserialization (#4309)
4d8f8cb is described below

commit 4d8f8cbc91d167b1bc87c7df1efa12dac8bddadf
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Sun Mar 13 22:05:47 2022 +0530

    Core: Fix NPE after HadoopMetricsContext deserialization (#4309)
---
 .../iceberg/hadoop/HadoopMetricsContext.java       | 28 +++++------
 .../TestHadoopMetricsContextSerialization.java     | 56 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 14 deletions(-)
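
The NPE arose because `statistics` is transient: serializers that bypass Java's
`readObject` hook, notably Kryo (Spark's default closure serializer), leave the
field null after deserialization, and the first counter call then dereferenced
it. The commit drops the custom serialization hooks in favor of a lazily
initialized, double-checked accessor that works regardless of how the object
was deserialized. A minimal standalone sketch of that pattern follows; the
Iceberg-specific types are reduced to placeholders, and `expensiveInit` is a
hypothetical stand-in for `FileSystem.getStatistics(scheme, null)`:

    import java.io.Serializable;

    class LazyStatsHolder implements Serializable {
      private final String scheme;
      // transient: skipped by every serializer; volatile: required for the
      // double-checked locking below to be safe under the Java memory model
      private transient volatile Object statistics;

      LazyStatsHolder(String scheme) {
        this.scheme = scheme;
      }

      Object statistics() {
        if (statistics == null) {          // fast path, no lock taken
          synchronized (this) {
            if (statistics == null) {      // re-check while holding the lock
              this.statistics = expensiveInit(scheme);
            }
          }
        }
        return statistics;
      }

      // hypothetical stand-in for FileSystem.getStatistics(scheme, null)
      private Object expensiveInit(String scheme) {
        return new Object();
      }
    }
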

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java
index ef3a78e..3591226 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java
@@ -19,9 +19,6 @@
 
 package org.apache.iceberg.hadoop;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.Map;
 import java.util.function.Consumer;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,7 +33,7 @@ public class HadoopMetricsContext implements FileIOMetricsContext {
   public static final String SCHEME = "io.metrics-scheme";
 
   private String scheme;
-  private transient FileSystem.Statistics statistics;
+  private transient volatile FileSystem.Statistics statistics;
 
   public HadoopMetricsContext(String scheme) {
     ValidationException.check(scheme != null,
@@ -71,16 +68,16 @@ public class HadoopMetricsContext implements FileIOMetricsContext {
     switch (name) {
       case READ_BYTES:
         ValidationException.check(type == Long.class, "'%s' requires Long type", READ_BYTES);
-        return (Counter<T>) longCounter(statistics::incrementBytesRead);
+        return (Counter<T>) longCounter(statistics()::incrementBytesRead);
       case READ_OPERATIONS:
         ValidationException.check(type == Integer.class, "'%s' requires Integer type", READ_OPERATIONS);
-        return (Counter<T>) integerCounter(statistics::incrementReadOps);
+        return (Counter<T>) integerCounter(statistics()::incrementReadOps);
       case WRITE_BYTES:
         ValidationException.check(type == Long.class, "'%s' requires Long type", WRITE_BYTES);
-        return (Counter<T>) longCounter(statistics::incrementBytesWritten);
+        return (Counter<T>) longCounter(statistics()::incrementBytesWritten);
       case WRITE_OPERATIONS:
         ValidationException.check(type == Integer.class, "'%s' requires Integer type", WRITE_OPERATIONS);
-        return (Counter<T>) integerCounter(statistics::incrementWriteOps);
+        return (Counter<T>) integerCounter(statistics()::incrementWriteOps);
       default:
         throw new IllegalArgumentException(String.format("Unsupported counter: '%s'", name));
     }
@@ -114,12 +111,15 @@ public class HadoopMetricsContext implements FileIOMetricsContext {
     };
   }
 
-  private void writeObject(ObjectOutputStream out) throws IOException {
-    out.defaultWriteObject();
-  }
+  private FileSystem.Statistics statistics() {
+    if (statistics == null) {
+      synchronized (this) {
+        if (statistics == null) {
+          this.statistics = FileSystem.getStatistics(scheme, null);
+        }
+      }
+    }
 
-  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
-    in.defaultReadObject();
-    statistics = FileSystem.getStatistics(scheme, null);
+    return statistics;
   }
 }
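
Two details of the replacement are worth noting. First, `volatile` on the
transient field is what makes the double-checked locking above correct: without
it, a second thread could observe the write to `statistics` before the object
it references is fully published. Second, removing the `writeObject`/`readObject`
pair is safe because they only performed the default behavior plus the eager
re-initialization that the lazy accessor now covers, and it also fixes Kryo,
which (with its default FieldSerializer) does not invoke those hooks at all.
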
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestHadoopMetricsContextSerialization.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestHadoopMetricsContextSerialization.java
new file mode 100644
index 0000000..e1003a1
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestHadoopMetricsContextSerialization.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import org.apache.iceberg.hadoop.HadoopMetricsContext;
+import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Test;
+
+public class TestHadoopMetricsContextSerialization {
+
+  @Test(expected = Test.None.class)
+  public void testHadoopMetricsContextKryoSerialization() throws IOException {
+    MetricsContext metricsContext = new HadoopMetricsContext("s3");
+
+    metricsContext.initialize(Maps.newHashMap());
+
+    MetricsContext deserializedMetricContext = KryoHelpers.roundTripSerialize(metricsContext);
+    // statistics are properly re-initialized post de-serialization
+    deserializedMetricContext
+        .counter(FileIOMetricsContext.WRITE_BYTES, Long.class, MetricsContext.Unit.BYTES)
+        .increment();
+  }
+
+  @Test(expected = Test.None.class)
+  public void testHadoopMetricsContextJavaSerialization() throws IOException, ClassNotFoundException {
+    MetricsContext metricsContext = new HadoopMetricsContext("s3");
+
+    metricsContext.initialize(Maps.newHashMap());
+
+    MetricsContext deserializedMetricContext = TestHelpers.roundTripSerialize(metricsContext);
+    // statistics are properly re-initialized post de-serialization
+    deserializedMetricContext
+        .counter(FileIOMetricsContext.WRITE_BYTES, Long.class, MetricsContext.Unit.BYTES)
+        .increment();
+  }
+}
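
A note on the test idiom: `@Test(expected = Test.None.class)` is JUnit 4's
explicit way of asserting that no exception is thrown. Before this fix, the
post-deserialization `increment()` call would have failed both tests with a
NullPointerException, under Kryo and Java serialization alike.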