Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/15 08:39:00 UTC

[GitHub] [iceberg] nastra commented on a diff in pull request #4537: Add reader and writer for Puffin, indexes and stats file format

nastra commented on code in PR #4537:
URL: https://github.com/apache/iceberg/pull/4537#discussion_r897618171


##########
core/src/main/java/org/apache/iceberg/puffin/BlobMetadata.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.puffin;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class BlobMetadata {
+  private final String type;
+  private final List<Integer> inputFields;
+  private final long offset;
+  private final long length;
+  private final String compressionCodec;
+  private final Map<String, String> properties;
+
+  public BlobMetadata(
+      String type, List<Integer> inputFields, long offset, long length,
+      @Nullable String compressionCodec, Map<String, String> properties) {
+    Preconditions.checkNotNull(type, "type is null");
+    Preconditions.checkNotNull(inputFields, "inputFields is null");
+    Preconditions.checkNotNull(properties, "properties is null");
+    this.type = type;
+    this.inputFields = ImmutableList.copyOf(inputFields);
+    this.offset = offset;
+    this.length = length;
+    this.compressionCodec = compressionCodec;
+    this.properties = ImmutableMap.copyOf(properties);
+  }
+
+  public String type() {
+    return type;
+  }
+
+  public List<Integer> inputFields() {

Review Comment:
   The spec mentions that input fields are JSON longs, so should this be a `List<Long>`?
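If `inputFields` stays a `List<Integer>` (Iceberg field IDs are Java `int`s), whatever parses the JSON has to narrow the longs it reads. Here is a minimal sketch using a hypothetical `toFieldIds` helper that is not part of the PR. It narrows with `Math.toIntExact`, so an out-of-range value fails loudly instead of silently wrapping:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FieldIds {
  private FieldIds() {
  }

  // Hypothetical helper: narrow JSON longs to int field IDs.
  // Math.toIntExact throws ArithmeticException instead of silently truncating.
  static List<Integer> toFieldIds(List<Long> jsonValues) {
    List<Integer> ids = new ArrayList<>(jsonValues.size());
    for (long value : jsonValues) {
      ids.add(Math.toIntExact(value));
    }
    return Collections.unmodifiableList(ids);
  }
}
```

The alternative is `List<Long>` in `BlobMetadata` itself. That matches the JSON type exactly, but it pushes the narrowing onto every caller that maps the values back to schema field IDs.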



##########
core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java:
##########
@@ -0,0 +1,166 @@
+
+package org.apache.iceberg.puffin;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.Pair;
+
+final class PuffinFormat {
+  private PuffinFormat() {
+  }
+
+  enum Flag {
+    FOOTER_PAYLOAD_COMPRESSED(0, 0),
+    /**/;
+
+    private static final Map<Pair<Integer, Integer>, Flag> BY_BYTE_AND_BIT = Stream.of(values())
+        .collect(ImmutableMap.toImmutableMap(
+            flag -> Pair.of(flag.byteNumber(), flag.bitNumber()), Function.identity()));
+
+    private final int byteNumber;
+    private final int bitNumber;
+
+    Flag(int byteNumber, int bitNumber) {
+      Preconditions.checkArgument(
+          0 <= byteNumber && byteNumber < PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH,
+          "Invalid byteNumber");

Review Comment:
   nit: should we include the actual byteNumber and the valid range in the message? Same for the bitNumber error message.
   
   `Preconditions.checkArgument(..., "Invalid byteNumber '%s': must be 0 <= byteNumber < %s", byteNumber, FOOTER_STRUCT_FLAGS_LENGTH)`
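Guava's `Preconditions` only substitutes `%s` placeholders. A `%d` is left in the message as-is, and the unmatched argument is appended in square brackets. The sketch below shows the intended messages using only the standard library. `checkFlagPosition` is a hypothetical stand-in for the two `checkArgument` calls, and it assumes the PR's flags length of 4:

```java
final class FlagPositions {
  // Assumed to mirror PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH in the PR.
  static final int FOOTER_STRUCT_FLAGS_LENGTH = 4;

  private FlagPositions() {
  }

  // Stand-in for Preconditions.checkArgument: report the offending value and the valid range.
  static void checkFlagPosition(int byteNumber, int bitNumber) {
    if (byteNumber < 0 || byteNumber >= FOOTER_STRUCT_FLAGS_LENGTH) {
      throw new IllegalArgumentException(String.format(
          "Invalid byteNumber '%s': must be 0 <= byteNumber < %s", byteNumber, FOOTER_STRUCT_FLAGS_LENGTH));
    }
    if (bitNumber < 0 || bitNumber >= Byte.SIZE) {
      throw new IllegalArgumentException(String.format(
          "Invalid bitNumber '%s': must be 0 <= bitNumber < %s", bitNumber, Byte.SIZE));
    }
  }
}
```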



##########
core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java:
##########
@@ -0,0 +1,86 @@
+
+package org.apache.iceberg.puffin;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.junit.Test;
+
+import static org.apache.iceberg.puffin.PuffinFormat.readIntegerLittleEndian;
+import static org.apache.iceberg.puffin.PuffinFormat.writeIntegerLittleEndian;
+import static org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestPuffinFormat {
+  @Test
+  public void testWriteIntegerLittleEndian() throws Exception {
+    testWriteIntegerLittleEndian(0, bytes(0, 0, 0, 0));
+    testWriteIntegerLittleEndian(42, bytes(42, 0, 0, 0));
+    testWriteIntegerLittleEndian(Integer.MAX_VALUE - 5, bytes(0xFA, 0xFF, 0xFF, 0x7F));
+    testWriteIntegerLittleEndian(-7, bytes(0xF9, 0xFF, 0xFF, 0xFF));
+  }
+
+  private void testWriteIntegerLittleEndian(int value, byte[] expected) throws Exception {
+    // Sanity check: validate the expectation
+    ByteBuffer buffer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+    buffer.putInt(value);
+    buffer.flip();
+    byte[] written = new byte[4];
+    buffer.get(written);
+    Preconditions.checkState(Arrays.equals(written, expected), "Invalid expected value");

Review Comment:
   Should this just be `assertThat(written).isEqualTo(expected)`? That would show the expected and actual values if the check fails, whereas the precondition doesn't.
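AssertJ's mismatch report includes both arrays. The stdlib-only sketch below imitates that behaviour and uses `ByteBuffer` as the reference encoder, as the test's sanity check does. The names `assertBytesEqual` and `referenceBytes` are hypothetical and not from the PR:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

final class LittleEndianExpectations {
  private LittleEndianExpectations() {
  }

  // Encode value as 4 little-endian bytes, using the JDK as the reference implementation.
  static byte[] referenceBytes(int value) {
    return ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
  }

  // Stand-in for assertThat(actual).isEqualTo(expected): on mismatch, report both arrays.
  static void assertBytesEqual(byte[] actual, byte[] expected) {
    if (!Arrays.equals(actual, expected)) {
      throw new AssertionError(
          "expected " + Arrays.toString(expected) + " but was " + Arrays.toString(actual));
    }
  }
}
```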



##########
core/src/main/java/org/apache/iceberg/puffin/PuffinWriter.java:
##########
@@ -0,0 +1,190 @@
+
+package org.apache.iceberg.puffin;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.puffin.PuffinFormat.Flag;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class PuffinWriter implements FileAppender<Blob> {
+  // Must not be modified
+  private static final byte[] MAGIC = PuffinFormat.getMagic();
+
+  private final PositionOutputStream outputStream;
+  private final Map<String, String> properties;
+  private final PuffinCompressionCodec footerCompression;
+  private final PuffinCompressionCodec defaultBlobCompression;
+
+  private final List<BlobMetadata> writtenBlobsMetadata = Lists.newArrayList();
+  private boolean headerWritten;
+  private boolean finished;
+  private Optional<Integer> footerSize = Optional.empty();
+  private Optional<Long> fileSize = Optional.empty();
+
+  PuffinWriter(
+      OutputFile outputFile, Map<String, String> properties, boolean compressFooter,
+      PuffinCompressionCodec defaultBlobCompression) {
+    Preconditions.checkNotNull(outputFile, "outputFile is null");
+    Preconditions.checkNotNull(properties, "properties is null");
+    Preconditions.checkNotNull(defaultBlobCompression, "defaultBlobCompression is null");
+    this.outputStream = outputFile.create();
+    this.properties = ImmutableMap.copyOf(properties);
+    this.footerCompression = compressFooter ? PuffinFormat.FOOTER_COMPRESSION_CODEC : PuffinCompressionCodec.NONE;
+    this.defaultBlobCompression = defaultBlobCompression;
+  }
+
+  @Override
+  public void add(Blob blob) {
+    Preconditions.checkNotNull(blob, "blob is null");
+    checkNotFinished();
+    try {
+      writeHeaderIfNeeded();
+      long fileOffset = outputStream.getPos();
+      PuffinCompressionCodec codec = MoreObjects.firstNonNull(blob.requestedCompression(), defaultBlobCompression);
+      ByteBuffer rawData = PuffinFormat.compress(codec, blob.blobData());
+      int length = rawData.remaining();
+      writeFully(rawData);
+      writtenBlobsMetadata.add(new BlobMetadata(blob.type(), blob.inputFields(), fileOffset, length,
+          codec.codecName(), blob.properties()));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public Metrics metrics() {
+    return new Metrics();
+  }
+
+  @Override
+  public long length() {
+    return fileSize();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!finished) {
+      finish();
+    }
+
+    outputStream.close();
+  }
+
+  private void writeHeaderIfNeeded() throws IOException {
+    if (headerWritten) {
+      return;
+    }
+
+    this.outputStream.write(MAGIC);
+    this.headerWritten = true;
+  }
+
+  public void finish() throws IOException {
+    checkNotFinished();
+    writeHeaderIfNeeded();
+    if (footerSize.isPresent()) {

Review Comment:
   nit: maybe use `Preconditions.checkState(..)` here?
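The `if` block's body is cut off in this excerpt. Assuming it throws an `IllegalStateException` when the footer has already been written, the guard can collapse into a single `checkState` call. The sketch below uses a stdlib stand-in for `Preconditions.checkState` and a hypothetical `writeFooter` method:

```java
import java.util.Optional;

final class FooterState {
  private Optional<Integer> footerSize = Optional.empty();

  // Stand-in for Preconditions.checkState(expression, message).
  private static void checkState(boolean expression, String message) {
    if (!expression) {
      throw new IllegalStateException(message);
    }
  }

  // Hypothetical: record the footer size once; a second call fails fast.
  void writeFooter(int size) {
    checkState(!footerSize.isPresent(), "Footer already written");
    footerSize = Optional.of(size);
  }

  Optional<Integer> footerSize() {
    return footerSize;
  }
}
```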



##########
core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java:
##########
@@ -0,0 +1,166 @@
+
+package org.apache.iceberg.puffin;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.Pair;
+
+final class PuffinFormat {
+  private PuffinFormat() {
+  }
+
+  enum Flag {
+    FOOTER_PAYLOAD_COMPRESSED(0, 0),
+    /**/;
+
+    private static final Map<Pair<Integer, Integer>, Flag> BY_BYTE_AND_BIT = Stream.of(values())
+        .collect(ImmutableMap.toImmutableMap(
+            flag -> Pair.of(flag.byteNumber(), flag.bitNumber()), Function.identity()));
+
+    private final int byteNumber;
+    private final int bitNumber;
+
+    Flag(int byteNumber, int bitNumber) {
+      Preconditions.checkArgument(
+          0 <= byteNumber && byteNumber < PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH,
+          "Invalid byteNumber");
+      Preconditions.checkArgument(0 <= bitNumber && bitNumber < Byte.SIZE, "Invalid bitNumber");
+      this.byteNumber = byteNumber;
+      this.bitNumber = bitNumber;
+    }
+
+    @Nullable
+    static Flag fromBit(int byteNumber, int bitNumber) {
+      return BY_BYTE_AND_BIT.get(Pair.of(byteNumber, bitNumber));
+    }
+
+    public int byteNumber() {
+      return byteNumber;
+    }
+
+    public int bitNumber() {
+      return bitNumber;
+    }
+  }
+
+  static final int FOOTER_START_MAGIC_OFFSET = 0;
+  static final int FOOTER_START_MAGIC_LENGTH = getMagic().length;
+
+  // "Footer struct" denotes the fixed-length portion of the Footer
+  static final int FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET = 0;
+  static final int FOOTER_STRUCT_FLAGS_OFFSET = FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET + 4;
+  static final int FOOTER_STRUCT_FLAGS_LENGTH = 4;
+  static final int FOOTER_STRUCT_MAGIC_OFFSET = FOOTER_STRUCT_FLAGS_OFFSET + FOOTER_STRUCT_FLAGS_LENGTH;
+  static final int FOOTER_STRUCT_LENGTH = FOOTER_STRUCT_MAGIC_OFFSET + getMagic().length;
+
+  static final PuffinCompressionCodec FOOTER_COMPRESSION_CODEC = PuffinCompressionCodec.LZ4;
+
+  static byte[] getMagic() {
+    return new byte[] {0x50, 0x46, 0x41, 0x31};
+  }
+
+  static void writeIntegerLittleEndian(OutputStream outputStream, int value) throws IOException {
+    outputStream.write(0xFF & value);
+    outputStream.write(0xFF & (value >> 8));
+    outputStream.write(0xFF & (value >> 16));
+    outputStream.write(0xFF & (value >> 24));
+  }
+
+  static int readIntegerLittleEndian(byte[] data, int offset) {
+    return Byte.toUnsignedInt(data[offset]) |
+        (Byte.toUnsignedInt(data[offset + 1]) << 8) |
+        (Byte.toUnsignedInt(data[offset + 2]) << 16) |
+        (Byte.toUnsignedInt(data[offset + 3]) << 24);
+  }
+
+  static ByteBuffer compress(PuffinCompressionCodec codec, ByteBuffer input) {
+    switch (codec) {
+      case NONE:
+        return input.duplicate();
+      case LZ4:
+        // TODO requires LZ4 frame compressor, e.g. https://github.com/airlift/aircompressor/pull/142
+        break;
+      case ZSTD:
+        return compress(new ZstdCompressor(), input);
+    }
+    throw new UnsupportedOperationException("Unsupported codec: " + codec);

Review Comment:
   nit: maybe say `Unsupported compression codec` here?
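The dispatch shape with the clearer message is sketched below. The `Codec` enum is a stand-in that mirrors the PR's `PuffinCompressionCodec` constants. The LZ4 and ZSTD branches are left out, so both fall through to the exception here:

```java
import java.nio.ByteBuffer;

final class CodecDispatch {
  // Stand-in mirroring the PR's PuffinCompressionCodec constants.
  enum Codec { NONE, LZ4, ZSTD }

  private CodecDispatch() {
  }

  static ByteBuffer compress(Codec codec, ByteBuffer input) {
    switch (codec) {
      case NONE:
        return input.duplicate();
      default:
        // Real LZ4/ZSTD handling omitted; the message says what kind of codec is unsupported.
        throw new UnsupportedOperationException("Unsupported compression codec: " + codec);
    }
  }
}
```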



##########
core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java:
##########
@@ -0,0 +1,86 @@
+
+package org.apache.iceberg.puffin;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.junit.Test;
+
+import static org.apache.iceberg.puffin.PuffinFormat.readIntegerLittleEndian;
+import static org.apache.iceberg.puffin.PuffinFormat.writeIntegerLittleEndian;
+import static org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestPuffinFormat {
+  @Test
+  public void testWriteIntegerLittleEndian() throws Exception {
+    testWriteIntegerLittleEndian(0, bytes(0, 0, 0, 0));
+    testWriteIntegerLittleEndian(42, bytes(42, 0, 0, 0));
+    testWriteIntegerLittleEndian(Integer.MAX_VALUE - 5, bytes(0xFA, 0xFF, 0xFF, 0x7F));
+    testWriteIntegerLittleEndian(-7, bytes(0xF9, 0xFF, 0xFF, 0xFF));
+  }
+
+  private void testWriteIntegerLittleEndian(int value, byte[] expected) throws Exception {
+    // Sanity check: validate the expectation
+    ByteBuffer buffer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+    buffer.putInt(value);
+    buffer.flip();
+    byte[] written = new byte[4];
+    buffer.get(written);
+    Preconditions.checkState(Arrays.equals(written, expected), "Invalid expected value");
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    writeIntegerLittleEndian(outputStream, value);
+    assertThat(outputStream.toByteArray()).isEqualTo(expected);
+  }
+
+  @Test
+  public void testReadIntegerLittleEndian() {
+    testReadIntegerLittleEndian(bytes(0, 0, 0, 0), 0, 0);
+    testReadIntegerLittleEndian(bytes(42, 0, 0, 0), 0, 42);
+    testReadIntegerLittleEndian(bytes(13, 42, 0, 0, 0, 14), 1, 42);
+    testReadIntegerLittleEndian(bytes(13, 0xFa, 0xFF, 0xFF, 0x7F, 14), 1, Integer.MAX_VALUE - 5);
+    testReadIntegerLittleEndian(bytes(13, 0xF9, 0xFF, 0xFF, 0xFF, 14), 1, -7);
+  }
+
+  private void testReadIntegerLittleEndian(byte[] input, int offset, int expected) {
+    // Sanity check: validate the expectation
+    Preconditions.checkArgument(

Review Comment:
   Same here: should this just use `assertThat(...).isEqualTo(...)`?
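On the read side, the sanity check can compare the PR's decoder against the JDK, and AssertJ would then report both values on failure. The sketch below shows the two computations being compared. `readIntegerLittleEndian` copies the PR's bit manipulation, and `referenceRead` is a hypothetical helper:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class LittleEndianReads {
  private LittleEndianReads() {
  }

  // Same bit manipulation as PuffinFormat.readIntegerLittleEndian in the PR.
  static int readIntegerLittleEndian(byte[] data, int offset) {
    return Byte.toUnsignedInt(data[offset])
        | (Byte.toUnsignedInt(data[offset + 1]) << 8)
        | (Byte.toUnsignedInt(data[offset + 2]) << 16)
        | (Byte.toUnsignedInt(data[offset + 3]) << 24);
  }

  // JDK reference: absolute getInt(offset) on a little-endian view of the same array.
  static int referenceRead(byte[] data, int offset) {
    return ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN).getInt(offset);
  }
}
```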



##########
core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java:
##########
@@ -0,0 +1,166 @@
+
+package org.apache.iceberg.puffin;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.Pair;
+
+final class PuffinFormat {
+  private PuffinFormat() {
+  }
+
+  enum Flag {
+    FOOTER_PAYLOAD_COMPRESSED(0, 0),
+    /**/;
+
+    private static final Map<Pair<Integer, Integer>, Flag> BY_BYTE_AND_BIT = Stream.of(values())
+        .collect(ImmutableMap.toImmutableMap(
+            flag -> Pair.of(flag.byteNumber(), flag.bitNumber()), Function.identity()));
+
+    private final int byteNumber;
+    private final int bitNumber;
+
+    Flag(int byteNumber, int bitNumber) {
+      Preconditions.checkArgument(
+          0 <= byteNumber && byteNumber < PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH,
+          "Invalid byteNumber");
+      Preconditions.checkArgument(0 <= bitNumber && bitNumber < Byte.SIZE, "Invalid bitNumber");
+      this.byteNumber = byteNumber;
+      this.bitNumber = bitNumber;
+    }
+
+    @Nullable
+    static Flag fromBit(int byteNumber, int bitNumber) {
+      return BY_BYTE_AND_BIT.get(Pair.of(byteNumber, bitNumber));
+    }
+
+    public int byteNumber() {
+      return byteNumber;
+    }
+
+    public int bitNumber() {
+      return bitNumber;
+    }
+  }
+
+  static final int FOOTER_START_MAGIC_OFFSET = 0;
+  static final int FOOTER_START_MAGIC_LENGTH = getMagic().length;
+
+  // "Footer struct" denotes the fixed-length portion of the Footer
+  static final int FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET = 0;
+  static final int FOOTER_STRUCT_FLAGS_OFFSET = FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET + 4;
+  static final int FOOTER_STRUCT_FLAGS_LENGTH = 4;
+  static final int FOOTER_STRUCT_MAGIC_OFFSET = FOOTER_STRUCT_FLAGS_OFFSET + FOOTER_STRUCT_FLAGS_LENGTH;
+  static final int FOOTER_STRUCT_LENGTH = FOOTER_STRUCT_MAGIC_OFFSET + getMagic().length;
+
+  static final PuffinCompressionCodec FOOTER_COMPRESSION_CODEC = PuffinCompressionCodec.LZ4;
+
+  static byte[] getMagic() {
+    return new byte[] {0x50, 0x46, 0x41, 0x31};
+  }
+
+  static void writeIntegerLittleEndian(OutputStream outputStream, int value) throws IOException {
+    outputStream.write(0xFF & value);
+    outputStream.write(0xFF & (value >> 8));
+    outputStream.write(0xFF & (value >> 16));
+    outputStream.write(0xFF & (value >> 24));
+  }
+
+  static int readIntegerLittleEndian(byte[] data, int offset) {
+    return Byte.toUnsignedInt(data[offset]) |
+        (Byte.toUnsignedInt(data[offset + 1]) << 8) |
+        (Byte.toUnsignedInt(data[offset + 2]) << 16) |
+        (Byte.toUnsignedInt(data[offset + 3]) << 24);
+  }
+
+  static ByteBuffer compress(PuffinCompressionCodec codec, ByteBuffer input) {
+    switch (codec) {
+      case NONE:
+        return input.duplicate();
+      case LZ4:
+        // TODO requires LZ4 frame compressor, e.g. https://github.com/airlift/aircompressor/pull/142
+        break;
+      case ZSTD:
+        return compress(new ZstdCompressor(), input);
+    }
+    throw new UnsupportedOperationException("Unsupported codec: " + codec);
+  }
+
+  private static ByteBuffer compress(Compressor compressor, ByteBuffer input) {
+    ByteBuffer output = ByteBuffer.allocate(compressor.maxCompressedLength(input.remaining()));
+    compressor.compress(input.duplicate(), output);
+    output.flip();
+    return output;
+  }
+
+  static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
+    switch (codec) {
+      case NONE:
+        return input.duplicate();
+
+      case LZ4:
+        // TODO requires LZ4 frame decompressor, e.g. https://github.com/airlift/aircompressor/pull/142
+        break;
+
+      case ZSTD: {
+        byte[] inputBytes;
+        int inputOffset;
+        int inputLength;
+        if (input.hasArray()) {
+          inputBytes = input.array();
+          inputOffset = input.arrayOffset();
+          inputLength = input.remaining();
+        } else {
+          // TODO implement ZstdDecompressor.getDecompressedSize for ByteBuffer to avoid copying
+          inputBytes = ByteBuffers.toByteArray(input);
+          inputOffset = 0;
+          inputLength = inputBytes.length;
+        }
+
+        byte[] decompressed =
+            new byte[Math.toIntExact(ZstdDecompressor.getDecompressedSize(inputBytes, inputOffset, inputLength))];
+        int decompressedLength =
+            new ZstdDecompressor().decompress(
+                inputBytes,
+                inputOffset,
+                inputLength,
+                decompressed,
+                0,
+                decompressed.length);
+        Preconditions.checkState(decompressedLength == decompressed.length, "Invalid decompressed length");
+        return ByteBuffer.wrap(decompressed);
+      }
+    }
+
+    throw new UnsupportedOperationException("Unsupported codec: " + codec);

Review Comment:
   nit: same as above



##########
core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java:
##########
@@ -0,0 +1,166 @@
+
+package org.apache.iceberg.puffin;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.Pair;
+
+final class PuffinFormat {
+  private PuffinFormat() {
+  }
+
+  enum Flag {
+    FOOTER_PAYLOAD_COMPRESSED(0, 0),
+    /**/;
+
+    private static final Map<Pair<Integer, Integer>, Flag> BY_BYTE_AND_BIT = Stream.of(values())
+        .collect(ImmutableMap.toImmutableMap(
+            flag -> Pair.of(flag.byteNumber(), flag.bitNumber()), Function.identity()));
+
+    private final int byteNumber;
+    private final int bitNumber;
+
+    Flag(int byteNumber, int bitNumber) {
+      Preconditions.checkArgument(
+          0 <= byteNumber && byteNumber < PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH,
+          "Invalid byteNumber");
+      Preconditions.checkArgument(0 <= bitNumber && bitNumber < Byte.SIZE, "Invalid bitNumber");
+      this.byteNumber = byteNumber;
+      this.bitNumber = bitNumber;
+    }
+
+    @Nullable
+    static Flag fromBit(int byteNumber, int bitNumber) {
+      return BY_BYTE_AND_BIT.get(Pair.of(byteNumber, bitNumber));
+    }
+
+    public int byteNumber() {
+      return byteNumber;
+    }
+
+    public int bitNumber() {
+      return bitNumber;
+    }
+  }
+
+  static final int FOOTER_START_MAGIC_OFFSET = 0;
+  static final int FOOTER_START_MAGIC_LENGTH = getMagic().length;
+
+  // "Footer struct" denotes the fixed-length portion of the Footer
+  static final int FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET = 0;
+  static final int FOOTER_STRUCT_FLAGS_OFFSET = FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET + 4;
+  static final int FOOTER_STRUCT_FLAGS_LENGTH = 4;
+  static final int FOOTER_STRUCT_MAGIC_OFFSET = FOOTER_STRUCT_FLAGS_OFFSET + FOOTER_STRUCT_FLAGS_LENGTH;
+  static final int FOOTER_STRUCT_LENGTH = FOOTER_STRUCT_MAGIC_OFFSET + getMagic().length;
+
+  static final PuffinCompressionCodec FOOTER_COMPRESSION_CODEC = PuffinCompressionCodec.LZ4;
+
+  static byte[] getMagic() {
+    return new byte[] {0x50, 0x46, 0x41, 0x31};
+  }
+
+  static void writeIntegerLittleEndian(OutputStream outputStream, int value) throws IOException {
+    outputStream.write(0xFF & value);
+    outputStream.write(0xFF & (value >> 8));
+    outputStream.write(0xFF & (value >> 16));
+    outputStream.write(0xFF & (value >> 24));
+  }
+
+  static int readIntegerLittleEndian(byte[] data, int offset) {
+    return Byte.toUnsignedInt(data[offset]) |
+        (Byte.toUnsignedInt(data[offset + 1]) << 8) |
+        (Byte.toUnsignedInt(data[offset + 2]) << 16) |
+        (Byte.toUnsignedInt(data[offset + 3]) << 24);
+  }
+
+  static ByteBuffer compress(PuffinCompressionCodec codec, ByteBuffer input) {
+    switch (codec) {
+      case NONE:
+        return input.duplicate();
+      case LZ4:
+        // TODO requires LZ4 frame compressor, e.g. https://github.com/airlift/aircompressor/pull/142
+        break;
+      case ZSTD:
+        return compress(new ZstdCompressor(), input);
+    }
+    throw new UnsupportedOperationException("Unsupported codec: " + codec);
+  }
+
+  private static ByteBuffer compress(Compressor compressor, ByteBuffer input) {
+    ByteBuffer output = ByteBuffer.allocate(compressor.maxCompressedLength(input.remaining()));
+    compressor.compress(input.duplicate(), output);
+    output.flip();
+    return output;
+  }
+
+  static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
+    switch (codec) {
+      case NONE:
+        return input.duplicate();
+
+      case LZ4:
+        // TODO requires LZ4 frame decompressor, e.g. https://github.com/airlift/aircompressor/pull/142
+        break;
+
+      case ZSTD: {
+        byte[] inputBytes;

Review Comment:
   Maybe move the entire block into its own method, as the case block is getting quite long?
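One possible extraction is sketched below without the aircompressor calls, which would stay as in the PR. The array-or-copy logic moves into a hypothetical `ArraySlice` helper that returns the `(bytes, offset, length)` triple the decompressor needs. The sketch also adds `input.position()` to `input.arrayOffset()`. `arrayOffset()` alone points at the buffer's first element, not its current position, so the version in the diff would read from the wrong place for a heap buffer whose position is non-zero.

```java
import java.nio.ByteBuffer;

// Hypothetical holder for the (array, offset, length) view a byte[]-based decompressor needs.
final class ArraySlice {
  final byte[] bytes;
  final int offset;
  final int length;

  private ArraySlice(byte[] bytes, int offset, int length) {
    this.bytes = bytes;
    this.offset = offset;
    this.length = length;
  }

  static ArraySlice of(ByteBuffer input) {
    if (input.hasArray()) {
      // Reuse the backing array; the remaining bytes start at arrayOffset() + position().
      return new ArraySlice(input.array(), input.arrayOffset() + input.position(), input.remaining());
    }
    // Direct or read-only buffers: copy the remaining bytes without moving the caller's position.
    byte[] copy = new byte[input.remaining()];
    input.duplicate().get(copy);
    return new ArraySlice(copy, 0, copy.length);
  }
}
```

The `ZSTD` case would then reduce to `ArraySlice.of(input)` followed by the two `ZstdDecompressor` calls on `slice.bytes`, `slice.offset`, and `slice.length`.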



##########
core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java:
##########
@@ -0,0 +1,120 @@
+
+package org.apache.iceberg.puffin;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestFileMetadataParser {

Review Comment:
   Maybe add a few tests with invalid or missing JSON fields?



##########
core/src/main/java/org/apache/iceberg/puffin/PuffinWriter.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.puffin;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.puffin.PuffinFormat.Flag;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class PuffinWriter implements FileAppender<Blob> {
+  // Must not be modified
+  private static final byte[] MAGIC = PuffinFormat.getMagic();
+
+  private final PositionOutputStream outputStream;
+  private final Map<String, String> properties;
+  private final PuffinCompressionCodec footerCompression;
+  private final PuffinCompressionCodec defaultBlobCompression;
+
+  private final List<BlobMetadata> writtenBlobsMetadata = Lists.newArrayList();
+  private boolean headerWritten;
+  private boolean finished;
+  private Optional<Integer> footerSize = Optional.empty();
+  private Optional<Long> fileSize = Optional.empty();
+
+  PuffinWriter(
+      OutputFile outputFile, Map<String, String> properties, boolean compressFooter,
+      PuffinCompressionCodec defaultBlobCompression) {
+    Preconditions.checkNotNull(outputFile, "outputFile is null");
+    Preconditions.checkNotNull(properties, "properties is null");
+    Preconditions.checkNotNull(defaultBlobCompression, "defaultBlobCompression is null");
+    this.outputStream = outputFile.create();
+    this.properties = ImmutableMap.copyOf(properties);
+    this.footerCompression = compressFooter ? PuffinFormat.FOOTER_COMPRESSION_CODEC : PuffinCompressionCodec.NONE;
+    this.defaultBlobCompression = defaultBlobCompression;
+  }
+
+  @Override
+  public void add(Blob blob) {
+    Preconditions.checkNotNull(blob, "blob is null");
+    checkNotFinished();
+    try {
+      writeHeaderIfNeeded();
+      long fileOffset = outputStream.getPos();
+      PuffinCompressionCodec codec = MoreObjects.firstNonNull(blob.requestedCompression(), defaultBlobCompression);
+      ByteBuffer rawData = PuffinFormat.compress(codec, blob.blobData());
+      int length = rawData.remaining();
+      writeFully(rawData);
+      writtenBlobsMetadata.add(new BlobMetadata(blob.type(), blob.inputFields(), fileOffset, length,
+          codec.codecName(), blob.properties()));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public Metrics metrics() {
+    return new Metrics();
+  }
+
+  @Override
+  public long length() {
+    return fileSize();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!finished) {
+      finish();
+    }
+
+    outputStream.close();
+  }
+
+  private void writeHeaderIfNeeded() throws IOException {
+    if (headerWritten) {
+      return;
+    }
+
+    this.outputStream.write(MAGIC);
+    this.headerWritten = true;
+  }
+
+  public void finish() throws IOException {
+    checkNotFinished();
+    writeHeaderIfNeeded();
+    if (footerSize.isPresent()) {
+      throw new IllegalStateException("footerSize already set");
+    }
+
+    long footerOffset = outputStream.getPos();
+    writeFooter();
+    this.footerSize = Optional.of(Math.toIntExact(outputStream.getPos() - footerOffset));
+    this.fileSize = Optional.of(outputStream.getPos());
+    this.finished = true;
+  }
+
+  private void writeFooter() throws IOException {
+    FileMetadata fileMetadata = new FileMetadata(writtenBlobsMetadata, properties);
+    ByteBuffer footerJson = ByteBuffer.wrap(
+        FileMetadataParser.toJson(fileMetadata, false).getBytes(StandardCharsets.UTF_8));
+    ByteBuffer footerPayload = PuffinFormat.compress(footerCompression, footerJson);
+    outputStream.write(MAGIC);
+    int footerPayloadLength = footerPayload.remaining();
+    writeFully(footerPayload);
+    PuffinFormat.writeIntegerLittleEndian(outputStream, footerPayloadLength);
+    writeFlags();
+    outputStream.write(MAGIC);
+  }
+
+  private void writeFlags() throws IOException {
+    Map<Integer, List<Flag>> flagsByByteNumber = fileFlags().stream()
+        .collect(Collectors.groupingBy(Flag::byteNumber));
+    for (int byteNumber = 0; byteNumber < PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH; byteNumber++) {
+      int byteFlag = 0;
+      for (Flag flag : flagsByByteNumber.getOrDefault(byteNumber, ImmutableList.of())) {
+        byteFlag |= 0x1 << flag.bitNumber();
+      }
+      outputStream.write(byteFlag);
+    }
+  }
+
+  private void writeFully(ByteBuffer buffer) throws IOException {
+    WritableByteChannel channel = Channels.newChannel(outputStream);
+    while (buffer.remaining() > 0) {
+      channel.write(buffer);
+    }
+  }
+
+  public long footerSize() {
+    return footerSize.orElseThrow(() -> new IllegalStateException("Footer not written yet"));
+  }
+
+  public long fileSize() {
+    return fileSize.orElseThrow(() -> new IllegalStateException("File not written yet"));
+  }
+
+  public List<BlobMetadata> writtenBlobsMetadata() {
+    return ImmutableList.copyOf(writtenBlobsMetadata);
+  }
+
+  private Set<Flag> fileFlags() {
+    EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
+    if (footerCompression != PuffinCompressionCodec.NONE) {
+      flags.add(Flag.FOOTER_PAYLOAD_COMPRESSED);
+    }
+
+    return flags;
+  }
+
+  private void checkNotFinished() {
+    if (finished) {

Review Comment:
   nit: maybe use `Preconditions.checkState(..)` here instead?
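   A minimal sketch of the `checkState` form. A tiny `Preconditions` stand-in replaces the relocated Guava class so the sketch compiles on its own, `FinishableWriterSketch` is a hypothetical trimmed-down writer, and the message text is a placeholder because the original `if` body is cut off above:

```java
// Minimal stand-in for the relocated Guava Preconditions.checkState(boolean, Object),
// included only so this sketch compiles without Guava. Like Guava, it throws
// IllegalStateException with String.valueOf(errorMessage) when the expression is false.
final class Preconditions {
  private Preconditions() {
  }

  static void checkState(boolean expression, Object errorMessage) {
    if (!expression) {
      throw new IllegalStateException(String.valueOf(errorMessage));
    }
  }
}

// Hypothetical, trimmed-down writer that only tracks the "finished" flag.
final class FinishableWriterSketch {
  private boolean finished;

  void finish() {
    checkNotFinished();
    // ... the footer would be written here ...
    this.finished = true;
  }

  private void checkNotFinished() {
    // One line instead of an explicit if/throw; the message text is a placeholder.
    Preconditions.checkState(!finished, "Writer already finished");
  }
}
```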



##########
core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.puffin;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.puffin.PuffinFormat.Flag;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
+import org.apache.iceberg.util.Pair;
+
+public class PuffinReader implements Closeable {
+  // Must not be modified
+  private static final byte[] MAGIC = PuffinFormat.getMagic();
+
+  private final long fileSize;
+  private final SeekableInputStream input;
+  private Integer knownFooterSize;
+  private FileMetadata knownFileMetadata;
+
+  PuffinReader(InputFile inputFile, @Nullable Long fileSize, @Nullable Long footerSize) {
+    Preconditions.checkNotNull(inputFile, "inputFile is null");
+    this.fileSize = fileSize == null ? inputFile.getLength() : fileSize;
+    this.input = inputFile.newStream();
+    if (footerSize != null) {
+      Preconditions.checkArgument(0 < footerSize && footerSize <= this.fileSize - MAGIC.length,
+          "Invalid footer size: %s", footerSize);

Review Comment:
   Should the error message maybe also state what a valid footer size is (greater than 0 and at most `fileSize - MAGIC.length`)?
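   A minimal sketch of a more descriptive message, assuming Puffin's 4-byte magic. `FooterSizeValidation` and `checkFooterSize` are hypothetical names, and plain `String.format` replaces the `%s` templating of `Preconditions.checkArgument` so the sketch has no dependencies:

```java
// Hypothetical helper; PuffinReader performs this check inline with Preconditions.checkArgument.
final class FooterSizeValidation {
  // Puffin files begin with a 4-byte magic ("PFA1").
  static final int MAGIC_LENGTH = 4;

  private FooterSizeValidation() {
  }

  static void checkFooterSize(long fileSize, long footerSize) {
    long maxFooterSize = fileSize - MAGIC_LENGTH;
    if (footerSize <= 0 || footerSize > maxFooterSize) {
      // Report the valid range, not just the rejected value.
      throw new IllegalArgumentException(String.format(
          "Invalid footer size: %s (must be > 0 and <= %s for a file of %s bytes)",
          footerSize, maxFooterSize, fileSize));
    }
  }
}
```

   With Guava, the same bound could be passed as an extra `%s` argument to the existing `checkArgument` call.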



##########
core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.puffin;
+
+import java.nio.ByteBuffer;
+import org.apache.iceberg.io.InMemoryOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.iceberg.puffin.PuffinCompressionCodec.NONE;
+import static org.apache.iceberg.puffin.PuffinCompressionCodec.ZSTD;
+import static org.apache.iceberg.puffin.PuffinFormatTestUtil.EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE;
+import static org.apache.iceberg.puffin.PuffinFormatTestUtil.readTestResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestPuffinWriter {
+  @Test
+  public void testEmptyFooterCompressed() {
+    InMemoryOutputFile outputFile = new InMemoryOutputFile();
+
+    PuffinWriter writer = Puffin.write(outputFile)
+        .compressFooter()
+        .build();
+    assertThatThrownBy(writer::footerSize)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Footer not written yet");
+    assertThatThrownBy(writer::finish)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Unsupported codec: LZ4");
+    assertThatThrownBy(writer::close)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Unsupported codec: LZ4");

Review Comment:
   I'm not sure users will find it intuitive why this exception is thrown, given that LZ4 is currently the default codec for `compressFooter()` and LZ4 isn't implemented yet. Should the default footer compression codec maybe be ZSTD instead?
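   A minimal sketch of the suggested default, plus an optional fail-fast check so that an unimplemented codec is reported when the writer is configured rather than from `finish()`/`close()`. `FooterCodec`, `FooterCompressionBuilder`, and the `compressFooter(FooterCodec)` overload are hypothetical and not part of the PR's API:

```java
// Hypothetical stand-in for PuffinCompressionCodec, with a flag recording which
// codecs this sketch treats as implemented (LZ4 framing is still a TODO in the PR).
enum FooterCodec {
  NONE(true), LZ4(false), ZSTD(true);

  private final boolean implemented;

  FooterCodec(boolean implemented) {
    this.implemented = implemented;
  }

  boolean implemented() {
    return implemented;
  }
}

// Hypothetical builder; the real Puffin.write(...) builder may differ.
final class FooterCompressionBuilder {
  // The reviewer's suggestion: default to a codec that is actually implemented.
  static final FooterCodec DEFAULT_FOOTER_CODEC = FooterCodec.ZSTD;

  private FooterCodec footerCodec = FooterCodec.NONE;

  FooterCompressionBuilder compressFooter() {
    return compressFooter(DEFAULT_FOOTER_CODEC);
  }

  // Hypothetical overload: fails at configuration time instead of in finish()/close().
  FooterCompressionBuilder compressFooter(FooterCodec codec) {
    if (!codec.implemented()) {
      throw new UnsupportedOperationException("Footer compression codec not implemented yet: " + codec);
    }
    this.footerCodec = codec;
    return this;
  }

  FooterCodec footerCodec() {
    return footerCodec;
  }
}
```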



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org