You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/11 05:45:25 UTC

[incubator-seatunnel] branch dev updated: [Feature] [api] Translate spark apis from scala to java (#1188)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 296c2a5  [Feature] [api] Translate spark apis from scala to java (#1188)
296c2a5 is described below

commit 296c2a515c37123d0156bfe2490ab070531b8c54
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Fri Feb 11 13:45:21 2022 +0800

    [Feature] [api] Translate spark apis from scala to java (#1188)
---
 .../org/apache/seatunnel/spark/BaseSparkSink.java} |  30 +++--
 .../apache/seatunnel/spark/BaseSparkSource.java}   |  26 ++--
 .../seatunnel/spark/BaseSparkTransform.java}       |  30 +++--
 .../apache/seatunnel/spark/SparkEnvironment.java   |  84 +++++++++++++
 .../seatunnel/spark/batch/SparkBatchExecution.java | 132 +++++++++++++++++++++
 .../seatunnel/spark/batch/SparkBatchSink.java}     |  11 +-
 .../seatunnel/spark/batch/SparkBatchSource.java}   |  13 +-
 .../StructuredStreamingExecution.java              |  64 ++++++++++
 .../structuredstream/StructuredStreamingSink.java} |  14 ++-
 .../StructuredStreamingSource.java}                |  13 +-
 .../apache/seatunnel/spark/SparkEnvironment.scala  |  79 ------------
 .../spark/batch/SparkBatchExecution.scala          | 120 -------------------
 .../spark/stream/SparkStreamingExecution.scala     |   4 +-
 .../StructuredStreamingExecution.scala             |  40 -------
 14 files changed, 375 insertions(+), 285 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSink.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
similarity index 54%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSink.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
index 9e9981d..589232f 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSink.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
@@ -14,23 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark
 
-import org.apache.seatunnel.apis.BaseSink
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.spark.sql.{Dataset, Row}
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.apis.BaseSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
 /**
  * a base interface indicates a sink plugin running on Spark.
  */
