You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text view.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/14 09:50:39 UTC

[flink-table-store] 05/05: [FLINK-25820] Introduce FileStoreSource

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 4089be21d298be3bc11274dfbf2238d1e5c34114
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 26 18:29:43 2022 +0800

    [FLINK-25820] Introduce FileStoreSource
    
    This closes #15
---
 .../store/connector/source/FileStoreSource.java    | 130 +++++++++++++++++++++
 1 file changed, 130 insertions(+)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
new file mode 100644
index 0000000..a9f1497
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.table.store.utils.ProjectionUtils.project;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link Source} of file store. */
+public class FileStoreSource
+        implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStore fileStore;
+
+    private final RowType rowType;
+
+    private final int[] partitions;
+
+    private final int[] keys;
+
+    @Nullable private final int[][] projectedFields;
+
+    @Nullable private final Predicate partitionPredicate;
+
+    @Nullable private final Predicate fieldsPredicate;
+
+    public FileStoreSource(
+            FileStore fileStore,
+            RowType rowType,
+            int[] partitions,
+            int[] keys,
+            @Nullable int[][] projectedFields,
+            @Nullable Predicate partitionPredicate,
+            @Nullable Predicate fieldsPredicate) {
+        this.fileStore = fileStore;
+        this.rowType = rowType;
+        this.partitions = partitions;
+        this.keys = keys;
+        this.projectedFields = projectedFields;
+        this.partitionPredicate = partitionPredicate;
+        this.fieldsPredicate = fieldsPredicate;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        // TODO supports streaming reading for file store
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
+        FileStoreRead read = fileStore.newRead();
+        if (projectedFields != null) {
+            if (keys.length == 0) {
+                read.withKeyProjection(projectedFields);
+            } else {
+                read.withValueProjection(projectedFields);
+            }
+        }
+        return new FileStoreSourceReader(context, read, keys.length == 0);
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context) {
+        FileStoreScan scan = fileStore.newScan();
+        if (partitionPredicate != null) {
+            scan.withPartitionFilter(partitionPredicate);
+        }
+        if (fieldsPredicate != null) {
+            if (keys.length == 0) {
+                scan.withKeyFilter(fieldsPredicate);
+            } else {
+                scan.withValueFilter(fieldsPredicate);
+            }
+        }
+        return new StaticFileStoreSplitEnumerator(context, scan);
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            PendingSplitsCheckpoint checkpoint) {
+        checkArgument(checkpoint.nextSnapshotId() == -1);
+        return new StaticFileStoreSplitEnumerator(context, checkpoint.splits());
+    }
+
+    @Override
+    public FileStoreSourceSplitSerializer getSplitSerializer() {
+        return new FileStoreSourceSplitSerializer(
+                project(rowType, partitions), project(rowType, keys), rowType);
+    }
+
+    @Override
+    public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
+        return new PendingSplitsCheckpointSerializer(getSplitSerializer());
+    }
+}