You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/06 05:29:08 UTC

[incubator-seatunnel] branch dev updated: [translation][spark] delete ContinuousReadSupport code (#2657)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f3b8c5a6 [translation][spark] delete ContinuousReadSupport code (#2657)
4f3b8c5a6 is described below

commit 4f3b8c5a6fde6524d38b58f0f2bebfa3afee0994
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Tue Sep 6 13:29:03 2022 +0800

    [translation][spark] delete ContinuousReadSupport code (#2657)
    
    ContinuousReadSupport will not be supported, so delete its implementation code.
---
 .../spark/source/SeaTunnelSourceSupport.java       |  12 +--
 .../source/continnous/ContinuousPartition.java     |  54 ----------
 .../source/continnous/ContinuousSourceReader.java  | 117 ---------------------
 .../source/continnous/CoordinationState.java       |  49 ---------
 .../ParallelContinuousPartitionReader.java         |  78 --------------
 5 files changed, 1 insertion(+), 309 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index baf7abcb7..22a4859aa 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
-import org.apache.seatunnel.translation.spark.source.continnous.ContinuousSourceReader;
 import org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
 
 import org.apache.commons.lang3.StringUtils;
@@ -30,13 +29,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
@@ -44,7 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
-public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroBatchReadSupport, ContinuousReadSupport, DataSourceRegister {
+public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroBatchReadSupport, DataSourceRegister {
     private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelSourceSupport.class);
     public static final String SEA_TUNNEL_SOURCE_NAME = "SeaTunnelSource";
     public static final Integer CHECKPOINT_INTERVAL_DEFAULT = 10000;
@@ -79,13 +76,6 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
         return new MicroBatchSourceReader(seaTunnelSource, parallelism, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
     }
 
-    @Override
-    public ContinuousReader createContinuousReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
-        SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
-        Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
-        return new ContinuousSourceReader(seaTunnelSource, parallelism);
-    }
-
     private SeaTunnelSource<SeaTunnelRow, ?, ?> getSeaTunnelSource(DataSourceOptions options) {
         return SerializationUtils.stringToObject(options.get(Constants.SOURCE_SERIALIZATION)
             .orElseThrow(() -> new UnsupportedOperationException("Serialization information for the SeaTunnelSource is required")));
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
deleted file mode 100644
index f7f091e45..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.source.continnous;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.spark.common.source.continnous.ParallelContinuousPartitionReader;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-
-import java.util.List;
-import java.util.Map;
-
-public class ContinuousPartition implements InputPartition<InternalRow> {
-    protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
-    protected final Integer parallelism;
-    protected final Integer subtaskId;
-    protected final Integer checkpointId;
-    protected final Map<Integer, List<byte[]>> restoredState;
-
-    public ContinuousPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
-                               Integer parallelism,
-                               Integer subtaskId,
-                               Integer checkpointId,
-                               Map<Integer, List<byte[]>> restoredState) {
-        this.source = source;
-        this.parallelism = parallelism;
-        this.subtaskId = subtaskId;
-        this.checkpointId = checkpointId;
-        this.restoredState = restoredState;
-    }
-
-    @Override
-    public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new ParallelContinuousPartitionReader(source, parallelism, subtaskId, checkpointId, restoredState);
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
deleted file mode 100644
index 1b46a8e74..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.source.continnous;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.translation.spark.common.ReaderState;
-import org.apache.seatunnel.translation.spark.common.source.continnous.CoordinationState;
-import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-public class ContinuousSourceReader implements ContinuousReader {
-
-    private final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
-    private final Integer parallelism;
-    private final Map<Integer, ReaderState> readerStateMap = new HashMap<>();
-    private CoordinationState coordinationState;
-    private int checkpointId = 1;
-
-    public ContinuousSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism) {
-        this.source = source;
-        this.parallelism = parallelism;
-        throw new UnsupportedOperationException("Continuous source is not currently supported.");
-    }
-
-    @Override
-    public Offset mergeOffsets(PartitionOffset[] subtaskStates) {
-        // aggregate state
-        List<ReaderState> readerStateList = new ArrayList<>(subtaskStates.length);
-        for (PartitionOffset subtaskState : subtaskStates) {
-            if (subtaskState instanceof ReaderState) {
-                ReaderState readerState = (ReaderState) subtaskState;
-                readerStateMap.put(readerState.getSubtaskId(), readerState);
-                readerStateList.add(readerState);
-            } else {
-                throw new UnsupportedOperationException(String.format("Unsupported state type: %s", subtaskState.getClass()));
-            }
-        }
-        return new CoordinationState(readerStateList, readerStateList.get(0).getCheckpointId());
-    }
-
-    @Override
-    public Offset deserializeOffset(String aggregatedState) {
-        return SerializationUtils.stringToObject(aggregatedState);
-    }
-
-    @Override
-    public void setStartOffset(Optional<Offset> start) {
-        // initialize or restore state
-        start.ifPresent(state -> {
-            CoordinationState restoreState = (CoordinationState) state;
-            checkpointId = restoreState.getCheckpointId();
-            for (ReaderState readerState : restoreState.getReaderStateList()) {
-                readerStateMap.put(readerState.getSubtaskId(), readerState);
-            }
-        });
-        coordinationState = (CoordinationState) start.orElse(new CoordinationState(new ArrayList<>(), 1));
-    }
-
-    @Override
-    public Offset getStartOffset() {
-        return coordinationState;
-    }
-
-    @Override
-    public void commit(Offset end) {
-        // TODO: rpc commit {@link ContinuousPartitionReader#notifyCheckpointComplete}
-    }
-
-    @Override
-    public void stop() {
-        // TODO: stop rpc
-    }
-
-    @Override
-    public StructType readSchema() {
-        return (StructType) TypeConverterUtils.convert(source.getProducedType());
-    }
-
-    @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        List<InputPartition<InternalRow>> virtualPartitions = new ArrayList<>(parallelism);
-        for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
-            ReaderState readerState = readerStateMap.get(subtaskId);
-            virtualPartitions.add(new ContinuousPartition(source, parallelism, subtaskId, checkpointId, readerState == null ? null : readerState.getBytes()));
-        }
-        return virtualPartitions;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/CoordinationState.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/CoordinationState.java
deleted file mode 100644
index a78fccdea..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/CoordinationState.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.common.source.continnous;
-
-import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.translation.spark.common.ReaderState;
-
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class CoordinationState extends Offset implements Serializable {
-    private List<ReaderState> readerStateList;
-    private Integer checkpointId;
-
-    public CoordinationState(List<ReaderState> readerStateList, Integer checkpointId) {
-        this.readerStateList = readerStateList;
-        this.checkpointId = checkpointId;
-    }
-
-    @Override
-    public String json() {
-        return SerializationUtils.objectToString(this);
-    }
-
-    public List<ReaderState> getReaderStateList() {
-        return readerStateList;
-    }
-
-    public Integer getCheckpointId() {
-        return checkpointId;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/ParallelContinuousPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/ParallelContinuousPartitionReader.java
deleted file mode 100644
index e73e5fd4b..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/ParallelContinuousPartitionReader.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.common.source.continnous;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.source.ParallelSource;
-import org.apache.seatunnel.translation.spark.common.ReaderState;
-import org.apache.seatunnel.translation.spark.common.source.batch.ParallelBatchPartitionReader;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class ParallelContinuousPartitionReader extends ParallelBatchPartitionReader implements ContinuousInputPartitionReader<InternalRow> {
-    protected volatile Integer checkpointId;
-    protected final Map<Integer, List<byte[]>> restoredState;
-
-    public ParallelContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, Integer checkpointId, Map<Integer, List<byte[]>> restoredState) {
-        super(source, parallelism, subtaskId);
-        this.checkpointId = checkpointId;
-        this.restoredState = restoredState;
-    }
-
-    @Override
-    protected ParallelSource<SeaTunnelRow, ?, ?> createInternalSource() {
-        return new InternalParallelSource<>(source,
-            restoredState,
-            parallelism,
-            subtaskId);
-    }
-
-    @Override
-    public PartitionOffset getOffset() {
-        Map<Integer, List<byte[]>> bytes;
-        try {
-            bytes = internalSource.snapshotState(checkpointId);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        ReaderState readerState = new ReaderState(bytes, subtaskId, checkpointId++);
-        return readerState;
-    }
-
-    // TODO: RPC call
-
-    /**
-     * The method is called by RPC
-     */
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        internalSource.notifyCheckpointComplete(checkpointId);
-    }
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-        // TODO: close rpc
-    }
-}