You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/12/18 23:23:55 UTC

[iceberg] branch master updated: Core: Allow configuring metrics reporter impl via Catalog property (#6404)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 59d0dee025 Core: Allow configuring metrics reporter impl via Catalog property (#6404)
59d0dee025 is described below

commit 59d0dee02501570bca198ad007d9658fdfe08457
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Mon Dec 19 00:23:50 2022 +0100

    Core: Allow configuring metrics reporter impl via Catalog property (#6404)
---
 .../iceberg/metrics/LoggingMetricsReporter.java    |  5 ++
 .../java/org/apache/iceberg/CatalogProperties.java |  1 +
 .../main/java/org/apache/iceberg/CatalogUtil.java  | 39 +++++++++++++++
 .../apache/iceberg/rest/RESTSessionCatalog.java    |  8 ++-
 .../java/org/apache/iceberg/TestCatalogUtil.java   | 45 +++++++++++++++++
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 57 ++++++++++++++++++++++
 6 files changed, 153 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java
index 26e9a0b527..8d41f10e74 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java
@@ -28,6 +28,11 @@ import org.slf4j.LoggerFactory;
  */
 public class LoggingMetricsReporter implements MetricsReporter {
   private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);
+  private static final LoggingMetricsReporter INSTANCE = new LoggingMetricsReporter();
+
+  public static LoggingMetricsReporter instance() {
+    return INSTANCE;
+  }
 
   @Override
   public void report(MetricsReport report) {
diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index 95fe6a074c..c2490ee3ea 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -29,6 +29,7 @@ public class CatalogProperties {
   public static final String WAREHOUSE_LOCATION = "warehouse";
   public static final String TABLE_DEFAULT_PREFIX = "table-default.";
   public static final String TABLE_OVERRIDE_PREFIX = "table-override.";
+  public static final String METRICS_REPORTER_IMPL = "metrics-reporter-impl";
 
   /**
    * Controls whether the catalog will cache table entries upon load.
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index c0c5078a3f..ef4d17c249 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -400,4 +401,42 @@ public class CatalogUtil {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Load a custom {@link MetricsReporter} implementation.
+   *
+   * <p>The implementation must have a no-arg constructor.
+   *
+   * @param impl full class name of a custom {@link MetricsReporter} implementation
+   * @return An initialized {@link MetricsReporter}.
+   * @throws IllegalArgumentException if the class cannot be found, has no no-arg constructor, or
+   *     does not implement {@link MetricsReporter}
+   */
+  public static MetricsReporter loadMetricsReporter(String impl) {
+    LOG.info("Loading custom MetricsReporter implementation: {}", impl);
+    DynConstructors.Ctor<MetricsReporter> ctor;
+    try {
+      ctor =
+          DynConstructors.builder(MetricsReporter.class)
+              .loader(CatalogUtil.class.getClassLoader())
+              .impl(impl)
+              .buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl),
+          e);
+    }
+
+    MetricsReporter reporter;
+    try {
+      reporter = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl),
+          e);
+    }
+
+    return reporter;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 0b4edc9a84..d4757c4ed5 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -177,7 +177,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog
     this.io =
         CatalogUtil.loadFileIO(
             ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);
-    this.reporter = new LoggingMetricsReporter();
+    String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL);
+    this.reporter =
+        null != metricsReporterImpl
+            ? CatalogUtil.loadMetricsReporter(metricsReporterImpl)
+            : LoggingMetricsReporter.instance();
 
     super.initialize(name, mergedProps);
   }
@@ -316,8 +320,8 @@ public class RESTSessionCatalog extends BaseSessionCatalog
       TableIdentifier tableIdentifier,
       MetricsReport report,
       Supplier<Map<String, String>> headers) {
-    reporter.report(report);
     try {
+      reporter.report(report);
       client.post(
           paths.metrics(tableIdentifier),
           ReportMetricsRequest.of(report),
diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
index a31cf51b68..86e4a2c837 100644
--- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
@@ -187,6 +189,32 @@ public class TestCatalogUtil {
         () -> CatalogUtil.buildIcebergCatalog(name, options, hadoopConf));
   }
 
+  @Test
+  public void loadCustomMetricsReporter_noArg() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key", "val");
+
+    MetricsReporter metricsReporter =
+        CatalogUtil.loadMetricsReporter(TestMetricsReporterDefault.class.getName());
+    Assertions.assertThat(metricsReporter).isInstanceOf(TestMetricsReporterDefault.class);
+  }
+
+  @Test
+  public void loadCustomMetricsReporter_badArg() {
+    Assertions.assertThatThrownBy(
+            () -> CatalogUtil.loadMetricsReporter(TestMetricsReporterBadArg.class.getName()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("missing no-arg constructor");
+  }
+
+  @Test
+  public void loadCustomMetricsReporter_badClass() {
+    Assertions.assertThatThrownBy(
+            () -> CatalogUtil.loadMetricsReporter(TestFileIONotImpl.class.getName()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("does not implement MetricsReporter");
+  }
+
   public static class TestCatalog extends BaseMetastoreCatalog {
 
     private String catalogName;
@@ -399,4 +427,21 @@ public class TestCatalogUtil {
   public static class TestFileIONotImpl {
     public TestFileIONotImpl() {}
   }
+
+  public static class TestMetricsReporterBadArg implements MetricsReporter {
+    private final String arg;
+
+    public TestMetricsReporterBadArg(String arg) {
+      this.arg = arg;
+    }
+
+    @Override
+    public void report(MetricsReport report) {}
+  }
+
+  public static class TestMetricsReporterDefault implements MetricsReporter {
+
+    @Override
+    public void report(MetricsReport report) {}
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 9703b4c477..2d1133e9c4 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -31,16 +31,23 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.SessionCatalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
 import org.apache.iceberg.rest.responses.ConfigResponse;
@@ -1075,4 +1082,54 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
             eq(refreshedCatalogHeader),
             any());
   }
+
+  @Test
+  public void testCatalogWithCustomMetricsReporter() throws IOException {
+    this.restCatalog =
+        new RESTCatalog(
+            new SessionCatalog.SessionContext(
+                UUID.randomUUID().toString(),
+                "user",
+                ImmutableMap.of("credential", "user:12345"),
+                ImmutableMap.of()),
+            (config) -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
+    restCatalog.setConf(new Configuration());
+    restCatalog.initialize(
+        "prod",
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            "http://localhost:8181/",
+            "credential",
+            "catalog:12345",
+            CatalogProperties.METRICS_REPORTER_IMPL,
+            CustomMetricsReporter.class.getName()));
+
+    restCatalog.buildTable(TABLE, SCHEMA).create();
+    Table table = restCatalog.loadTable(TABLE);
+    table
+        .newFastAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-a.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .commit();
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      assertThat(tasks.iterator()).hasNext();
+    }
+
+    // counter of custom metrics reporter should have been increased
+    assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(1);
+  }
+
+  public static class CustomMetricsReporter implements MetricsReporter {
+    static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+    @Override
+    public void report(MetricsReport report) {
+      COUNTER.incrementAndGet();
+    }
+  }
 }