Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/03/13 16:35:56 UTC
[iceberg] branch master updated: Core: Fix NPE after HadoopMetricsContext deserialization (#4309)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8f8cb Core: Fix NPE after HadoopMetricsContext deserialization (#4309)
4d8f8cb is described below
commit 4d8f8cbc91d167b1bc87c7df1efa12dac8bddadf
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Sun Mar 13 22:05:47 2022 +0530
Core: Fix NPE after HadoopMetricsContext deserialization (#4309)
---
.../iceberg/hadoop/HadoopMetricsContext.java | 28 +++++------
.../TestHadoopMetricsContextSerialization.java | 56 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 14 deletions(-)
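[Context, not part of the commit: the NPE arises because Kryo's default FieldSerializer bypasses Java's writeObject/readObject hooks, so the transient statistics field that readObject used to repopulate stays null after a Kryo round trip, and the first counter call dereferences it. A minimal sketch of that failure mode, using a hypothetical Holder class:

// Hypothetical illustration, not from the Iceberg codebase: a transient
// field restored in readObject survives Java serialization, but Kryo's
// default FieldSerializer never calls readObject, so the field comes
// back null after a Kryo round trip.
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

class Holder implements Serializable {
  private transient Object resource = new Object();

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    this.resource = new Object(); // runs only under java.io serialization
  }

  Object resource() {
    return resource; // null after a Kryo round trip with this pattern
  }
}

The commit below removes the readObject hook entirely and initializes the field lazily instead, which covers both serialization paths.]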
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java
index ef3a78e..3591226 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopMetricsContext.java
@@ -19,9 +19,6 @@
package org.apache.iceberg.hadoop;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem;
@@ -36,7 +33,7 @@ public class HadoopMetricsContext implements FileIOMetricsContext {
  public static final String SCHEME = "io.metrics-scheme";

  private String scheme;
-  private transient FileSystem.Statistics statistics;
+  private transient volatile FileSystem.Statistics statistics;

  public HadoopMetricsContext(String scheme) {
    ValidationException.check(scheme != null,
@@ -71,16 +68,16 @@ public class HadoopMetricsContext implements FileIOMetricsContext {
    switch (name) {
      case READ_BYTES:
        ValidationException.check(type == Long.class, "'%s' requires Long type", READ_BYTES);
-        return (Counter<T>) longCounter(statistics::incrementBytesRead);
+        return (Counter<T>) longCounter(statistics()::incrementBytesRead);
      case READ_OPERATIONS:
        ValidationException.check(type == Integer.class, "'%s' requires Integer type", READ_OPERATIONS);
-        return (Counter<T>) integerCounter(statistics::incrementReadOps);
+        return (Counter<T>) integerCounter(statistics()::incrementReadOps);
      case WRITE_BYTES:
        ValidationException.check(type == Long.class, "'%s' requires Long type", WRITE_BYTES);
-        return (Counter<T>) longCounter(statistics::incrementBytesWritten);
+        return (Counter<T>) longCounter(statistics()::incrementBytesWritten);
      case WRITE_OPERATIONS:
        ValidationException.check(type == Integer.class, "'%s' requires Integer type", WRITE_OPERATIONS);
-        return (Counter<T>) integerCounter(statistics::incrementWriteOps);
+        return (Counter<T>) integerCounter(statistics()::incrementWriteOps);
      default:
        throw new IllegalArgumentException(String.format("Unsupported counter: '%s'", name));
    }
@@ -114,12 +111,15 @@ public class HadoopMetricsContext implements FileIOMetricsContext {
    };
  }

-  private void writeObject(ObjectOutputStream out) throws IOException {
-    out.defaultWriteObject();
-  }
+  private FileSystem.Statistics statistics() {
+    if (statistics == null) {
+      synchronized (this) {
+        if (statistics == null) {
+          this.statistics = FileSystem.getStatistics(scheme, null);
+        }
+      }
+    }

-  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
-    in.defaultReadObject();
-    statistics = FileSystem.getStatistics(scheme, null);
+    return statistics;
  }
}
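[The change above replaces eager restoration in readObject with lazy initialization: the field is now volatile and every counter resolves it through statistics(), which uses double-checked locking so FileSystem.getStatistics is called at most once per instance no matter how the object was deserialized. The same idiom in isolation, as a sketch (the LazyRef name and Supplier parameter are illustrative, not commit code):

// A generic double-checked-locking lazy reference, sketching the idiom
// statistics() uses above. The volatile field is what makes the pattern
// safe under the Java Memory Model; without it a thread could observe a
// partially published value.
import java.util.function.Supplier;

class LazyRef<T> {
  private final Supplier<T> supplier;
  private volatile T value;

  LazyRef(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  T get() {
    if (value == null) {          // fast path: no lock once initialized
      synchronized (this) {
        if (value == null) {      // re-check under the lock
          value = supplier.get(); // assumes the supplier never returns null
        }
      }
    }
    return value;
  }
}]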
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestHadoopMetricsContextSerialization.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestHadoopMetricsContextSerialization.java
new file mode 100644
index 0000000..e1003a1
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestHadoopMetricsContextSerialization.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import org.apache.iceberg.hadoop.HadoopMetricsContext;
+import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Test;
+
+public class TestHadoopMetricsContextSerialization {
+
+  @Test(expected = Test.None.class)
+  public void testHadoopMetricsContextKryoSerialization() throws IOException {
+    MetricsContext metricsContext = new HadoopMetricsContext("s3");
+
+    metricsContext.initialize(Maps.newHashMap());
+
+    MetricsContext deserializedMetricContext = KryoHelpers.roundTripSerialize(metricsContext);
+    // statistics are properly re-initialized post de-serialization
+    deserializedMetricContext
+        .counter(FileIOMetricsContext.WRITE_BYTES, Long.class, MetricsContext.Unit.BYTES)
+        .increment();
+  }
+
+  @Test(expected = Test.None.class)
+  public void testHadoopMetricsContextJavaSerialization() throws IOException, ClassNotFoundException {
+    MetricsContext metricsContext = new HadoopMetricsContext("s3");
+
+    metricsContext.initialize(Maps.newHashMap());
+
+    MetricsContext deserializedMetricContext = TestHelpers.roundTripSerialize(metricsContext);
+    // statistics are properly re-initialized post de-serialization
+    deserializedMetricContext
+        .counter(FileIOMetricsContext.WRITE_BYTES, Long.class, MetricsContext.Unit.BYTES)
+        .increment();
+  }
+}
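[The test depends on KryoHelpers.roundTripSerialize and TestHelpers.roundTripSerialize, existing Iceberg test utilities not shown in this diff. For reference, a minimal sketch of a Kryo round-trip helper (the KryoRoundTrip class and direct com.esotericsoftware Kryo usage are assumptions for illustration; the project's real helper may differ, e.g. by routing through Spark's KryoSerializer):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoRoundTrip {
  // Serializes an object with Kryo and reads it back, reproducing the
  // path on which readObject is never invoked and transient fields
  // come back null.
  @SuppressWarnings("unchecked")
  public static <T> T roundTripSerialize(T obj) {
    Kryo kryo = new Kryo();
    kryo.setRegistrationRequired(false); // accept unregistered classes in this sketch

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (Output out = new Output(bytes)) {
      kryo.writeClassAndObject(out, obj);
    }
    try (Input in = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) kryo.readClassAndObject(in);
    }
  }
}

With either helper, the tests assert only that increment() no longer throws after the round trip, which is exactly the pre-fix failure.]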