-trait BaseSparkSink[OUT] extends BaseSink[SparkEnvironment] {
+public abstract class BaseSparkSink<OUT> implements BaseSink<SparkEnvironment> {
 
-  protected var config: Config = ConfigFactory.empty()
+    protected Config config = ConfigFactory.empty();
 
-  override def setConfig(config: Config): Unit = this.config = config
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
 
-  override def getConfig: Config = config
+    @Override
+    public Config getConfig() {
+        return config;
+    }
 
-  def output(data: Dataset[Row], env: SparkEnvironment): OUT
+    public abstract void prepare(SparkEnvironment prepareEnv);
 
+    public abstract OUT output(Dataset<Row> data, SparkEnvironment env);
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSource.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
similarity index 60%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSource.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
index ee9eaad..391a791 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSource.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
@@ -14,22 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark
 
-import org.apache.seatunnel.apis.BaseSource
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.apis.BaseSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 /**
  * a base interface indicates a source plugin running on Spark.
  */
-trait BaseSparkSource[Data] extends BaseSource[SparkEnvironment] {
-
-  protected var config: Config = ConfigFactory.empty()
+public abstract class BaseSparkSource<T> implements BaseSource<SparkEnvironment> {
 
-  override def setConfig(config: Config): Unit = this.config = config
+    protected Config config = ConfigFactory.empty();
 
-  override def getConfig: Config = config
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
 
-  def getData(env: SparkEnvironment): Data
+    @Override
+    public Config getConfig() {
+        return this.config;
+    }
 
+    public abstract T getData(SparkEnvironment env);
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkTransform.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
similarity index 56%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkTransform.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
index 3294786..a3997d4 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkTransform.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
@@ -14,23 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark
 
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.apis.BaseTransform
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.apis.BaseTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
 /**
  * a base interface indicates a transform plugin running on Spark.
  */
-trait BaseSparkTransform extends BaseTransform[SparkEnvironment] {
-
-  protected var config: Config = ConfigFactory.empty()
+public abstract class BaseSparkTransform implements BaseTransform<SparkEnvironment> {
 
-  override def setConfig(config: Config): Unit = this.config = config
+    protected Config config = ConfigFactory.empty();
 
-  override def getConfig: Config = config
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
 
-  def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row]
+    @Override
+    public Config getConfig() {
+        return this.config;
+    }
 
+    public abstract Dataset<Row> process(Dataset<Row> data, SparkEnvironment env);
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
new file mode 100644
index 0000000..9eb0987
--- /dev/null
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.env.RuntimeEnv;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.StreamingContext;
+
+public class SparkEnvironment implements RuntimeEnv {
+
+    private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
+
+    private SparkSession sparkSession;
+
+    private StreamingContext streamingContext;
+
+    private Config config = ConfigFactory.empty();
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return this.config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckResult.success();
+    }
+
+    @Override
+    public void prepare(Boolean prepareEnv) {
+        SparkConf sparkConf = createSparkConf();
+        this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+        createStreamingContext();
+    }
+
+    public SparkSession getSparkSession() {
+        return this.sparkSession;
+    }
+
+    public StreamingContext getStreamingContext() {
+        return this.streamingContext;
+    }
+
+    private SparkConf createSparkConf() {
+        SparkConf sparkConf = new SparkConf();
+        this.config.entrySet().forEach(entry -> sparkConf.set(entry.getKey(), String.valueOf(entry.getValue().unwrapped())));
+        return sparkConf;
+    }
+
+    private void createStreamingContext() {
+        SparkConf conf = this.sparkSession.sparkContext().getConf();
+        long duration = conf.getLong("spark.stream.batchDuration", DEFAULT_SPARK_STREAMING_DURATION);
+        if (this.streamingContext == null) {
+            this.streamingContext = new StreamingContext(sparkSession.sparkContext(), Seconds.apply(duration));
+        }
+    }
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
new file mode 100644
index 0000000..cc62195
--- /dev/null
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.batch;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.spark.BaseSparkSink;
+import org.apache.seatunnel.spark.BaseSparkSource;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+import org.apache.seatunnel.spark.SparkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.List;
+
+public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> {
+
+    public static final String SOURCE_TABLE_NAME = "source_table_name";
+    public static final String RESULT_TABLE_NAME = "result_table_name";
+
+    private final SparkEnvironment environment;
+
+    private Config config = ConfigFactory.empty();
+
+    public SparkBatchExecution(SparkEnvironment environment) {
+        this.environment = environment;
+    }
+
+    public static void registerTempView(String tableName, Dataset<Row> ds) {
+        ds.createOrReplaceTempView(tableName);
+    }
+
+    public static void registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) {
+        Config config = source.getConfig();
+        if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) {
+            String tableName = config.getString(SparkBatchExecution.RESULT_TABLE_NAME);
+            registerTempView(tableName, source.getData(environment));
+        } else {
+            throw new ConfigRuntimeException(
+                    "Plugin[" + source.getClass().getName() + "] must be registered as dataset/table, please set \"result_table_name\" config");
+        }
+    }
+
+    public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) {
+        Dataset<Row> fromDs;
+        Config config = transform.getConfig();
+        if (config.hasPath(SparkBatchExecution.SOURCE_TABLE_NAME)) {
+            String sourceTableName = config.getString(SparkBatchExecution.SOURCE_TABLE_NAME);
+            fromDs = environment.getSparkSession().read().table(sourceTableName);
+        } else {
+            fromDs = ds;
+        }
+        return transform.process(fromDs, environment);
+    }
+
+    public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) {
+        Config config = transform.getConfig();
+        if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) {
+            String resultTableName = config.getString(SparkBatchExecution.RESULT_TABLE_NAME);
+            registerTempView(resultTableName, ds);
+        }
+    }
+
+    public static void sinkProcess(SparkEnvironment environment, BaseSparkSink<?> sink, Dataset<Row> ds) {
+        Dataset<Row> fromDs;
+        Config config = sink.getConfig();
+        if (config.hasPath(SparkBatchExecution.SOURCE_TABLE_NAME)) {
+            String sourceTableName = config.getString(SparkBatchExecution.SOURCE_TABLE_NAME);
+            fromDs = environment.getSparkSession().read().table(sourceTableName);
+        } else {
+            fromDs = ds;
+        }
+        sink.output(fromDs, environment);
+    }
+
+    @Override
+    public void start(List<SparkBatchSource> sources, List<BaseSparkTransform> transforms, List<SparkBatchSink> sinks) {
+        sources.forEach(source -> SparkBatchExecution.registerInputTempView(source, environment));
+        if (!sources.isEmpty()) {
+            Dataset<Row> ds = sources.get(0).getData(environment);
+            for (BaseSparkTransform transform : transforms) {
+                if (ds.takeAsList(1).size() > 0) {
+                    ds = SparkBatchExecution.transformProcess(environment, transform, ds);
+                    SparkBatchExecution.registerTransformTempView(transform, ds);
+                }
+            }
+            for (SparkBatchSink sink : sinks) {
+                SparkBatchExecution.sinkProcess(environment, sink, ds);
+            }
+        }
+    }
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return this.config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckResult.success();
+    }
+
+    @Override
+    public void prepare(Void prepareEnv) {
+
+    }
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSink.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java
similarity index 78%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSink.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java
index b86755f..38a1a7c 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSink.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java
@@ -14,12 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.batch
 
