Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/20 13:47:56 UTC

[GitHub] [inlong] EMsnap commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

EMsnap commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000656733


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResult, Void>
+        implements CheckpointedFunction, CheckpointListener {
+    private static final long serialVersionUID = 1L;
+    private static final long INITIAL_CHECKPOINT_ID = -1L;
+    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+    private static final String FLINK_JOB_ID = "flink.job-id";
+
+    // The max checkpoint id we've committed to the iceberg table. Since flink's checkpoint ids are always
+    // increasing, we can safely commit all data files whose checkpoint id is greater than the max committed one,
+    // which avoids committing the same data files twice. This id is attached to the iceberg metadata when
+    // committing the iceberg transaction.
+    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+    // TableLoader to load iceberg table lazily.
+    private final TableLoader tableLoader;
+    private final boolean replacePartitions;
+
+    // A sorted map that maintains the completed data files for each pending checkpointId (those not yet committed
+    // to the iceberg table). We need a sorted map because some checkpoint snapshots may fail. For example: the 1st
+    // checkpoint has 2 data files <1, <file0, file1>> and the 2nd checkpoint has 1 data file <2, <file3>>. If the
+    // snapshot for checkpoint#1 is interrupted by a network/disk failure etc., we still must not lose any data in
+    // the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry the commit when the
+    // next checkpoint happens.
+    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+    // The completed-files cache for the current checkpoint. Once the snapshot barrier is received, it will be
+    // flushed to 'dataFilesPerCheckpoint'.
+    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+
+    // A unique identifier for the flink job.
+    private transient String flinkJobId;
+    private transient Table table;
+    private transient ManifestOutputFileFactory manifestOutputFileFactory;
+    private transient long maxCommittedCheckpointId;
+    private transient int continuousEmptyCheckpoints;
+    private transient int maxContinuousEmptyCommits;
+    // There are two cases when restoring from flink checkpoints: the first is restoring from a snapshot created by
+    // the same flink job; the second is restoring from a snapshot created by a different job. For the second case,
+    // we need to keep the old flink job's id in the flink state backend in order to find the
+    // max-committed-checkpoint-id when traversing the iceberg table's snapshots.
+    private final ListStateDescriptor<String> JOB_ID_DESCRIPTOR;
+    private transient ListState<String> jobIdState;
+    // All pending checkpoints states for this function.
+    private final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR;
+    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+
+    public IcebergSingleFileCommiter(TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions) {
+        // The state descriptors must be distinguished by tableId, because all IcebergSingleFileCommiter instances
+        // inside one IcebergMultipleFilesCommiter share the same state store.
+        this.tableLoader = tableLoader;
+        this.replacePartitions = replacePartitions;
+        this.JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
+                String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO);
+        this.STATE_DESCRIPTOR = buildStateDescriptor(tableId);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.flinkJobId = getRuntimeContext().getJobId().toString();
+
+        // Open the table loader and load the table.
+        this.tableLoader.open();
+        this.table = tableLoader.loadTable();
+
+        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+        Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+                MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
+        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        int attemptId = getRuntimeContext().getAttemptNumber();
+        this.manifestOutputFileFactory = FlinkManifestUtil
+                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+        if (context.isRestored()) {
+            String restoredFlinkJobId = jobIdState.get().iterator().next();
+            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
+                    "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+            // Since flink's checkpoint id starts from max-committed-checkpoint-id + 1 in the new flink job, even
+            // when it is restored from a snapshot created by a different flink job, it is safe to assign the max
+            // committed checkpoint id from the restored flink job to the current flink job.
+            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+
+            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
+                    .newTreeMap(checkpointsState.get().iterator().next())
+                    .tailMap(maxCommittedCheckpointId, false);
+            if (!uncommittedDataFiles.isEmpty()) {
+                // Commit all uncommitted data files from the old flink job to the iceberg table.
+                long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
+                commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        long checkpointId = context.getCheckpointId();
+        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+
+        // Update the checkpoint state.
+        dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
+        // Reset the snapshot state to the latest state.
+        checkpointsState.clear();
+        checkpointsState.add(dataFilesPerCheckpoint);
+
+        jobIdState.clear();
+        jobIdState.add(flinkJobId);
+
+        // Clear the local buffer for current checkpoint.
+        writeResultsOfCurrentCkpt.clear();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // It's possible that we have the following events:
+        //   1. snapshotState(ckpId);
+        //   2. snapshotState(ckpId+1);
+        //   3. notifyCheckpointComplete(ckpId+1);
+        //   4. notifyCheckpointComplete(ckpId);
+        // For step#4, we don't need to commit to the iceberg table again because in step#3 we've already committed
+        // all the files. Besides, we need to keep the max-committed-checkpoint-id increasing.
+        if (checkpointId > maxCommittedCheckpointId) {
+            commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+            this.maxCommittedCheckpointId = checkpointId;
+        }
+    }
+
+    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
+            String newFlinkJobId,
+            long checkpointId) throws IOException {
+        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);

Review Comment:
   There is a large chunk of code duplicated from IcebergFilesCommitter here; any chance to abstract this method?
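
   A minimal sketch of what such an abstraction could look like, assuming a shared utility class is
   acceptable (the name IcebergCommitUtil and the method commitOperation below are illustrative only,
   not part of this PR): both IcebergFilesCommitter and IcebergSingleFileCommiter could delegate the
   snapshot-summary bookkeeping to one place and keep only the per-table file handling locally.

   package org.apache.inlong.sort.iceberg.sink.multiple;

   import org.apache.iceberg.SnapshotUpdate;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   /**
    * Hypothetical helper, sketched only to illustrate the suggestion above: a single place for the
    * snapshot-summary bookkeeping that both committers currently repeat in their commit paths.
    */
   public final class IcebergCommitUtil {

       private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitUtil.class);

       private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
       private static final String FLINK_JOB_ID = "flink.job-id";

       private IcebergCommitUtil() {
       }

       /**
        * Attach the flink job id and the max committed checkpoint id to an iceberg snapshot update
        * (AppendFiles, ReplacePartitions or RowDelta) and commit it.
        */
       public static void commitOperation(SnapshotUpdate<?> operation, String description,
               String flinkJobId, long checkpointId) {
           LOG.info("Committing {} with flink job id {} and checkpoint id {}",
                   description, flinkJobId, checkpointId);
           // Record the ids in the snapshot summary so a restarted job can discover which
           // checkpoints have already been committed.
           operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
           operation.set(FLINK_JOB_ID, flinkJobId);

           long startNano = System.nanoTime();
           operation.commit();
           LOG.info("Committed {} in {} ms", description, (System.nanoTime() - startNano) / 1_000_000);
       }
   }

   Each committer could then call, for example, IcebergCommitUtil.commitOperation(appendFiles, "append",
   flinkJobId, checkpointId) and keep only the manifest and data-file handling in its own class.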



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org