You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/02 22:25:52 UTC

[iceberg] branch master updated: ORC: Support metrics in Iceberg metadata (#199)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 260bfa0  ORC: Support metrics in Iceberg metadata (#199)
260bfa0 is described below

commit 260bfa0a33aebc25d07e9a3add46fb7ba0eeca43
Author: Edgar Rodriguez <26...@users.noreply.github.com>
AuthorDate: Tue Jun 2 15:25:44 2020 -0700

    ORC: Support metrics in Iceberg metadata (#199)
---
 .../java/org/apache/iceberg/util/DateTimeUtil.java |   4 +-
 .../test/java/org/apache/iceberg/TestMetrics.java  |  71 ++++--
 .../TestOrcMetrics.java}                           |  56 +++--
 .../apache/iceberg/parquet/TestParquetMetrics.java |   9 +-
 .../java/org/apache/iceberg/orc/ORCSchemaUtil.java |   2 +-
 .../org/apache/iceberg/orc/OrcFileAppender.java    |   2 +
 .../java/org/apache/iceberg/orc/OrcMetrics.java    | 248 +++++++++++++++++++--
 7 files changed, 327 insertions(+), 65 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index c155f00..9f1e26e 100644
--- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -31,8 +31,8 @@ public class DateTimeUtil {
   private DateTimeUtil() {
   }
 
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+  public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
   public static LocalDate dateFromDays(int daysFromEpoch) {
     return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
index 1ebc157..63f36e4 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetrics.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -87,13 +87,16 @@ public abstract class TestMetrics {
       required(7, "stringCol", StringType.get()),
       optional(8, "dateCol", DateType.get()),
       required(9, "timeCol", TimeType.get()),
-      required(10, "timestampCol", TimestampType.withoutZone()),
+      required(10, "timestampColAboveEpoch", TimestampType.withoutZone()),
       required(11, "fixedCol", FixedType.ofLength(4)),
-      required(12, "binaryCol", BinaryType.get())
+      required(12, "binaryCol", BinaryType.get()),
+      required(13, "timestampColBelowEpoch", TimestampType.withoutZone())
   );
 
   private final byte[] fixed = "abcd".getBytes(StandardCharsets.UTF_8);
 
+  public abstract FileFormat fileFormat();
+
   public abstract Metrics getMetrics(InputFile file);
 
   public abstract InputFile writeRecords(Schema schema, Record... records) throws IOException;
@@ -101,7 +104,7 @@ public abstract class TestMetrics {
   public abstract InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records)
       throws IOException;
 
-  public abstract int splitCount(InputFile parquetFile) throws IOException;
+  public abstract int splitCount(InputFile inputFile) throws IOException;
 
   public boolean supportsSmallRowGroups() {
     return false;
@@ -119,9 +122,10 @@ public abstract class TestMetrics {
     firstRecord.setField("stringCol", "AAA");
     firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500));
     firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L));
-    firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L));
+    firstRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L));
     firstRecord.setField("fixedCol", fixed);
     firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
+    firstRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L));
     Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA);
     secondRecord.setField("booleanCol", true);
     secondRecord.setField("intCol", 3);
@@ -132,9 +136,10 @@ public abstract class TestMetrics {
     secondRecord.setField("stringCol", "AAA");
     secondRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500));
     secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L));
-    secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L));
+    secondRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L));
     secondRecord.setField("fixedCol", fixed);
     secondRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
+    secondRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L));
 
     InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord);
 
@@ -152,6 +157,7 @@ public abstract class TestMetrics {
     assertCounts(10, 2L, 0L, metrics);
     assertCounts(11, 2L, 0L, metrics);
     assertCounts(12, 2L, 0L, metrics);
+    assertCounts(13, 2L, 0L, metrics);
   }
 
   @Test
@@ -166,9 +172,10 @@ public abstract class TestMetrics {
     firstRecord.setField("stringCol", "AAA");
     firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500));
     firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L));
-    firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L));
+    firstRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L));
     firstRecord.setField("fixedCol", fixed);
     firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
+    firstRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(-1_900_300L));
     Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA);
     secondRecord.setField("booleanCol", false);
     secondRecord.setField("intCol", Integer.MIN_VALUE);
