Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/26 23:35:12 UTC

[GitHub] [iceberg] danielcweeks commented on a diff in pull request #5268: Initial Table Scan Reporting support

danielcweeks commented on code in PR #5268:
URL: https://github.com/apache/iceberg/pull/5268#discussion_r930487486


##########
api/src/main/java/org/apache/iceberg/metrics/ScanReport.java:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.metrics;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.MetricsContext.Counter;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A Table Scan report that contains all relevant information from a Table Scan.
+ */
+public class ScanReport implements Serializable {
+
+  private final String tableName;
+  private final long snapshotId;
+  private final Expression filter;
+  private final Schema projection;
+  private final int resultDataFiles;
+  private final int totalDataManifests;
+  private final int totalDeleteManifests;
+  private final int scannedDataManifests;
+  private final int scannedDeleteManifests;
+  private final int totalDataFiles;
+  private final int totalDeleteFiles;
+  private final Duration totalPlanningDuration;
+  private final long totalFileSizeInBytes;
+  private final int indexedDeleteFiles;
+  private final int resultDeleteFiles;
+  private final int skippedDataManifests;
+  private final int skippedDeleteManifests;
+  private final int skippedDataFiles;
+  private final int skippedDeleteFiles;
+
+  private ScanReport(
+      String tableName, long snapshotId, Expression filter, Schema projection,
+      int resultDataFiles, int totalDataManifests, int totalDeleteManifests, int scannedDataManifests,
+      int scannedDeleteManifests, int totalDataFiles, int totalDeleteFiles, Duration totalPlanningDuration,
+      long totalFileSizeInBytes, int indexedDeleteFiles, int resultDeleteFiles,
+      int skippedDataManifests, int skippedDeleteManifests,
+      int skippedDataFiles, int skippedDeleteFiles) {
+    this.tableName = tableName;
+    this.snapshotId = snapshotId;
+    this.filter = filter;
+    this.projection = projection;
+    this.resultDataFiles = resultDataFiles;

Review Comment:
   I had a conversation with Ryan about what to do with ScanReport and ScanMetrics. I think we actually want to remove the final values from the scan report here. I get that keeping them makes it easy to serialize a POJO result, but there's a separation of concerns that I think we can achieve rather easily.
   
   Rather than collect all of the metric results in the ScanReport, just keep a reference to the ScanMetrics with the actual counters/timers.  This will allow the ScanReporter to manage the structure of the response.
   
   For example:
   
   ```text
   ScanReport:
     - table identifier
     - snapshot id
     - filter
     - projection
     - ScanMetrics:
         - <various metrics>
   ```
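   A minimal sketch of that shape, assuming hypothetical `SimpleCounter` and `ScanMetrics` types and plain `String` stand-ins for Iceberg's `Expression` filter and `Schema` projection. The point it shows is that the report holds a reference to the metrics rather than copied values:
   
   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical minimal counter; not Iceberg's MetricsContext.Counter.
   class SimpleCounter {
     private final AtomicLong value = new AtomicLong();
   
     void increment(long amount) {
       value.addAndGet(amount);
     }
   
     long count() {
       return value.get();
     }
   }
   
   // Owns the live counters, keyed by metric name (insertion order preserved).
   class ScanMetrics {
     private final Map<String, SimpleCounter> counters = new LinkedHashMap<>();
   
     SimpleCounter counter(String name) {
       return counters.computeIfAbsent(name, n -> new SimpleCounter());
     }
   
     Map<String, SimpleCounter> counters() {
       return Collections.unmodifiableMap(counters);
     }
   }
   
   // Scan context plus a reference to ScanMetrics; no copied metric values.
   final class ScanReport {
     private final String tableName;
     private final long snapshotId;
     private final String filter; // stand-in for Expression
     private final String projection; // stand-in for Schema
     private final ScanMetrics scanMetrics;
   
     ScanReport(String tableName, long snapshotId, String filter, String projection,
         ScanMetrics scanMetrics) {
       this.tableName = tableName;
       this.snapshotId = snapshotId;
       this.filter = filter;
       this.projection = projection;
       this.scanMetrics = scanMetrics;
     }
   
     String tableName() { return tableName; }
     long snapshotId() { return snapshotId; }
     String filter() { return filter; }
     String projection() { return projection; }
     ScanMetrics scanMetrics() { return scanMetrics; }
   }
   ```
   
   Because the report only references the metrics, a counter incremented after the report is built is still visible to the reporter; a reporter that needs a stable snapshot (e.g. for serialization) would read the values at report time.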
   
   The scan reporter can then construct the payload dynamically based on the ScanReport (especially if we add a way to iterate over the metrics):
   
   ```json
   {
     "table-identifier": "a.b.c",
     "snapshot-id": 123,
     "filter": "foo=bar",
     "projection": "c1, c2",
     "metrics": {
       "<counter-name>": {
         "type": "counter",
         "value": "val"
       },
       "<timer-name>": {
         "type": "timer",
         "value": "val",
         "histogram": { "p50": 213, "p99": 18983 },
         "min": 23,
         "max": 20199
       }
     }
   }
   ```
   
   (We probably need metric type info in the payload, hence the `type` field.)
   
   This allows us to separate the way we track metrics from the way we construct the payload, so that if a future metric (say, from Micrometer) has a histogram, we could just update the reporter without having to create a new object structure for the values in ScanReport. As you can see from what the JSON might look like, the metric info can get more complicated than we probably want to track here.
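   One way the reporter side could look, as a sketch: each metric describes its own type and values, so the payload builder just iterates and never needs per-metric fields. `Metric`, `CounterMetric`, `TimerMetric`, and `PayloadBuilder` are hypothetical names; the output is a nested `Map` that a JSON library could serialize, and the histogram is left out for brevity:
   
   ```java
   import java.time.Duration;
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   // Hypothetical: every metric reports its own type and values.
   interface Metric {
     String type();
   
     Map<String, Object> values();
   }
   
   class CounterMetric implements Metric {
     private long count = 0L;
   
     void increment(long amount) {
       count += amount;
     }
   
     @Override
     public String type() {
       return "counter";
     }
   
     @Override
     public Map<String, Object> values() {
       Map<String, Object> values = new LinkedHashMap<>();
       values.put("value", count);
       return values;
     }
   }
   
   class TimerMetric implements Metric {
     private long count = 0L;
     private long totalNanos = 0L;
     private long minNanos = Long.MAX_VALUE;
     private long maxNanos = Long.MIN_VALUE;
   
     void record(Duration duration) {
       long nanos = duration.toNanos();
       count++;
       totalNanos += nanos;
       minNanos = Math.min(minNanos, nanos);
       maxNanos = Math.max(maxNanos, nanos);
     }
   
     @Override
     public String type() {
       return "timer";
     }
   
     @Override
     public Map<String, Object> values() {
       Map<String, Object> values = new LinkedHashMap<>();
       values.put("count", count);
       values.put("total-nanos", totalNanos);
       if (count > 0) { // min/max are meaningless before the first recording
         values.put("min-nanos", minNanos);
         values.put("max-nanos", maxNanos);
       }
       return values;
     }
   }
   
   class PayloadBuilder {
     static Map<String, Object> build(String table, long snapshotId, String filter,
         String projection, Map<String, Metric> metrics) {
       Map<String, Object> payload = new LinkedHashMap<>();
       payload.put("table-identifier", table);
       payload.put("snapshot-id", snapshotId);
       payload.put("filter", filter);
       payload.put("projection", projection);
       Map<String, Object> metricsNode = new LinkedHashMap<>();
       // New metric kinds only need a Metric implementation; this loop stays unchanged.
       for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
         Map<String, Object> node = new LinkedHashMap<>();
         node.put("type", entry.getValue().type());
         node.putAll(entry.getValue().values());
         metricsNode.put(entry.getKey(), node);
       }
       payload.put("metrics", metricsNode);
       return payload;
     }
   }
   ```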



##########
api/src/main/java/org/apache/iceberg/metrics/MetricsContext.java:
##########
@@ -126,15 +126,15 @@ public Timer timer(String name, TimeUnit unit) {
 
       @Override
       public <T extends Number> Counter<T> counter(String name, Class<T> type, Unit unit) {
-        return new Counter<T>() {
-          @Override
-          public void increment() {
-          }
-
-          @Override
-          public void increment(T amount) {
-          }
-        };
+        if (Integer.class.equals(type)) {

Review Comment:
   Not sure I follow why we need these NOOP implementations. It seems like once you create a counter, you don't really need to know whether it's a long, int, etc. when using it.
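   A sketch of that point, using a hypothetical `Counter` interface that mirrors only the `increment()` / `increment(T)` methods visible in the diff: one generic no-op covers every `Number` type, and a live counter can widen amounts to `long`, so callers never branch on the `Class<T>` passed at creation:
   
   ```java
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical mirror of the Counter shape in the diff.
   interface Counter<T extends Number> {
     void increment();
   
     void increment(T amount);
   }
   
   final class Counters {
     private Counters() {}
   
     // A single generic no-op works for Integer, Long, etc.; no per-type branches.
     static <T extends Number> Counter<T> noop() {
       return new Counter<T>() {
         @Override
         public void increment() {}
   
         @Override
         public void increment(T amount) {}
       };
     }
   
     // A live counter that widens every amount to long, so callers never care about T.
     // Assumes integral amounts: a Double amount would be truncated by longValue().
     static final class LongBacked<T extends Number> implements Counter<T> {
       private final AtomicLong total = new AtomicLong();
   
       @Override
       public void increment() {
         total.incrementAndGet();
       }
   
       @Override
       public void increment(T amount) {
         total.addAndGet(amount.longValue());
       }
   
       long total() {
         return total.get();
       }
     }
   }
   ```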



##########
api/src/main/java/org/apache/iceberg/metrics/Timer.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.metrics;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+/**
+ * Generalized Timer interface for creating telemetry related instances for measuring duration of operations.
+ */
+public interface Timer {

Review Comment:
   We might want to consider pulling this into the MetricsContext interface, similar to `Counter`, or at least exposing how you create timers through that interface so that they can be named (this is necessary if you're delegating to a framework like Micrometer).
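   A minimal sketch of named timer creation through the context. The `timer(String name, TimeUnit unit)` signature matches the one in the `MetricsContext` hunk header above; the simplified `Timer` methods and the `InMemoryMetricsContext` class are hypothetical. Keying timers by name is what would let a Micrometer-backed context register them under that name:
   
   ```java
   import java.time.Duration;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical simplified Timer; the PR's Timer interface has more methods.
   interface Timer {
     void record(long amount, TimeUnit unit);
   
     Duration totalDuration();
   
     // Times a runnable and records the elapsed time, even if it throws.
     default void time(Runnable runnable) {
       long start = System.nanoTime();
       try {
         runnable.run();
       } finally {
         record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
       }
     }
   }
   
   interface MetricsContext {
     Timer timer(String name, TimeUnit unit);
   }
   
   class InMemoryMetricsContext implements MetricsContext {
     private final Map<String, Timer> timers = new ConcurrentHashMap<>();
   
     @Override
     public Timer timer(String name, TimeUnit unit) {
       // The same name returns the same timer; `unit` (the reporting unit) is unused here.
       return timers.computeIfAbsent(name, n -> new Timer() {
         private final AtomicLong nanos = new AtomicLong();
   
         @Override
         public void record(long amount, TimeUnit recordUnit) {
           nanos.addAndGet(recordUnit.toNanos(amount));
         }
   
         @Override
         public Duration totalDuration() {
           return Duration.ofNanos(nanos.get());
         }
       });
     }
   }
   ```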



##########
api/src/main/java/org/apache/iceberg/metrics/LoggingScanReporter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A default {@link ScanReporter} implementation that logs the {@link ScanReport} to the log file.
+ */
+public class LoggingScanReporter implements ScanReporter {
+  private static final Logger LOG = LoggerFactory.getLogger(LoggingScanReporter.class);
+
+  @Override
+  @SuppressWarnings("Slf4jConstantLogMessage")
+  public void reportScan(ScanReport scanReport) {
+    Preconditions.checkArgument(null != scanReport, "Invalid scan report: null");
+    LOG.info("Completed scan planning: {}", scanReport);

Review Comment:
   If we update the ScanReport, we can still use this as long as `ScanReport::toString` just iterates over the metrics and reports their values.
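   A sketch of such a `toString`, assuming a hypothetical `ScanReport` that keeps live counters (plain `AtomicLong`s here for brevity) instead of copied fields. Since it iterates over whatever metrics are registered, the existing `LOG.info("Completed scan planning: {}", scanReport)` call would pick up new metrics without changes:
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.StringJoiner;
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical ScanReport holding live counters rather than final values.
   class ScanReport {
     private final String tableName;
     private final long snapshotId;
     private final Map<String, AtomicLong> metrics = new LinkedHashMap<>();
   
     ScanReport(String tableName, long snapshotId) {
       this.tableName = tableName;
       this.snapshotId = snapshotId;
     }
   
     AtomicLong counter(String name) {
       return metrics.computeIfAbsent(name, n -> new AtomicLong());
     }
   
     @Override
     public String toString() {
       StringJoiner joiner = new StringJoiner(", ", "{", "}");
       // Iterate in registration order; a new metric needs no change here.
       metrics.forEach((name, value) -> joiner.add(name + "=" + value.get()));
       return "ScanReport{tableName=" + tableName + ", snapshotId=" + snapshotId
           + ", metrics=" + joiner + "}";
     }
   }
   ```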



##########
api/src/main/java/org/apache/iceberg/io/CloseableIterable.java:
##########
@@ -75,6 +76,34 @@ public CloseableIterator<E> iterator() {
     };
   }
 
+  /**
+   * Will run the given runnable when {@link CloseableIterable#close()} has been called.
+   *
+   * @param iterable             The underlying {@link CloseableIterable} to iterate over
+   * @param onCompletionRunnable The runnable to run after the underlying iterable was closed
+   * @param <E>                  The type of der underlying iterable

Review Comment:
   "der" -> "the"
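   For context on the helper documented in that hunk, a minimal sketch of running a callback once the underlying iterable is closed. `CloseableIterable` here is a hypothetical stand-in (not Iceberg's class), and the names `of` and `whenComplete` are illustrative since the hunk does not show the method name. Running the callback in `finally` is a design choice so it fires even if the underlying `close()` throws:
   
   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.Iterator;
   import java.util.List;
   
   // Hypothetical stand-in: an Iterable that must be closed.
   interface CloseableIterable<E> extends Iterable<E>, Closeable {
   
     // Wraps an in-memory list; close() does nothing.
     static <E> CloseableIterable<E> of(List<E> list) {
       return new CloseableIterable<E>() {
         @Override
         public Iterator<E> iterator() {
           return list.iterator();
         }
   
         @Override
         public void close() {}
       };
     }
   
     // Runs onCompletionRunnable after the underlying iterable has been closed.
     static <E> CloseableIterable<E> whenComplete(
         CloseableIterable<E> iterable, Runnable onCompletionRunnable) {
       return new CloseableIterable<E>() {
         @Override
         public Iterator<E> iterator() {
           return iterable.iterator();
         }
   
         @Override
         public void close() throws IOException {
           try {
             iterable.close();
           } finally {
             onCompletionRunnable.run();
           }
         }
       };
     }
   }
   ```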



##########
api/src/main/java/org/apache/iceberg/metrics/ScanReport.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.metrics;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.MetricsContext.Counter;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A Table Scan report that contains all relevant information from a Table Scan.
+ */
+public class ScanReport implements Serializable {
+
+  private final String tableName;
+  private final long snapshotId;
+  private final Expression filter;
+  private final Schema projection;
+  private final int matchingDataFiles;
+  private final int matchingDataManifests;
+  private final int totalDataManifestsRead;
+  private final int addedDataFiles;
+  private final int deletedDataFiles;
+  private final Duration totalPlanningDuration;
+  private final long totalFileSizeBytes;
+
+  private ScanReport(
+      String tableName, long snapshotId, Expression filter, Schema projection,
+      int matchingDataFiles, int matchingDataManifests, int totalDataManifestsRead,
+      int addedDataFiles, int deletedDataFiles, Duration totalPlanningDuration,
+      long totalFileSizeBytes) {
+    this.tableName = tableName;
+    this.snapshotId = snapshotId;
+    this.filter = filter;
+    this.projection = projection;
+    this.matchingDataFiles = matchingDataFiles;
+    this.matchingDataManifests = matchingDataManifests;
+    this.totalDataManifestsRead = totalDataManifestsRead;
+    this.addedDataFiles = addedDataFiles;
+    this.deletedDataFiles = deletedDataFiles;
+    this.totalPlanningDuration = totalPlanningDuration;
+    this.totalFileSizeBytes = totalFileSizeBytes;
+  }
+
+  public String tableName() {
+    return tableName;
+  }
+
+  public long snapshotId() {
+    return snapshotId;
+  }
+
+  public Expression filter() {
+    return filter;
+  }
+
+  public Schema projection() {
+    return projection;
+  }
+
+  public int matchingDataFiles() {
+    return matchingDataFiles;
+  }
+
+  public int matchingDataManifests() {
+    return matchingDataManifests;
+  }
+
+  public int totalDataManifestsRead() {
+    return totalDataManifestsRead;
+  }
+
+  public int addedDataFiles() {
+    return addedDataFiles;
+  }
+
+  public int deletedDataFiles() {
+    return deletedDataFiles;
+  }
+
+  public Duration totalPlanningDuration() {
+    return totalPlanningDuration;
+  }
+
+  public long totalFileSizeBytes() {
+    return totalFileSizeBytes;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("tableName", tableName)
+        .add("snapshotId", snapshotId)
+        .add("filter", filter)
+        .add("projection", projection)
+        .add("matchingDataFiles", matchingDataFiles)
+        .add("matchingDataManifests", matchingDataManifests)
+        .add("totalDataManifestsRead", totalDataManifestsRead)
+        .add("addedDataFiles", addedDataFiles)
+        .add("deletedDataFiles", deletedDataFiles)
+        .add("totalScanDuration", totalPlanningDuration)
+        .add("totalFileSizeBytes", totalFileSizeBytes)
+        .toString();
+  }
+
+  @SuppressWarnings("HiddenField")
+  public static class Builder {
+    private String tableName;
+    private long snapshotId = -1L;
+    private Expression filter;
+    private Schema projection;
+    private ScanMetrics scanMetrics;
+
+    private Builder() {
+    }
+
+    public Builder withTableName(String tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public Builder withSnapshotId(long snapshotId) {
+      this.snapshotId = snapshotId;
+      return this;
+    }
+
+    public Builder withFilter(Expression filter) {
+      this.filter = filter;
+      return this;
+    }
+
+    public Builder withProjection(Schema projection) {
+      this.projection = projection;
+      return this;
+    }
+
+    public Builder fromScanMetrics(ScanMetrics scanMetrics) {
+      this.scanMetrics = scanMetrics;
+      return this;
+    }
+
+    public ScanReport build() {
+      Preconditions.checkArgument(null != tableName, "TableName must be non-null");
+      Preconditions.checkArgument(null != filter, "Expression filter must be non-null");
+      Preconditions.checkArgument(null != projection, "Schema projection must be non-null");
+      Preconditions.checkArgument(null != scanMetrics, "ScanMetrics must be non-null");
+      return new ScanReport(tableName, snapshotId, filter, projection,
+          scanMetrics.matchingDataFiles().count().orElse(-1),

Review Comment:
   @rdblue I looked into removing `Optional` from the counter with @nastra, and I believe that it makes sense. For the Hadoop case, we should probably throw `UnsupportedOperationException` or just adapt the implementation to return the value from the file system statistics (which is possible, but would not currently be used), and then not have a default implementation for `count()`, which makes more sense to me.
   
   This avoids the question of `-1` vs. `Optional`. I feel that if a counter is initialized, it should return 0 even if unused (as opposed to -1 or "not present", neither of which really has meaning in a counter context).
   
   @nastra said he would create a separate PR for this.
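   A sketch of the proposed contract, with hypothetical names throughout: `count()` returns a plain `long` and has no default implementation, an initialized counter reports 0, and an implementation that only forwards increments elsewhere (the "throw unsupported" option for the Hadoop case) rejects `count()`:
   
   ```java
   import java.util.concurrent.atomic.LongAdder;
   import java.util.function.LongConsumer;
   
   // Hypothetical counter shape: no Optional, no default count().
   interface Counter {
     void increment();
   
     void increment(long amount);
   
     long count();
   }
   
   // An initialized counter reports 0 until incremented, never -1 or "absent".
   class DefaultCounter implements Counter {
     private final LongAdder adder = new LongAdder();
   
     @Override
     public void increment() {
       adder.increment();
     }
   
     @Override
     public void increment(long amount) {
       adder.add(amount);
     }
   
     @Override
     public long count() {
       return adder.sum();
     }
   }
   
   // Increments go to an external sink (e.g. file system statistics) that
   // this counter cannot read back, so count() is unsupported.
   class ForwardingCounter implements Counter {
     private final LongConsumer sink;
   
     ForwardingCounter(LongConsumer sink) {
       this.sink = sink;
     }
   
     @Override
     public void increment() {
       sink.accept(1L);
     }
   
     @Override
     public void increment(long amount) {
       sink.accept(amount);
     }
   
     @Override
     public long count() {
       throw new UnsupportedOperationException("count() is not supported by this counter");
     }
   }
   ```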



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org