-import org.apache.seatunnel.spark.BaseSparkSink
+package org.apache.seatunnel.spark.batch;
+
+import org.apache.seatunnel.spark.BaseSparkSink;
+
+import scala.Unit;
 
 /**
  * a SparkBatchSink plugin will write data to other system
  *  using Spark DataSet API.
  */
-trait SparkBatchSink extends BaseSparkSink[Unit] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class SparkBatchSink extends BaseSparkSink<Unit> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSource.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
similarity index 73%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSource.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
index 95cac53..515c19f 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSource.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
@@ -14,13 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.batch
 
-import org.apache.seatunnel.spark.BaseSparkSource
-import org.apache.spark.sql.{Dataset, Row}
+package org.apache.seatunnel.spark.batch;
+
+import org.apache.seatunnel.spark.BaseSparkSource;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
 /**
  * a SparkBatchSource plugin will read data from other system
  *  using Spark DataSet API.
  */
-trait SparkBatchSource extends BaseSparkSource[Dataset[Row]] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class SparkBatchSource extends BaseSparkSource<Dataset<Row>> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
new file mode 100644
index 0000000..7e08175
--- /dev/null
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.structuredstream;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+import org.apache.seatunnel.spark.SparkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import java.util.List;
+
+public class StructuredStreamingExecution implements Execution<StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink> {
+
+    private final SparkEnvironment sparkEnvironment;
+
+    private Config config = ConfigFactory.empty();
+
+    public StructuredStreamingExecution(SparkEnvironment sparkEnvironment) {
+        this.sparkEnvironment = sparkEnvironment;
+    }
+
+    @Override
+    public void start(List<StructuredStreamingSource> sources, List<BaseSparkTransform> transforms, List<StructuredStreamingSink> sinks) {
+
+    }
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return this.config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckResult.success();
+    }
+
+    @Override
+    public void prepare(Void prepareEnv) {
+
+    }
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java
similarity index 71%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java
index 3b5a88a..ece636b 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.structuredstream
 
-import org.apache.seatunnel.spark.BaseSparkSink
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.streaming.DataStreamWriter
+package org.apache.seatunnel.spark.structuredstream;
+
+import org.apache.seatunnel.spark.BaseSparkSink;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.streaming.DataStreamWriter;
 
 /**
  * a StructuredStreamingSink plugin will write data to other system
  * using Spark Structured streaming API.
  */