@@ -179,9 +186,10 @@ public abstract class TestMetrics {
     secondRecord.setField("stringCol", "ZZZ");
     secondRecord.setField("dateCol", null);
     secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(3000L));
-    secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(1000L));
+    secondRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(900L));
     secondRecord.setField("fixedCol", fixed);
     secondRecord.setField("binaryCol", ByteBuffer.wrap("W".getBytes()));
+    secondRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L));
 
     InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord);
 
@@ -206,13 +214,27 @@ public abstract class TestMetrics {
     assertCounts(9, 2L, 0L, metrics);
     assertBounds(9, TimeType.get(), 2000L, 3000L, metrics);
     assertCounts(10, 2L, 0L, metrics);
-    assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics);
+    if (fileFormat() == FileFormat.ORC) {
+      // ORC-611: ORC only supports millisecond precision, so we adjust by 1 millisecond
+      assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics);
+    } else {
+      assertBounds(10, TimestampType.withoutZone(), 0L, 900L, metrics);
+    }
     assertCounts(11, 2L, 0L, metrics);
     assertBounds(11, FixedType.ofLength(4),
         ByteBuffer.wrap(fixed), ByteBuffer.wrap(fixed), metrics);
     assertCounts(12, 2L, 0L, metrics);
     assertBounds(12, BinaryType.get(),
         ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics);
+    if (fileFormat() == FileFormat.ORC) {
+      // TODO: enable when ORC-342 is fixed - ORC-342: creates inaccurate timestamp/stats below epoch
+      // ORC-611: ORC only supports millisecond precision, so we adjust by 1 millisecond, e.g.
+      // assertBounds(13, TimestampType.withoutZone(), -1000L, 1000L, metrics); would fail for a value
+      // in the range `[1970-01-01 00:00:00.000,1970-01-01 00:00:00.999]`
+      assertBounds(13, TimestampType.withoutZone(), -1_901_000L, 1000L, metrics);
+    } else {
+      assertBounds(13, TimestampType.withoutZone(), -1_900_300L, 0L, metrics);
+    }
   }
 
   @Test
@@ -292,14 +314,22 @@ public abstract class TestMetrics {
 
     Metrics metrics = getMetrics(recordsFile);
     Assert.assertEquals(1L, (long) metrics.recordCount());
-    assertCounts(1, 1, 0, metrics);
+    if (fileFormat() != FileFormat.ORC) {
+      assertCounts(1, 1L, 0L, metrics);
+      assertCounts(2, 1L, 0L, metrics);
+      assertCounts(4, 3L, 0L, metrics);
+      assertCounts(6, 1L, 0L, metrics);
+    } else {
+      assertCounts(1, null, null, metrics);
+      assertCounts(2, null, null, metrics);
+      assertCounts(4, null, null, metrics);
+      assertCounts(6, null, null, metrics);
+    }
     assertBounds(1, IntegerType.get(), null, null, metrics);
-    assertCounts(2, 1, 0, metrics);
     assertBounds(2, StringType.get(), null, null, metrics);
-    assertCounts(4, 3, 0, metrics);
     assertBounds(4, IntegerType.get(), null, null, metrics);
-    assertCounts(6, 1, 0, metrics);
     assertBounds(6, StringType.get(), null, null, metrics);
+    assertBounds(7, structType, null, null, metrics);
   }
 
   @Test
@@ -316,7 +346,7 @@ public abstract class TestMetrics {
 
     Metrics metrics = getMetrics(recordsFile);
     Assert.assertEquals(2L, (long) metrics.recordCount());
-    assertCounts(1, 2, 2, metrics);
+    assertCounts(1, 2L, 2L, metrics);
     assertBounds(1, IntegerType.get(), null, null, metrics);
   }
 
@@ -338,13 +368,14 @@ public abstract class TestMetrics {
       newRecord.setField("stringCol", "AAA");
       newRecord.setField("dateCol", DateTimeUtil.dateFromDays(i + 1));
       newRecord.setField("timeCol", DateTimeUtil.timeFromMicros(i + 1L));
-      newRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(i + 1L));
+      newRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(i + 1L));
       newRecord.setField("fixedCol", fixed);
       newRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
