You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/06 05:29:08 UTC
[incubator-seatunnel] branch dev updated: [translation][spark] delete ContinuousReadSupport code (#2657)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4f3b8c5a6 [translation][spark] delete ContinuousReadSupport code (#2657)
4f3b8c5a6 is described below
commit 4f3b8c5a6fde6524d38b58f0f2bebfa3afee0994
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Tue Sep 6 13:29:03 2022 +0800
[translation][spark] delete ContinuousReadSupport code (#2657)
ContinuousReadSupport will not be supported, so delete the related code.
---
.../spark/source/SeaTunnelSourceSupport.java | 12 +--
.../source/continnous/ContinuousPartition.java | 54 ----------
.../source/continnous/ContinuousSourceReader.java | 117 ---------------------
.../source/continnous/CoordinationState.java | 49 ---------
.../ParallelContinuousPartitionReader.java | 78 --------------
5 files changed, 1 insertion(+), 309 deletions(-)
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index baf7abcb7..22a4859aa 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
-import org.apache.seatunnel.translation.spark.source.continnous.ContinuousSourceReader;
import org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
import org.apache.commons.lang3.StringUtils;
@@ -30,13 +29,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
@@ -44,7 +41,7 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
-public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroBatchReadSupport, ContinuousReadSupport, DataSourceRegister {
+public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroBatchReadSupport, DataSourceRegister {
private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelSourceSupport.class);
public static final String SEA_TUNNEL_SOURCE_NAME = "SeaTunnelSource";
public static final Integer CHECKPOINT_INTERVAL_DEFAULT = 10000;
@@ -79,13 +76,6 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
return new MicroBatchSourceReader(seaTunnelSource, parallelism, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
}
- @Override
- public ContinuousReader createContinuousReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
- SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
- Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
- return new ContinuousSourceReader(seaTunnelSource, parallelism);
- }
-
private SeaTunnelSource<SeaTunnelRow, ?, ?> getSeaTunnelSource(DataSourceOptions options) {
return SerializationUtils.stringToObject(options.get(Constants.SOURCE_SERIALIZATION)
.orElseThrow(() -> new UnsupportedOperationException("Serialization information for the SeaTunnelSource is required")));
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
deleted file mode 100644
index f7f091e45..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.source.continnous;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.spark.common.source.continnous.ParallelContinuousPartitionReader;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-
-import java.util.List;
-import java.util.Map;
-
-public class ContinuousPartition implements InputPartition<InternalRow> {
- protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
- protected final Integer parallelism;
- protected final Integer subtaskId;
- protected final Integer checkpointId;
- protected final Map<Integer, List<byte[]>> restoredState;
-
- public ContinuousPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
- Integer parallelism,
- Integer subtaskId,
- Integer checkpointId,
- Map<Integer, List<byte[]>> restoredState) {
- this.source = source;
- this.parallelism = parallelism;
- this.subtaskId = subtaskId;
- this.checkpointId = checkpointId;
- this.restoredState = restoredState;
- }
-
- @Override
- public InputPartitionReader<InternalRow> createPartitionReader() {
- return new ParallelContinuousPartitionReader(source, parallelism, subtaskId, checkpointId, restoredState);
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
deleted file mode 100644
index 1b46a8e74..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.source.continnous;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.translation.spark.common.ReaderState;
-import org.apache.seatunnel.translation.spark.common.source.continnous.CoordinationState;
-import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-public class ContinuousSourceReader implements ContinuousReader {
-
- private final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
- private final Integer parallelism;
- private final Map<Integer, ReaderState> readerStateMap = new HashMap<>();
- private CoordinationState coordinationState;
- private int checkpointId = 1;
-
- public ContinuousSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism) {
- this.source = source;
- this.parallelism = parallelism;
- throw new UnsupportedOperationException("Continuous source is not currently supported.");
- }
-
- @Override
- public Offset mergeOffsets(PartitionOffset[] subtaskStates) {
- // aggregate state
- List<ReaderState> readerStateList = new ArrayList<>(subtaskStates.length);
- for (PartitionOffset subtaskState : subtaskStates) {
- if (subtaskState instanceof ReaderState) {
- ReaderState readerState = (ReaderState) subtaskState;
- readerStateMap.put(readerState.getSubtaskId(), readerState);
- readerStateList.add(readerState);
- } else {
- throw new UnsupportedOperationException(String.format("Unsupported state type: %s", subtaskState.getClass()));
- }
- }
- return new CoordinationState(readerStateList, readerStateList.get(0).getCheckpointId());
- }
-
- @Override
- public Offset deserializeOffset(String aggregatedState) {
- return SerializationUtils.stringToObject(aggregatedState);
- }
-
- @Override
- public void setStartOffset(Optional<Offset> start) {
- // initialize or restore state
- start.ifPresent(state -> {
- CoordinationState restoreState = (CoordinationState) state;
- checkpointId = restoreState.getCheckpointId();
- for (ReaderState readerState : restoreState.getReaderStateList()) {
- readerStateMap.put(readerState.getSubtaskId(), readerState);
- }
- });
- coordinationState = (CoordinationState) start.orElse(new CoordinationState(new ArrayList<>(), 1));
- }
-
- @Override
- public Offset getStartOffset() {
- return coordinationState;
- }
-
- @Override
- public void commit(Offset end) {
- // TODO: rpc commit {@link ContinuousPartitionReader#notifyCheckpointComplete}
- }
-
- @Override
- public void stop() {
- // TODO: stop rpc
- }
-
- @Override
- public StructType readSchema() {
- return (StructType) TypeConverterUtils.convert(source.getProducedType());
- }
-
- @Override
- public List<InputPartition<InternalRow>> planInputPartitions() {
- List<InputPartition<InternalRow>> virtualPartitions = new ArrayList<>(parallelism);
- for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
- ReaderState readerState = readerStateMap.get(subtaskId);
- virtualPartitions.add(new ContinuousPartition(source, parallelism, subtaskId, checkpointId, readerState == null ? null : readerState.getBytes()));
- }
- return virtualPartitions;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/CoordinationState.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/CoordinationState.java
deleted file mode 100644
index a78fccdea..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/CoordinationState.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.common.source.continnous;
-
-import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.translation.spark.common.ReaderState;
-
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class CoordinationState extends Offset implements Serializable {
- private List<ReaderState> readerStateList;
- private Integer checkpointId;
-
- public CoordinationState(List<ReaderState> readerStateList, Integer checkpointId) {
- this.readerStateList = readerStateList;
- this.checkpointId = checkpointId;
- }
-
- @Override
- public String json() {
- return SerializationUtils.objectToString(this);
- }
-
- public List<ReaderState> getReaderStateList() {
- return readerStateList;
- }
-
- public Integer getCheckpointId() {
- return checkpointId;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/ParallelContinuousPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/ParallelContinuousPartitionReader.java
deleted file mode 100644
index e73e5fd4b..000000000
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/continnous/ParallelContinuousPartitionReader.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.common.source.continnous;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.source.ParallelSource;
-import org.apache.seatunnel.translation.spark.common.ReaderState;
-import org.apache.seatunnel.translation.spark.common.source.batch.ParallelBatchPartitionReader;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class ParallelContinuousPartitionReader extends ParallelBatchPartitionReader implements ContinuousInputPartitionReader<InternalRow> {
- protected volatile Integer checkpointId;
- protected final Map<Integer, List<byte[]>> restoredState;
-
- public ParallelContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, Integer checkpointId, Map<Integer, List<byte[]>> restoredState) {
- super(source, parallelism, subtaskId);
- this.checkpointId = checkpointId;
- this.restoredState = restoredState;
- }
-
- @Override
- protected ParallelSource<SeaTunnelRow, ?, ?> createInternalSource() {
- return new InternalParallelSource<>(source,
- restoredState,
- parallelism,
- subtaskId);
- }
-
- @Override
- public PartitionOffset getOffset() {
- Map<Integer, List<byte[]>> bytes;
- try {
- bytes = internalSource.snapshotState(checkpointId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- ReaderState readerState = new ReaderState(bytes, subtaskId, checkpointId++);
- return readerState;
- }
-
- // TODO: RPC call
-
- /**
- * The method is called by RPC
- */
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- internalSource.notifyCheckpointComplete(checkpointId);
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- // TODO: close rpc
- }
-}