-trait StructuredStreamingSink extends BaseSparkSink[DataStreamWriter[Row]] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class StructuredStreamingSink extends BaseSparkSink<DataStreamWriter<Row>> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java
similarity index 73%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java
index 02697ed..d72b3db 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java
@@ -14,13 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.structuredstream
 
-import org.apache.seatunnel.spark.BaseSparkSource
-import org.apache.spark.sql.{Dataset, Row}
+package org.apache.seatunnel.spark.structuredstream;
+
+import org.apache.seatunnel.spark.BaseSparkSource;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
 /**
  * a StructuredStreamingSource plugin will read data from other system
  * using Spark Structured streaming API.
  */
-trait StructuredStreamingSource extends BaseSparkSource[Dataset[Row]] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class StructuredStreamingSource extends BaseSparkSource<Dataset<Row>> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
deleted file mode 100644
index 761d892..0000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark
-
-import java.lang
-
-import scala.collection.JavaConversions._
-
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.env.RuntimeEnv
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-
-class SparkEnvironment extends RuntimeEnv {
-
-  private var sparkSession: SparkSession = _
-
-  private var streamingContext: StreamingContext = _
-
-  var config: Config = ConfigFactory.empty()
-
-  override def setConfig(config: Config): Unit = this.config = config
-
-  override def getConfig: Config = config
-
-  override def checkConfig(): CheckResult = CheckResult.success()
-
-  override def prepare(prepareEnv: lang.Boolean): Unit = {
-    val sparkConf = createSparkConf()
-    sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
-    createStreamingContext
-  }
-
-  private def createSparkConf(): SparkConf = {
-    val sparkConf = new SparkConf()
-    config
-      .entrySet()
-      .foreach(entry => {
-        sparkConf.set(entry.getKey, String.valueOf(entry.getValue.unwrapped()))
-      })
-
-    sparkConf
-  }
-
-  private def createStreamingContext: StreamingContext = {
-    val conf = sparkSession.sparkContext.getConf
-    val duration = conf.getLong("spark.stream.batchDuration", 5)
-    if (streamingContext == null) {
-      streamingContext =
-        new StreamingContext(sparkSession.sparkContext, Seconds(duration))
-    }
-    streamingContext
-  }
-
-  def getStreamingContext: StreamingContext = {
-    streamingContext
-  }
-
-  def getSparkSession: SparkSession = {
-    sparkSession
-  }
-
-}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
deleted file mode 100644
index 8d0640f..0000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark.batch
-
-import org.apache.seatunnel.common.config.{CheckResult, ConfigRuntimeException}
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.seatunnel.env.Execution
-import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-
-class SparkBatchExecution(environment: SparkEnvironment)
-  extends Execution[SparkBatchSource, BaseSparkTransform, SparkBatchSink] {
-
-  private var config = ConfigFactory.empty()
-
-  override def setConfig(config: Config): Unit = this.config = config
-
-  override def getConfig: Config = config
-
-  override def checkConfig(): CheckResult = CheckResult.success()
-
-  override def prepare(prepareEnv: Void): Unit = {}
-
-  override def start(sources: JList[SparkBatchSource], transforms: JList[BaseSparkTransform], sinks: JList[SparkBatchSink]): Unit = {
-
-    sources.foreach(s => {
-      SparkBatchExecution.registerInputTempView(
-        s.asInstanceOf[BaseSparkSource[Dataset[Row]]],
-        environment)
-    })
-    if (!sources.isEmpty) {
-      var ds = sources.get(0).getData(environment)
-      for (tf <- transforms) {
-
-        if (ds.take(1).length > 0) {
-          ds = SparkBatchExecution.transformProcess(environment, tf, ds)
-          SparkBatchExecution.registerTransformTempView(tf, ds)
-        }
-      }
-
-      // if (ds.take(1).length > 0) {
-      sinks.foreach(sink => {
-        SparkBatchExecution.sinkProcess(environment, sink, ds)
-      })
-      // }
-    }
-  }
-
-}
-
-object SparkBatchExecution {
-
-  private[seatunnel] val sourceTableName = "source_table_name"
-  private[seatunnel] val resultTableName = "result_table_name"
-
-  private[seatunnel] def registerTempView(tableName: String, ds: Dataset[Row]): Unit = {
-    ds.createOrReplaceTempView(tableName)
-  }
-
-  private[seatunnel] def registerInputTempView(source: BaseSparkSource[Dataset[Row]], environment: SparkEnvironment): Unit = {
-    val conf = source.getConfig
-    conf.hasPath(SparkBatchExecution.resultTableName) match {
-      case true =>
-        val tableName = conf.getString(SparkBatchExecution.resultTableName)
-        registerTempView(tableName, source.getData(environment))
-      case false =>
-        throw new ConfigRuntimeException(
-          "Plugin[" + source.getClass.getName + "] must be registered as dataset/table, please set \"result_table_name\" config")
-    }
-  }
-
-  private[seatunnel] def transformProcess(environment: SparkEnvironment, transform: BaseSparkTransform, ds: Dataset[Row]): Dataset[Row] = {
-    val config = transform.getConfig()
-    val fromDs = config.hasPath(SparkBatchExecution.sourceTableName) match {
-      case true =>
-        val sourceTableName = config.getString(SparkBatchExecution.sourceTableName)
-        environment.getSparkSession.read.table(sourceTableName)
-      case false => ds
-    }
-
-    transform.process(fromDs, environment)
-  }
-
-  private[seatunnel] def registerTransformTempView(plugin: BaseSparkTransform, ds: Dataset[Row]): Unit = {
-    val config = plugin.getConfig()
-    if (config.hasPath(SparkBatchExecution.resultTableName)) {
-      val tableName = config.getString(SparkBatchExecution.resultTableName)
-      registerTempView(tableName, ds)
-    }
-  }
-
-  private[seatunnel] def sinkProcess(environment: SparkEnvironment, sink: BaseSparkSink[_], ds: Dataset[Row]): Unit = {
-    val config = sink.getConfig()
-    val fromDs = config.hasPath(SparkBatchExecution.sourceTableName) match {
-      case true =>
-        val sourceTableName = config.getString(SparkBatchExecution.sourceTableName)
-        environment.getSparkSession.read.table(sourceTableName)
-      case false => ds
-    }
-
-    sink.output(fromDs, environment)
-  }
-}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index e5f702b..16dfe69 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -44,9 +44,9 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
       sparkEnvironment,
       dataset => {
         val conf = source.getConfig
-        if (conf.hasPath(SparkBatchExecution.resultTableName)) {
+        if (conf.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) {
           SparkBatchExecution.registerTempView(
-            conf.getString(SparkBatchExecution.resultTableName),
+            conf.getString(SparkBatchExecution.RESULT_TABLE_NAME),
             dataset)
         }
         var ds = dataset
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
deleted file mode 100644
index 8d92637..0000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark.structuredstream
-
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.seatunnel.env.Execution
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import java.util.{List => JList}
-
-
-class StructuredStreamingExecution(environment: SparkEnvironment)
-  extends Execution[StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink] {
-
-  private var config = ConfigFactory.empty()
-
-  override def setConfig(config: Config): Unit = this.config = config
-
-  override def getConfig: Config = config
-
-  override def checkConfig(): CheckResult = CheckResult.success()
-
-  override def prepare(void: Void): Unit = {}
-
-  override def start(sources: JList[StructuredStreamingSource], transforms: JList[BaseSparkTransform], sinks: JList[StructuredStreamingSink]): Unit = {}
-}