+      newRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros((i + 1L) * -1L));
       records.add(newRecord);
     }
 
-    // create parquet file with multiple row groups. by using smaller number of bytes
+    // create a file with multiple row groups by using a smaller number of bytes
     InputFile recordsFile = writeRecordsWithSmallRowGroups(SIMPLE_SCHEMA, records.toArray(new Record[0]));
 
     Assert.assertNotNull(recordsFile);
@@ -387,7 +418,7 @@ public abstract class TestMetrics {
       records.add(newRecord);
     }
 
-    // create parquet file with multiple row groups. by using smaller number of bytes
+    // create a file with multiple row groups by using a smaller number of bytes
     InputFile recordsFile = writeRecordsWithSmallRowGroups(NESTED_SCHEMA, records.toArray(new Record[0]));
 
     Assert.assertNotNull(recordsFile);
@@ -407,14 +438,14 @@ public abstract class TestMetrics {
         ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
   }
 
-  private void assertCounts(int fieldId, long valueCount, long nullValueCount, Metrics metrics) {
+  private void assertCounts(int fieldId, Long valueCount, Long nullValueCount, Metrics metrics) {
     Map<Integer, Long> valueCounts = metrics.valueCounts();
     Map<Integer, Long> nullValueCounts = metrics.nullValueCounts();
-    Assert.assertEquals(valueCount, (long) valueCounts.get(fieldId));
-    Assert.assertEquals(nullValueCount, (long) nullValueCounts.get(fieldId));
+    Assert.assertEquals(valueCount, valueCounts.get(fieldId));
+    Assert.assertEquals(nullValueCount, nullValueCounts.get(fieldId));
   }
 
-  private <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
+  protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
     Map<Integer, ByteBuffer> lowerBounds = metrics.lowerBounds();
     Map<Integer, ByteBuffer> upperBounds = metrics.upperBounds();
 
diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java
similarity index 60%
copy from data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
copy to data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java
index a4c28cb..82ef54b 100644
--- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.orc;
 
 import java.io.File;
 import java.io.IOException;
@@ -27,37 +27,44 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestMetrics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.iceberg.types.Type;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
- * Test Metrics for Parquet.
+ * Test Metrics for ORC.
  */
-public class TestParquetMetrics extends TestMetrics {
-  private static final Map<String, String> SMALL_ROW_GROUP_CONFIG = ImmutableMap.of(
-      TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "1600");
+public class TestOrcMetrics extends TestMetrics {
+
+  static final ImmutableSet<Object> BINARY_TYPES = ImmutableSet.of(Type.TypeID.BINARY,
+      Type.TypeID.FIXED, Type.TypeID.UUID);
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
   @Override
+  public FileFormat fileFormat() {
+    return FileFormat.ORC;
+  }
+
+  @Override
   public Metrics getMetrics(InputFile file) {
-    return ParquetUtil.fileMetrics(file);
+    return OrcMetrics.fromInputFile(file);
   }
 
   @Override
   public InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) throws IOException {
-    return writeRecords(schema, SMALL_ROW_GROUP_CONFIG, records);
+    throw new UnsupportedOperationException("supportsSmallRowGroups = " + supportsSmallRowGroups());
   }
 
   @Override
@@ -66,13 +73,13 @@ public class TestParquetMetrics extends TestMetrics {
   }
 
   private InputFile writeRecords(Schema schema, Map<String, String> properties, Record... records) throws IOException {
-    File tmpFolder = temp.newFolder("parquet");
+    File tmpFolder = temp.newFolder("orc");
     String filename = UUID.randomUUID().toString();
-    OutputFile file = Files.localOutput(new File(tmpFolder, FileFormat.PARQUET.addExtension(filename)));
-    try (FileAppender<Record> writer = Parquet.write(file)
+    OutputFile file = Files.localOutput(new File(tmpFolder, FileFormat.ORC.addExtension(filename)));
+    try (FileAppender<Record> writer = ORC.write(file)
         .schema(schema)
         .setAll(properties)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .build()) {
       writer.addAll(Lists.newArrayList(records));
     }
@@ -80,14 +87,23 @@ public class TestParquetMetrics extends TestMetrics {
   }
 
   @Override
-  public int splitCount(InputFile parquetFile) throws IOException {
-    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(parquetFile))) {
-      return reader.getRowGroups().size();
-    }
+  public int splitCount(InputFile inputFile) throws IOException {
+    return 0;
+  }
+
+  private boolean isBinaryType(Type type) {
+    return BINARY_TYPES.contains(type.typeId());
   }
 
   @Override
-  public boolean supportsSmallRowGroups() {
-    return true;
+  protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
+    if (isBinaryType(type)) { // BINARY, FIXED and UUID columns are expected to carry no bounds in ORC metrics
+      Assert.assertFalse("ORC binary field should not have lower bounds.",
+          metrics.lowerBounds().containsKey(fieldId));
+      Assert.assertFalse("ORC binary field should not have upper bounds.",
+          metrics.upperBounds().containsKey(fieldId)); // check the upper-bound map, not the lower one again
+      return;
+    }
+    super.assertBounds(fieldId, type, lowerBound, upperBound, metrics);
   }
 }
diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
index a4c28cb..9af818a 100644
--- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -51,6 +51,11 @@ public class TestParquetMetrics extends TestMetrics {
   public TemporaryFolder temp = new TemporaryFolder();
 
   @Override
+  public FileFormat fileFormat() {
+    return FileFormat.PARQUET;
+  }
+
+  @Override
   public Metrics getMetrics(InputFile file) {
     return ParquetUtil.fileMetrics(file);
   }
@@ -80,8 +85,8 @@ public class TestParquetMetrics extends TestMetrics {
   }
 
   @Override
-  public int splitCount(InputFile parquetFile) throws IOException {
-    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(parquetFile))) {
+  public int splitCount(InputFile inputFile) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
       return reader.getRowGroups().size();
     }
   }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 07be70f..970f891 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -377,7 +377,7 @@ public final class ORCSchemaUtil {
     }
   }
 
