Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/04/23 12:38:59 UTC

[GitHub] [incubator-seatunnel] 1996fanrui commented on a diff in pull request #1287: [Feature] [flink] Remove flink DataSet api (#1246)

1996fanrui commented on code in PR #1287:
URL: https://github.com/apache/incubator-seatunnel/pull/1287#discussion_r856884966


##########
seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java:
##########
@@ -21,17 +21,13 @@
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.types.Row;
 
-import javax.annotation.Nullable;
-
 /**
  * a FlinkStreamSink plugin will write data to other system using Flink DataStream API.
  */
 public interface FlinkStreamSink extends BaseFlinkSink {
 
-    @Nullable
-    DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> dataStream);
+    void outputStream(FlinkEnvironment env, DataStream<Row> dataStream);

Review Comment:
   Could we remove FlinkStreamSink and move this method to BaseFlinkSink? The same applies to BaseFlinkSource.
   
   Since we no longer distinguish between batch and stream, we can keep just BaseFlinkSink and have every Flink sink implement it. What do you think?
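For illustration, here is a rough sketch of what this could look like. `FlinkEnvironment`, `DataStream` and `Row` are simplified stand-ins rather than the real Flink/SeaTunnel types, so the snippet compiles on its own. The source method name `getData` and the `CollectingSink`/`FixedSource` classes are assumptions for the example, not existing code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins (assumptions), not the real Flink / SeaTunnel types.
class FlinkEnvironment { }

class Row {
    final Object[] fields;

    Row(Object... fields) {
        this.fields = fields;
    }
}

class DataStream<T> {
    final List<T> elements;

    DataStream(List<T> elements) {
        this.elements = elements;
    }
}

// With batch and stream no longer distinguished, the base interfaces carry the
// methods directly, so FlinkStreamSink / FlinkStreamSource are no longer needed.
interface BaseFlinkSink {
    void outputStream(FlinkEnvironment env, DataStream<Row> dataStream);
}

interface BaseFlinkSource {
    DataStream<Row> getData(FlinkEnvironment env);
}

// Hypothetical sink: implements BaseFlinkSink directly and just collects the rows.
class CollectingSink implements BaseFlinkSink {
    final List<Row> written = new ArrayList<>();

    @Override
    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
        written.addAll(dataStream.elements);
    }
}

// Hypothetical source: implements BaseFlinkSource directly and emits two fixed rows.
class FixedSource implements BaseFlinkSource {
    @Override
    public DataStream<Row> getData(FlinkEnvironment env) {
        return new DataStream<>(Arrays.asList(new Row("a", 1), new Row("b", 2)));
    }
}
```

The point is only the shape of the hierarchy: one base interface per plugin kind, with no separate stream variants.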



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml:
##########
@@ -36,10 +36,25 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>

Review Comment:
   Could the `${flink.version}` be removed? The other Flink dependencies in this pom don't declare a version either.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java:
##########
@@ -56,7 +56,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 
-public class DruidOutputFormat extends RichOutputFormat<Row> {
+public class DruidOutputFormat<T> extends RichOutputFormat<T> {

Review Comment:
   Judging from writeRecord, it only supports Row, so why add the type parameter <T> to DruidOutputFormat? Could you revert this change? If not, please explain the reason, thanks.
   
   I think this change defers the error until writeRecord is called. And since it only logs the error (log.ERROR), the Flink job would keep running normally, right?
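A small sketch of the concern, assuming the generic writeRecord handles Row and only logs anything else. `RichOutputFormat` is reduced to a single abstract method here, and `RowOnlyFormat`, `GenericFormat` and the `errorLog` list (standing in for log.error) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.flink.types.Row (assumption).
class Row {
    final Object[] fields;

    Row(Object... fields) {
        this.fields = fields;
    }
}

// Stand-in for Flink's RichOutputFormat, reduced to the one method that matters here.
abstract class RichOutputFormat<IT> {
    abstract void writeRecord(IT record);
}

// Typed version: the compiler rejects any record that is not a Row.
class RowOnlyFormat extends RichOutputFormat<Row> {
    final List<Row> written = new ArrayList<>();

    @Override
    void writeRecord(Row record) {
        written.add(record);
    }
}

// Generic version: any T compiles; a non-Row is only noticed inside writeRecord,
// and because it is merely logged, the caller keeps going.
class GenericFormat<T> extends RichOutputFormat<T> {
    final List<Row> written = new ArrayList<>();
    final List<String> errorLog = new ArrayList<>(); // stands in for log.error(...)

    @Override
    void writeRecord(T record) {
        if (record instanceof Row) {
            written.add((Row) record);
        } else {
            errorLog.add("Unsupported record type: " + record.getClass().getName());
        }
    }
}
```

With `RowOnlyFormat`, a call like `typed.writeRecord("not a row")` does not compile, so the mistake is caught at build time instead of being logged while the job keeps running.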



##########
seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java:
##########
@@ -20,21 +20,19 @@
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
 import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.scala.typeutils.Types;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 
 import java.util.List;
 
-public class Split implements FlinkStreamTransform, FlinkBatchTransform {
+public class Split implements FlinkStreamTransform {
 
     private Config config;

Review Comment:
   Regarding the Transform refactoring:
   
   - Could you remove FlinkStreamTransform?
   - Rename the method from processStream to process.
   - Move process to BaseFlinkTransform.
   - Change BaseFlinkTransform from an interface to an abstract class.
   - Move `private Config config` and its getter/setter to BaseFlinkTransform.
   
   These changes would greatly reduce duplicated code and let us remove the now-useless class.
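Roughly what I have in mind, as a sketch only: `FlinkEnvironment`, `Row`, `DataStream` and `Config` are simplified stand-ins (the `Config` stub just mimics `getString`), and the `Split` body is a simplified, hypothetical version that splits the first field on a configured `separator`, not the real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Simplified stand-ins (assumptions), not the real Flink / SeaTunnel / typesafe-config types.
class FlinkEnvironment { }

class Row {
    final Object[] fields;

    Row(Object... fields) {
        this.fields = fields;
    }
}

class DataStream<T> {
    final List<T> elements;

    DataStream(List<T> elements) {
        this.elements = elements;
    }
}

class Config {
    private final Map<String, String> values;

    Config(Map<String, String> values) {
        this.values = values;
    }

    String getString(String key) {
        return values.get(key);
    }
}

// Proposed abstract base class: holds the config once, so concrete transforms don't repeat it.
abstract class BaseFlinkTransform {
    private Config config;

    public void setConfig(Config config) {
        this.config = config;
    }

    public Config getConfig() {
        return config;
    }

    // Formerly processStream in FlinkStreamTransform.
    public abstract DataStream<Row> process(FlinkEnvironment env, DataStream<Row> dataStream);
}

// Hypothetical, simplified Split: only the transform logic remains; config handling is inherited.
class Split extends BaseFlinkTransform {
    @Override
    public DataStream<Row> process(FlinkEnvironment env, DataStream<Row> dataStream) {
        String separator = getConfig().getString("separator");
        List<Row> out = new ArrayList<>();
        for (Row row : dataStream.elements) {
            // String.split takes a regex, so quote the separator literally.
            String[] parts = row.fields[0].toString().split(Pattern.quote(separator));
            out.add(new Row((Object[]) parts));
        }
        return new DataStream<>(out);
    }
}
```

Each concrete transform then only implements process, and the config field with its getter/setter lives in one place.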


