You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/12/18 23:23:55 UTC
[iceberg] branch master updated: Core: Allow configuring metrics reporter impl via Catalog property (#6404)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 59d0dee025 Core: Allow configuring metrics reporter impl via Catalog property (#6404)
59d0dee025 is described below
commit 59d0dee02501570bca198ad007d9658fdfe08457
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Mon Dec 19 00:23:50 2022 +0100
Core: Allow configuring metrics reporter impl via Catalog property (#6404)
---
.../iceberg/metrics/LoggingMetricsReporter.java | 5 ++
.../java/org/apache/iceberg/CatalogProperties.java | 1 +
.../main/java/org/apache/iceberg/CatalogUtil.java | 39 +++++++++++++++
.../apache/iceberg/rest/RESTSessionCatalog.java | 8 ++-
.../java/org/apache/iceberg/TestCatalogUtil.java | 45 +++++++++++++++++
.../org/apache/iceberg/rest/TestRESTCatalog.java | 57 ++++++++++++++++++++++
6 files changed, 153 insertions(+), 2 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java
index 26e9a0b527..8d41f10e74 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java
@@ -28,6 +28,11 @@ import org.slf4j.LoggerFactory;
*/
public class LoggingMetricsReporter implements MetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);
+ private static final LoggingMetricsReporter INSTANCE = new LoggingMetricsReporter();
+
+ public static LoggingMetricsReporter instance() {
+ return INSTANCE;
+ }
@Override
public void report(MetricsReport report) {
diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index 95fe6a074c..c2490ee3ea 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -29,6 +29,7 @@ public class CatalogProperties {
public static final String WAREHOUSE_LOCATION = "warehouse";
public static final String TABLE_DEFAULT_PREFIX = "table-default.";
public static final String TABLE_OVERRIDE_PREFIX = "table-override.";
+ public static final String METRICS_REPORTER_IMPL = "metrics-reporter-impl";
/**
* Controls whether the catalog will cache table entries upon load.
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index c0c5078a3f..ef4d17c249 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -400,4 +401,42 @@ public class CatalogUtil {
setConf.invoke(conf);
}
+
+ /**
+ * Load a custom {@link MetricsReporter} implementation.
+ *
+ * <p>The implementation must have a no-arg constructor.
+ *
+ * @param impl full class name of a custom {@link MetricsReporter} implementation
+ * @return An initialized {@link MetricsReporter}.
+ * @throws IllegalArgumentException if class path not found or right constructor not found or the
+ * loaded class cannot be cast to the given interface type
+ */
+ public static MetricsReporter loadMetricsReporter(String impl) {
+ LOG.info("Loading custom MetricsReporter implementation: {}", impl);
+ DynConstructors.Ctor<MetricsReporter> ctor;
+ try {
+ ctor =
+ DynConstructors.builder(MetricsReporter.class)
+ .loader(CatalogUtil.class.getClassLoader())
+ .impl(impl)
+ .buildChecked();
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(
+ String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl),
+ e);
+ }
+
+ MetricsReporter reporter;
+ try {
+ reporter = ctor.newInstance();
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl),
+ e);
+ }
+
+ return reporter;
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 0b4edc9a84..d4757c4ed5 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -177,7 +177,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog
this.io =
CatalogUtil.loadFileIO(
ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);
- this.reporter = new LoggingMetricsReporter();
+ String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL);
+ this.reporter =
+ null != metricsReporterImpl
+ ? CatalogUtil.loadMetricsReporter(metricsReporterImpl)
+ : LoggingMetricsReporter.instance();
super.initialize(name, mergedProps);
}
@@ -316,8 +320,8 @@ public class RESTSessionCatalog extends BaseSessionCatalog
TableIdentifier tableIdentifier,
MetricsReport report,
Supplier<Map<String, String>> headers) {
- reporter.report(report);
try {
+ reporter.report(report);
client.post(
paths.metrics(tableIdentifier),
ReportMetricsRequest.of(report),
diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
index a31cf51b68..86e4a2c837 100644
--- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
@@ -187,6 +189,32 @@ public class TestCatalogUtil {
() -> CatalogUtil.buildIcebergCatalog(name, options, hadoopConf));
}
+ @Test
+ public void loadCustomMetricsReporter_noArg() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key", "val");
+
+ MetricsReporter metricsReporter =
+ CatalogUtil.loadMetricsReporter(TestMetricsReporterDefault.class.getName());
+ Assertions.assertThat(metricsReporter).isInstanceOf(TestMetricsReporterDefault.class);
+ }
+
+ @Test
+ public void loadCustomMetricsReporter_badArg() {
+ Assertions.assertThatThrownBy(
+ () -> CatalogUtil.loadMetricsReporter(TestMetricsReporterBadArg.class.getName()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("missing no-arg constructor");
+ }
+
+ @Test
+ public void loadCustomMetricsReporter_badClass() {
+ Assertions.assertThatThrownBy(
+ () -> CatalogUtil.loadMetricsReporter(TestFileIONotImpl.class.getName()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does not implement MetricsReporter");
+ }
+
public static class TestCatalog extends BaseMetastoreCatalog {
private String catalogName;
@@ -399,4 +427,21 @@ public class TestCatalogUtil {
public static class TestFileIONotImpl {
public TestFileIONotImpl() {}
}
+
+ public static class TestMetricsReporterBadArg implements MetricsReporter {
+ private final String arg;
+
+ public TestMetricsReporterBadArg(String arg) {
+ this.arg = arg;
+ }
+
+ @Override
+ public void report(MetricsReport report) {}
+ }
+
+ public static class TestMetricsReporterDefault implements MetricsReporter {
+
+ @Override
+ public void report(MetricsReport report) {}
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 9703b4c477..2d1133e9c4 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -31,16 +31,23 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.responses.ConfigResponse;
@@ -1075,4 +1082,54 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
eq(refreshedCatalogHeader),
any());
}
+
+ @Test
+ public void testCatalogWithCustomMetricsReporter() throws IOException {
+ this.restCatalog =
+ new RESTCatalog(
+ new SessionCatalog.SessionContext(
+ UUID.randomUUID().toString(),
+ "user",
+ ImmutableMap.of("credential", "user:12345"),
+ ImmutableMap.of()),
+ (config) -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
+ restCatalog.setConf(new Configuration());
+ restCatalog.initialize(
+ "prod",
+ ImmutableMap.of(
+ CatalogProperties.URI,
+ "http://localhost:8181/",
+ "credential",
+ "catalog:12345",
+ CatalogProperties.METRICS_REPORTER_IMPL,
+ CustomMetricsReporter.class.getName()));
+
+ restCatalog.buildTable(TABLE, SCHEMA).create();
+ Table table = restCatalog.loadTable(TABLE);
+ table
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(2)
+ .build())
+ .commit();
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks.iterator()).hasNext();
+ }
+
+ // counter of custom metrics reporter should have been increased
+ assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(1);
+ }
+
+ public static class CustomMetricsReporter implements MetricsReporter {
+ static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+ @Override
+ public void report(MetricsReport report) {
+ COUNTER.incrementAndGet();
+ }
+ }
}