-  private static Optional<Integer> icebergID(TypeDescription orcType) {
+  static Optional<Integer> icebergID(TypeDescription orcType) {
     return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE))
         .map(Integer::parseInt);
   }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 4e961bb..acc4855 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -91,6 +91,8 @@ class OrcFileAppender<D> implements FileAppender<D> {
 
   @Override
   public Metrics metrics() {
+    Preconditions.checkState(isClosed,
+        "Cannot return metrics while appending to an open file.");
     return OrcMetrics.fromWriter(writer);
   }
 
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
index 3689ec5..a5bcc2f 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -20,13 +20,40 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 
 public class OrcMetrics {
@@ -40,30 +67,211 @@ public class OrcMetrics {
     return fromInputFile(file, config);
   }
 
-  public static Metrics fromInputFile(InputFile file, Configuration config) {
+  static Metrics fromInputFile(InputFile file, Configuration config) {
     try (Reader orcReader = ORC.newFileReader(file, config)) {
-
-      // TODO: implement rest of the methods for ORC metrics
-      // https://github.com/apache/incubator-iceberg/pull/199
-      return new Metrics(orcReader.getNumberOfRows(),
-          null,
-          null,
-          Collections.emptyMap(),
-          null,
-          null);
+      return buildOrcMetrics(orcReader.getNumberOfRows(), orcReader.getSchema(), orcReader.getStatistics());
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Failed to read footer of file: %s", file);
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location());
     }
   }
 
   static Metrics fromWriter(Writer writer) {
-    // TODO: implement rest of the methods for ORC metrics in
-    // https://github.com/apache/incubator-iceberg/pull/199
-    return new Metrics(writer.getNumberOfRows(),
-        null,
-        null,
-        Collections.emptyMap(),
-        null,
-        null);
+    try {
+      return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics());
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to get statistics from writer");
+    }
+  }
+
+  private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
+                                         final ColumnStatistics[] colStats) {
+    final Schema schema = ORCSchemaUtil.convert(orcSchema);
+    final Set<TypeDescription> columnsInContainers = findColumnsInContainers(schema, orcSchema);
+    Map<Integer, Long> columnSizes = Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> valueCounts = Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> nullCounts = Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+    for (int i = 0; i < colStats.length; i++) {
+      final ColumnStatistics colStat = colStats[i];
+      final TypeDescription orcCol = orcSchema.findSubtype(i);
+      final Optional<Types.NestedField> icebergColOpt = ORCSchemaUtil.icebergID(orcCol)
+          .map(schema::findField);
+
+      if (icebergColOpt.isPresent()) {
+        final Types.NestedField icebergCol = icebergColOpt.get();
+        final int fieldId = icebergCol.fieldId();
+
+        columnSizes.put(fieldId, colStat.getBytesOnDisk());
+
+        if (!columnsInContainers.contains(orcCol)) {
+          // Since ORC does not track null values nor repeated ones, the value count for columns in
+          // containers (maps, lists) may be larger than what it actually is; however, these are not
+          // used in expressions right now. For such cases, we use the number of values
+          // directly stored in ORC.
+          if (colStat.hasNull()) {
+            nullCounts.put(fieldId, numOfRows - colStat.getNumberOfValues());
+          } else {
+            nullCounts.put(fieldId, 0L);
+          }
+          valueCounts.put(fieldId, colStat.getNumberOfValues() + nullCounts.get(fieldId));
+
+          Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ?
+              fromOrcMin(icebergCol, colStat) : Optional.empty();
+          orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(), byteBuffer));
+          Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ?
+              fromOrcMax(icebergCol, colStat) : Optional.empty();
+          orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(), byteBuffer));
+        }
+      }
+    }
+
+    return new Metrics(numOfRows,
+        columnSizes,
+        valueCounts,
+        nullCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
+  private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column,
+                                                 ColumnStatistics columnStats) {
+    Object min = null;
+    if (columnStats instanceof IntegerColumnStatistics) {
+      min = ((IntegerColumnStatistics) columnStats).getMinimum();
+      if (column.type().typeId() == Type.TypeID.INTEGER) {
+        min = Math.toIntExact((long) min);
+      }
+    } else if (columnStats instanceof DoubleColumnStatistics) {
+      min = ((DoubleColumnStatistics) columnStats).getMinimum();
+      if (column.type().typeId() == Type.TypeID.FLOAT) {
+        min = ((Double) min).floatValue();
+      }
+    } else if (columnStats instanceof StringColumnStatistics) {
+      min = ((StringColumnStatistics) columnStats).getMinimum();
+    } else if (columnStats instanceof DecimalColumnStatistics) {
+      min = Optional
+          .ofNullable(((DecimalColumnStatistics) columnStats).getMinimum())
+          .map(minStats -> minStats.bigDecimalValue()
+              .setScale(((Types.DecimalType) column.type()).scale()))
+          .orElse(null);
+    } else if (columnStats instanceof DateColumnStatistics) {
+      min = Optional.ofNullable(((DateColumnStatistics) columnStats).getMinimum())
+          .map(minStats -> DateTimeUtil.daysFromDate(
+              DateTimeUtil.EPOCH.plus(minStats.getTime(), ChronoUnit.MILLIS).toLocalDate()))
+          .orElse(null);
+    } else if (columnStats instanceof TimestampColumnStatistics) {
+      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) columnStats;
+      Timestamp minValue = tColStats.getMinimumUTC();
+      min = Optional.ofNullable(minValue)
+          .map(v -> TimeUnit.MILLISECONDS.toMicros(v.getTime()))
+          .orElse(null);
+    } else if (columnStats instanceof BooleanColumnStatistics) {
+      BooleanColumnStatistics booleanStats = (BooleanColumnStatistics) columnStats;
+      min = booleanStats.getFalseCount() <= 0;
+    }
+    return Optional.ofNullable(Conversions.toByteBuffer(column.type(), min));
+  }
+
+  private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column,
+                                                 ColumnStatistics columnStats) {
+    Object max = null;
+    if (columnStats instanceof IntegerColumnStatistics) {
+      max = ((IntegerColumnStatistics) columnStats).getMaximum();
+      if (column.type().typeId() == Type.TypeID.INTEGER) {
+        max = Math.toIntExact((long) max);
+      }
+    } else if (columnStats instanceof DoubleColumnStatistics) {
+      max = ((DoubleColumnStatistics) columnStats).getMaximum();
+      if (column.type().typeId() == Type.TypeID.FLOAT) {
+        max = ((Double) max).floatValue();
+      }
+    } else if (columnStats instanceof StringColumnStatistics) {
+      max = ((StringColumnStatistics) columnStats).getMaximum();
+    } else if (columnStats instanceof DecimalColumnStatistics) {
+      max = Optional
+          .ofNullable(((DecimalColumnStatistics) columnStats).getMaximum())
+          .map(maxStats -> maxStats.bigDecimalValue()
+              .setScale(((Types.DecimalType) column.type()).scale()))
+          .orElse(null);
+    } else if (columnStats instanceof DateColumnStatistics) {
+      max = Optional.ofNullable(((DateColumnStatistics) columnStats).getMaximum())
+          .map(maxStats -> DateTimeUtil.daysFromDate(
+              DateTimeUtil.EPOCH.plus(maxStats.getTime(), ChronoUnit.MILLIS).toLocalDate()))
+          .orElse(null);
+    } else if (columnStats instanceof TimestampColumnStatistics) {
+      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) columnStats;
+      Timestamp maxValue = tColStats.getMaximumUTC();
+      max = Optional.ofNullable(maxValue)
+          .map(v -> TimeUnit.MILLISECONDS.toMicros(v.getTime()))
+          .map(v -> v + 1_000) // Add 1 millisecond to handle precision issue due to ORC-611
+          .orElse(null);
+    } else if (columnStats instanceof BooleanColumnStatistics) {
+      BooleanColumnStatistics booleanStats = (BooleanColumnStatistics) columnStats;
+      max = booleanStats.getTrueCount() > 0;
+    }
+    return Optional.ofNullable(Conversions.toByteBuffer(column.type(), max));
+  }
+
+  private static Set<TypeDescription> findColumnsInContainers(Schema schema,
+                                                              TypeDescription orcSchema) {
+    ColumnsInContainersVisitor visitor = new ColumnsInContainersVisitor();
+    OrcSchemaWithTypeVisitor.visit(schema, orcSchema, visitor);
+    return visitor.getColumnsInContainers();
+  }
+
+  private static class ColumnsInContainersVisitor extends OrcSchemaWithTypeVisitor<TypeDescription> {
+
+    private final Set<TypeDescription> columnsInContainers;
+
+    private ColumnsInContainersVisitor() {
+      columnsInContainers = Sets.newHashSet();
+    }
+
+    public Set<TypeDescription> getColumnsInContainers() {
+      return columnsInContainers;
+    }
+
+    private Set<TypeDescription> flatten(TypeDescription rootType) {
+      if (rootType == null) {
+        return ImmutableSet.of();
+      }
+
+      final Set<TypeDescription> flatTypes = Sets.newHashSetWithExpectedSize(rootType.getMaximumId());
+      final Queue<TypeDescription> queue = Queues.newLinkedBlockingQueue();
+      queue.add(rootType);
+      while (!queue.isEmpty()) {
+        TypeDescription type = queue.remove();
+        flatTypes.add(type);
+        queue.addAll(Optional.ofNullable(type.getChildren()).orElse(ImmutableList.of()));
+      }
+      return flatTypes;
+    }
+
+    @Override
+    public TypeDescription record(Types.StructType iStruct, TypeDescription record,
+                                  List<String> names, List<TypeDescription> fields) {
+      return record;
+    }
+
+    @Override
+    public TypeDescription list(Types.ListType iList, TypeDescription array, TypeDescription element) {
+      columnsInContainers.addAll(flatten(element));
+      return array;
+    }
+
+    @Override
+    public TypeDescription map(Types.MapType iMap, TypeDescription map,
+                    TypeDescription key, TypeDescription value) {
+      columnsInContainers.addAll(flatten(key));
+      columnsInContainers.addAll(flatten(value));
+      return map;
+    }
+
+    @Override
+    public TypeDescription primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      return primitive;
+    }
   }
 }