Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/26 02:27:26 UTC

[GitHub] [iceberg] JingsongLi commented on a change in pull request #1477: Flink: maintain the complete data files into manifest before checkpoint finished.

JingsongLi commented on a change in pull request #1477:
URL: https://github.com/apache/iceberg/pull/1477#discussion_r511687317



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class FlinkManifestSerializer implements SimpleVersionedSerializer<ManifestFile> {
+  private static final int VERSION_NUM = 1;
+  static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer();
+
+  @Override
+  public int getVersion() {
+    return VERSION_NUM;
+  }
+
+  @Override
+  public byte[] serialize(ManifestFile manifestFile) throws IOException {
+    Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null");
+
+    DataOutputSerializer out = new DataOutputSerializer(256);
+    out.writeInt(VERSION_NUM);

Review comment:
       Why write `VERSION_NUM` again here? Is this a bug?
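
A minimal sketch of the concern, under stated assumptions: if the caller wraps the serializer with a helper such as Flink's `SimpleVersionedSerialization.writeVersionAndSerialize`, the helper (as far as I know) already prefixes the version and the payload length. The class `VersionFramingSketch` and its methods are hypothetical stand-ins written with plain JDK types, not code from Flink or from this PR. With this layout, `serialize()` only needs to return the payload; writing `VERSION_NUM` inside it as well would store the version twice.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the outer framing; not Flink's or Iceberg's code.
final class VersionFramingSketch {
  static final int VERSION = 1;

  private VersionFramingSketch() {
  }

  // What serialize() would return: the payload only, with no version prefix.
  static byte[] serializePayload(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  // Assumed outer layout: [version:int][length:int][payload], big-endian.
  static byte[] frame(int version, byte[] payload) {
    return ByteBuffer.allocate(8 + payload.length)
        .putInt(version)
        .putInt(payload.length)
        .put(payload)
        .array();
  }
}
```

Calling `VersionFramingSketch.frame(VersionFramingSketch.VERSION, VersionFramingSketch.serializePayload("abc"))` yields 8 header bytes followed by the 3 payload bytes, so the version appears exactly once.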

##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AvroEncoderUtil {
+
+  private AvroEncoderUtil() {
+  }
+
+  static {
+    LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
+  }
+
+  private static final int VERSION = 1;

Review comment:
       You should remove the versioning logic from `AvroEncoderUtil`, since `SimpleVersionedSerializer` already handles versions.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class FlinkManifestSerializer implements SimpleVersionedSerializer<ManifestFile> {
+  private static final int VERSION_NUM = 1;
+  static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer();
+
+  @Override
+  public int getVersion() {
+    return VERSION_NUM;
+  }
+
+  @Override
+  public byte[] serialize(ManifestFile manifestFile) throws IOException {
+    Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null");
+
+    DataOutputSerializer out = new DataOutputSerializer(256);
+    out.writeInt(VERSION_NUM);
+
+    byte[] serialized = ManifestFiles.encode(manifestFile);
+    out.writeInt(serialized.length);
+    out.write(serialized);
+
+    return out.getCopyOfBuffer();
+  }
+
+  @Override
+  public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
+    if (version == VERSION_NUM) {
+      return ManifestFiles.decode(serialized);
+    } else {
+      throw new IOException("Unrecognized version or corrupt state: " + version);
+    }
+  }
+
+  static ManifestFile readVersionAndDeserialize(byte[] versionedSerialized) throws IOException {

Review comment:
       You should use `SimpleVersionedSerialization.readVersionAndDeSerialize` instead of a hand-written `readVersionAndDeserialize`.
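
To illustrate what such a helper does, here is a rough sketch using only JDK types. It assumes the framed bytes are laid out as version, payload length, payload; `VersionedReadSketch` and its methods are hypothetical names for illustration and do not reproduce Flink's implementation. The helper reads the header once and hands the version plus the raw payload to `deserialize(version, payload)`, so the serializer itself never parses the header.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

// Hypothetical stand-in for a read-version-and-deserialize helper.
final class VersionedReadSketch {
  private VersionedReadSketch() {
  }

  // Assumed layout: [version:int][length:int][payload], big-endian.
  static byte[] frame(int version, byte[] payload) {
    return ByteBuffer.allocate(8 + payload.length)
        .putInt(version)
        .putInt(payload.length)
        .put(payload)
        .array();
  }

  // Reads the header, then delegates to the serializer's deserialize(version, payload).
  static <T> T readVersionAndDeserialize(BiFunction<Integer, byte[], T> deserializer, byte[] framed) {
    ByteBuffer in = ByteBuffer.wrap(framed);
    int version = in.getInt();
    int length = in.getInt();
    if (length < 0 || length > in.remaining()) {
      throw new IllegalArgumentException("Corrupt frame, payload length: " + length);
    }
    byte[] payload = new byte[length];
    in.get(payload);
    return deserializer.apply(version, payload);
  }

  // Example deserializer: only decides how to decode a payload of a known version.
  static String deserialize(int version, byte[] payload) {
    if (version != 1) {
      throw new IllegalArgumentException("Unrecognized version or corrupt state: " + version);
    }
    return new String(payload, StandardCharsets.UTF_8);
  }
}
```

For example, `VersionedReadSketch.readVersionAndDeserialize(VersionedReadSketch::deserialize, VersionedReadSketch.frame(1, bytes))` returns the decoded string, while a frame carrying an unknown version is rejected inside `deserialize`.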

##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AvroEncoderUtil {
+
+  private AvroEncoderUtil() {
+  }
+
+  static {
+    LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
+  }
+
+  private static final int VERSION = 1;
+  private static final byte[] MAGIC_BYTES = new byte[] {'a', 'V', 'R', VERSION};
+
+  private static byte[] encodeInt(int value) {
+    return ByteBuffer.allocate(4).putInt(value).array();
+  }
+
+  private static int decodeInt(byte[] value) {
+    return ByteBuffer.wrap(value).getInt();
+  }
+
+  public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      // Write the magic bytes
+      out.write(MAGIC_BYTES);
+
+      // Write the length of avro schema string.
+      byte[] avroSchemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
+      out.write(encodeInt(avroSchemaBytes.length));
+
+      // Write the avro schema string.
+      out.write(avroSchemaBytes);
+
+      // Encode the datum with avro schema.
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      DatumWriter<T> writer = new GenericAvroWriter<>(avroSchema);
+      writer.write(datum, encoder);
+      encoder.flush();
+      return out.toByteArray();
+    }
+  }
+
+  public static <T> T decode(byte[] data) throws IOException {
+    byte[] buffer4 = new byte[4];
+    try (ByteArrayInputStream in = new ByteArrayInputStream(data, 0, data.length)) {
+      // Read the magic bytes
+      Preconditions.checkState(in.read(buffer4) == 4, "Size of magic bytes isn't 4.");
+      Preconditions.checkState(Arrays.equals(MAGIC_BYTES, buffer4), "Magic bytes mismatched.");
+
+      // Read the length of avro schema string.
+      Preconditions.checkState(in.read(buffer4) == 4, "Could not read an integer from input stream.");
+      int avroSchemaLength = decodeInt(buffer4);
+      Preconditions.checkState(avroSchemaLength > 0, "Length of avro schema string should be positive");
+
+      // Read the avro schema string.
+      byte[] avroSchemaBytes = new byte[avroSchemaLength];

Review comment:
       Can you extract two methods `encodeSchema(Schema schema, OutputStream out)` and `decodeSchema(InputStream in)`?
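
One possible shape for the two extracted methods, as a sketch only. To keep it free of the Avro dependency, the schema is represented by its JSON string rather than an Avro `Schema`; `SchemaCodecSketch`, `readExactly`, and `roundTrip` are hypothetical names that do not appear in the PR. The length prefix uses the same 4-byte big-endian encoding as `encodeInt`/`decodeInt` in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; a JSON string stands in for the Avro Schema object.
final class SchemaCodecSketch {
  private SchemaCodecSketch() {
  }

  // Writes [length:int][schema bytes] to the stream.
  static void encodeSchema(String schemaJson, OutputStream out) throws IOException {
    byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
    out.write(ByteBuffer.allocate(4).putInt(schemaBytes.length).array());
    out.write(schemaBytes);
  }

  // Reads back what encodeSchema wrote and leaves the stream positioned after it.
  static String decodeSchema(InputStream in) throws IOException {
    int length = ByteBuffer.wrap(readExactly(in, 4)).getInt();
    if (length <= 0) {
      throw new IOException("Length of schema string should be positive: " + length);
    }
    return new String(readExactly(in, length), StandardCharsets.UTF_8);
  }

  // InputStream.read may return fewer bytes than requested, so loop until done.
  private static byte[] readExactly(InputStream in, int length) throws IOException {
    byte[] buffer = new byte[length];
    int offset = 0;
    while (offset < length) {
      int read = in.read(buffer, offset, length - offset);
      if (read < 0) {
        throw new EOFException("Stream ended after " + offset + " of " + length + " bytes");
      }
      offset += read;
    }
    return buffer;
  }

  // Convenience round trip for in-memory use.
  static String roundTrip(String schemaJson) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      encodeSchema(schemaJson, out);
      return decodeSchema(new ByteArrayInputStream(out.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With the schema handling isolated like this, `encode` and `decode` would reduce to the magic-byte check, one call to the schema helper, and the datum encoding itself.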

##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AvroEncoderUtil {
+
+  private AvroEncoderUtil() {
+  }
+
+  static {
+    LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
+  }
+
+  private static final int VERSION = 1;
+  private static final byte[] MAGIC_BYTES = new byte[] {'a', 'V', 'R', VERSION};
+
+  private static byte[] encodeInt(int value) {
+    return ByteBuffer.allocate(4).putInt(value).array();
+  }
+
+  private static int decodeInt(byte[] value) {
+    return ByteBuffer.wrap(value).getInt();
+  }
+
+  public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      // Write the magic bytes
+      out.write(MAGIC_BYTES);
+
+      // Write the length of avro schema string.
+      byte[] avroSchemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
+      out.write(encodeInt(avroSchemaBytes.length));
+
+      // Write the avro schema string.
+      out.write(avroSchemaBytes);
+
+      // Encode the datum with avro schema.
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      DatumWriter<T> writer = new GenericAvroWriter<>(avroSchema);
+      writer.write(datum, encoder);
+      encoder.flush();
+      return out.toByteArray();
+    }
+  }
+
+  public static <T> T decode(byte[] data) throws IOException {
+    byte[] buffer4 = new byte[4];
+    try (ByteArrayInputStream in = new ByteArrayInputStream(data, 0, data.length)) {

Review comment:
       Can we use `DataInputStream` and `DataOutputStream` to avoid implementing this parsing logic ourselves?
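
A sketch of how that could look, assuming the same layout as in the diff (magic bytes, schema length, schema string, then the encoded body). `DataStreamFramingSketch` and its method names are hypothetical, and the Avro datum is replaced by an opaque `byte[]` body so the example needs only the JDK. `writeInt`/`readInt` replace `encodeInt`/`decodeInt`, and `readFully` replaces the manual `in.read(buffer4) == 4` checks. Both `ByteBuffer.putInt` (default order) and `DataOutputStream.writeInt` are big-endian, so the byte layout would not change.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch; the Avro-encoded datum is modeled as an opaque byte[] body.
final class DataStreamFramingSketch {
  private static final byte[] MAGIC_BYTES = new byte[] {'a', 'V', 'R', 1};

  private DataStreamFramingSketch() {
  }

  static byte[] encode(String schemaJson, byte[] body) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.write(MAGIC_BYTES);
      byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
      out.writeInt(schemaBytes.length);  // replaces encodeInt(...)
      out.write(schemaBytes);
      out.write(body);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Validates the header and returns the schema string.
  static String decodeSchema(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      byte[] magic = new byte[MAGIC_BYTES.length];
      in.readFully(magic);  // throws EOFException on short input
      if (!Arrays.equals(MAGIC_BYTES, magic)) {
        throw new IllegalStateException("Magic bytes mismatched.");
      }
      int schemaLength = in.readInt();  // replaces decodeInt(...)
      if (schemaLength <= 0) {
        throw new IllegalStateException("Length of avro schema string should be positive");
      }
      byte[] schemaBytes = new byte[schemaLength];
      in.readFully(schemaBytes);
      return new String(schemaBytes, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In the real class the remaining bytes of the stream would be handed to the Avro `BinaryDecoder`; that part is left out here.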

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class FlinkManifest {

Review comment:
       I don't quite understand this class. Is it meant to be both a manifest writer and a manifest reader?
