You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/11 05:45:25 UTC
[incubator-seatunnel] branch dev updated: [Feature] [api] Translate spark apis from scala to java (#1188)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 296c2a5 [Feature] [api] Translate spark apis from scala to java (#1188)
296c2a5 is described below
commit 296c2a515c37123d0156bfe2490ab070531b8c54
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Fri Feb 11 13:45:21 2022 +0800
[Feature] [api] Translate spark apis from scala to java (#1188)
---
.../org/apache/seatunnel/spark/BaseSparkSink.java} | 30 +++--
.../apache/seatunnel/spark/BaseSparkSource.java} | 26 ++--
.../seatunnel/spark/BaseSparkTransform.java} | 30 +++--
.../apache/seatunnel/spark/SparkEnvironment.java | 84 +++++++++++++
.../seatunnel/spark/batch/SparkBatchExecution.java | 132 +++++++++++++++++++++
.../seatunnel/spark/batch/SparkBatchSink.java} | 11 +-
.../seatunnel/spark/batch/SparkBatchSource.java} | 13 +-
.../StructuredStreamingExecution.java | 64 ++++++++++
.../structuredstream/StructuredStreamingSink.java} | 14 ++-
.../StructuredStreamingSource.java} | 13 +-
.../apache/seatunnel/spark/SparkEnvironment.scala | 79 ------------
.../spark/batch/SparkBatchExecution.scala | 120 -------------------
.../spark/stream/SparkStreamingExecution.scala | 4 +-
.../StructuredStreamingExecution.scala | 40 -------
14 files changed, 375 insertions(+), 285 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSink.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
similarity index 54%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSink.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
index 9e9981d..589232f 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSink.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
@@ -14,23 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark
-import org.apache.seatunnel.apis.BaseSink
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.spark.sql.{Dataset, Row}
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.apis.BaseSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
/**
* a base interface indicates a sink plugin running on Spark.
*/
-trait BaseSparkSink[OUT] extends BaseSink[SparkEnvironment] {
+public abstract class BaseSparkSink<OUT> implements BaseSink<SparkEnvironment> {
- protected var config: Config = ConfigFactory.empty()
+ protected Config config = ConfigFactory.empty();
- override def setConfig(config: Config): Unit = this.config = config
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
- override def getConfig: Config = config
+ @Override
+ public Config getConfig() {
+ return config;
+ }
- def output(data: Dataset[Row], env: SparkEnvironment): OUT
+ public abstract void prepare(SparkEnvironment prepareEnv);
+ public abstract OUT output(Dataset<Row> data, SparkEnvironment env);
}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSource.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
similarity index 60%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSource.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
index ee9eaad..391a791 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkSource.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
@@ -14,22 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark
-import org.apache.seatunnel.apis.BaseSource
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.apis.BaseSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
/**
* a base interface indicates a source plugin running on Spark.
*/
-trait BaseSparkSource[Data] extends BaseSource[SparkEnvironment] {
-
- protected var config: Config = ConfigFactory.empty()
+public abstract class BaseSparkSource<T> implements BaseSource<SparkEnvironment> {
- override def setConfig(config: Config): Unit = this.config = config
+ protected Config config = ConfigFactory.empty();
- override def getConfig: Config = config
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
- def getData(env: SparkEnvironment): Data
+ @Override
+ public Config getConfig() {
+ return this.config;
+ }
+ public abstract T getData(SparkEnvironment env);
}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkTransform.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
similarity index 56%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkTransform.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
index 3294786..a3997d4 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/BaseSparkTransform.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
@@ -14,23 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.seatunnel.apis.BaseTransform
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.apis.BaseTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
/**
* a base interface indicates a transform plugin running on Spark.
*/
-trait BaseSparkTransform extends BaseTransform[SparkEnvironment] {
-
- protected var config: Config = ConfigFactory.empty()
+public abstract class BaseSparkTransform implements BaseTransform<SparkEnvironment> {
- override def setConfig(config: Config): Unit = this.config = config
+ protected Config config = ConfigFactory.empty();
- override def getConfig: Config = config
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
- def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row]
+ @Override
+ public Config getConfig() {
+ return this.config;
+ }
+ public abstract Dataset<Row> process(Dataset<Row> data, SparkEnvironment env);
}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
new file mode 100644
index 0000000..9eb0987
--- /dev/null
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.env.RuntimeEnv;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.StreamingContext;
+
+public class SparkEnvironment implements RuntimeEnv {
+
+ private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
+
+ private SparkSession sparkSession;
+
+ private StreamingContext streamingContext;
+
+ private Config config = ConfigFactory.empty();
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Config getConfig() {
+ return this.config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ return CheckResult.success();
+ }
+
+ @Override
+ public void prepare(Boolean prepareEnv) {
+ SparkConf sparkConf = createSparkConf();
+ this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+ createStreamingContext();
+ }
+
+ public SparkSession getSparkSession() {
+ return this.sparkSession;
+ }
+
+ public StreamingContext getStreamingContext() {
+ return this.streamingContext;
+ }
+
+ private SparkConf createSparkConf() {
+ SparkConf sparkConf = new SparkConf();
+ this.config.entrySet().forEach(entry -> sparkConf.set(entry.getKey(), String.valueOf(entry.getValue().unwrapped())));
+ return sparkConf;
+ }
+
+ private void createStreamingContext() {
+ SparkConf conf = this.sparkSession.sparkContext().getConf();
+ long duration = conf.getLong("spark.stream.batchDuration", DEFAULT_SPARK_STREAMING_DURATION);
+ if (this.streamingContext == null) {
+ this.streamingContext = new StreamingContext(sparkSession.sparkContext(), Seconds.apply(duration));
+ }
+ }
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
new file mode 100644
index 0000000..cc62195
--- /dev/null
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.batch;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.spark.BaseSparkSink;
+import org.apache.seatunnel.spark.BaseSparkSource;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+import org.apache.seatunnel.spark.SparkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.List;
+
+public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> {
+
+ public static final String SOURCE_TABLE_NAME = "source_table_name";
+ public static final String RESULT_TABLE_NAME = "result_table_name";
+
+ private final SparkEnvironment environment;
+
+ private Config config = ConfigFactory.empty();
+
+ public SparkBatchExecution(SparkEnvironment environment) {
+ this.environment = environment;
+ }
+
+ public static void registerTempView(String tableName, Dataset<Row> ds) {
+ ds.createOrReplaceTempView(tableName);
+ }
+
+ public static void registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) {
+ Config config = source.getConfig();
+ if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) {
+ String tableName = config.getString(SparkBatchExecution.RESULT_TABLE_NAME);
+ registerTempView(tableName, source.getData(environment));
+ } else {
+ throw new ConfigRuntimeException(
+ "Plugin[" + source.getClass().getName() + "] must be registered as dataset/table, please set \"result_table_name\" config");
+ }
+ }
+
+ public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) {
+ Dataset<Row> fromDs;
+ Config config = transform.getConfig();
+ if (config.hasPath(SparkBatchExecution.SOURCE_TABLE_NAME)) {
+ String sourceTableName = config.getString(SparkBatchExecution.SOURCE_TABLE_NAME);
+ fromDs = environment.getSparkSession().read().table(sourceTableName);
+ } else {
+ fromDs = ds;
+ }
+ return transform.process(fromDs, environment);
+ }
+
+ public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) {
+ Config config = transform.getConfig();
+ if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) {
+ String resultTableName = config.getString(SparkBatchExecution.RESULT_TABLE_NAME);
+ registerTempView(resultTableName, ds);
+ }
+ }
+
+ public static void sinkProcess(SparkEnvironment environment, BaseSparkSink<?> sink, Dataset<Row> ds) {
+ Dataset<Row> fromDs;
+ Config config = sink.getConfig();
+ if (config.hasPath(SparkBatchExecution.SOURCE_TABLE_NAME)) {
+ String sourceTableName = config.getString(SparkBatchExecution.SOURCE_TABLE_NAME);
+ fromDs = environment.getSparkSession().read().table(sourceTableName);
+ } else {
+ fromDs = ds;
+ }
+ sink.output(fromDs, environment);
+ }
+
+ @Override
+ public void start(List<SparkBatchSource> sources, List<BaseSparkTransform> transforms, List<SparkBatchSink> sinks) {
+ sources.forEach(source -> SparkBatchExecution.registerInputTempView(source, environment));
+ if (!sources.isEmpty()) {
+ Dataset<Row> ds = sources.get(0).getData(environment);
+ for (BaseSparkTransform transform : transforms) {
+ if (ds.take(1).length > 0) {
+ ds = SparkBatchExecution.transformProcess(environment, transform, ds);
+ SparkBatchExecution.registerTransformTempView(transform, ds);
+ }
+ }
+ for (SparkBatchSink sink : sinks) {
+ SparkBatchExecution.sinkProcess(environment, sink, ds);
+ }
+ }
+ }
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Config getConfig() {
+ return this.config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ return CheckResult.success();
+ }
+
+ @Override
+ public void prepare(Void prepareEnv) {
+
+ }
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSink.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java
similarity index 78%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSink.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java
index b86755f..38a1a7c 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSink.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java
@@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.batch
-import org.apache.seatunnel.spark.BaseSparkSink
+package org.apache.seatunnel.spark.batch;
+
+import org.apache.seatunnel.spark.BaseSparkSink;
+
+import scala.Unit;
/**
* a SparkBatchSink plugin will write data to other system
* using Spark DataSet API.
*/
-trait SparkBatchSink extends BaseSparkSink[Unit] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class SparkBatchSink extends BaseSparkSink<Unit> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSource.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
similarity index 73%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSource.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
index 95cac53..515c19f 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchSource.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
@@ -14,13 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.batch
-import org.apache.seatunnel.spark.BaseSparkSource
-import org.apache.spark.sql.{Dataset, Row}
+package org.apache.seatunnel.spark.batch;
+
+import org.apache.seatunnel.spark.BaseSparkSource;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
/**
* a SparkBatchSource plugin will read data from other system
* using Spark DataSet API.
*/
-trait SparkBatchSource extends BaseSparkSource[Dataset[Row]] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class SparkBatchSource extends BaseSparkSource<Dataset<Row>> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
new file mode 100644
index 0000000..7e08175
--- /dev/null
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.structuredstream;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+import org.apache.seatunnel.spark.SparkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import java.util.List;
+
+public class StructuredStreamingExecution implements Execution<StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink> {
+
+ private final SparkEnvironment sparkEnvironment;
+
+ private Config config = ConfigFactory.empty();
+
+ public StructuredStreamingExecution(SparkEnvironment sparkEnvironment) {
+ this.sparkEnvironment = sparkEnvironment;
+ }
+
+ @Override
+ public void start(List<StructuredStreamingSource> sources, List<BaseSparkTransform> transforms, List<StructuredStreamingSink> sinks) {
+
+ }
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Config getConfig() {
+ return this.config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ return CheckResult.success();
+ }
+
+ @Override
+ public void prepare(Void prepareEnv) {
+
+ }
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java
similarity index 71%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java
index 3b5a88a..ece636b 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java
@@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.structuredstream
-import org.apache.seatunnel.spark.BaseSparkSink
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.streaming.DataStreamWriter
+package org.apache.seatunnel.spark.structuredstream;
+
+import org.apache.seatunnel.spark.BaseSparkSink;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.streaming.DataStreamWriter;
/**
* a StructuredStreamingSink plugin will write data to other system
* using Spark Structured streaming API.
*/
-trait StructuredStreamingSink extends BaseSparkSink[DataStreamWriter[Row]] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class StructuredStreamingSink extends BaseSparkSink<DataStreamWriter<Row>> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.scala b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java
similarity index 73%
rename from seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.scala
rename to seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java
index 02697ed..d72b3db 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java
@@ -14,13 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.structuredstream
-import org.apache.seatunnel.spark.BaseSparkSource
-import org.apache.spark.sql.{Dataset, Row}
+package org.apache.seatunnel.spark.structuredstream;
+
+import org.apache.seatunnel.spark.BaseSparkSource;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
/**
* a StructuredStreamingSource plugin will read data from other system
* using Spark Structured streaming API.
*/
-trait StructuredStreamingSource extends BaseSparkSource[Dataset[Row]] {}
+@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
+public abstract class StructuredStreamingSource extends BaseSparkSource<Dataset<Row>> {
+}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
deleted file mode 100644
index 761d892..0000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark
-
-import java.lang
-
-import scala.collection.JavaConversions._
-
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.env.RuntimeEnv
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-
-class SparkEnvironment extends RuntimeEnv {
-
- private var sparkSession: SparkSession = _
-
- private var streamingContext: StreamingContext = _
-
- var config: Config = ConfigFactory.empty()
-
- override def setConfig(config: Config): Unit = this.config = config
-
- override def getConfig: Config = config
-
- override def checkConfig(): CheckResult = CheckResult.success()
-
- override def prepare(prepareEnv: lang.Boolean): Unit = {
- val sparkConf = createSparkConf()
- sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
- createStreamingContext
- }
-
- private def createSparkConf(): SparkConf = {
- val sparkConf = new SparkConf()
- config
- .entrySet()
- .foreach(entry => {
- sparkConf.set(entry.getKey, String.valueOf(entry.getValue.unwrapped()))
- })
-
- sparkConf
- }
-
- private def createStreamingContext: StreamingContext = {
- val conf = sparkSession.sparkContext.getConf
- val duration = conf.getLong("spark.stream.batchDuration", 5)
- if (streamingContext == null) {
- streamingContext =
- new StreamingContext(sparkSession.sparkContext, Seconds(duration))
- }
- streamingContext
- }
-
- def getStreamingContext: StreamingContext = {
- streamingContext
- }
-
- def getSparkSession: SparkSession = {
- sparkSession
- }
-
-}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
deleted file mode 100644
index 8d0640f..0000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark.batch
-
-import org.apache.seatunnel.common.config.{CheckResult, ConfigRuntimeException}
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.seatunnel.env.Execution
-import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-
-class SparkBatchExecution(environment: SparkEnvironment)
- extends Execution[SparkBatchSource, BaseSparkTransform, SparkBatchSink] {
-
- private var config = ConfigFactory.empty()
-
- override def setConfig(config: Config): Unit = this.config = config
-
- override def getConfig: Config = config
-
- override def checkConfig(): CheckResult = CheckResult.success()
-
- override def prepare(prepareEnv: Void): Unit = {}
-
- override def start(sources: JList[SparkBatchSource], transforms: JList[BaseSparkTransform], sinks: JList[SparkBatchSink]): Unit = {
-
- sources.foreach(s => {
- SparkBatchExecution.registerInputTempView(
- s.asInstanceOf[BaseSparkSource[Dataset[Row]]],
- environment)
- })
- if (!sources.isEmpty) {
- var ds = sources.get(0).getData(environment)
- for (tf <- transforms) {
-
- if (ds.take(1).length > 0) {
- ds = SparkBatchExecution.transformProcess(environment, tf, ds)
- SparkBatchExecution.registerTransformTempView(tf, ds)
- }
- }
-
- // if (ds.take(1).length > 0) {
- sinks.foreach(sink => {
- SparkBatchExecution.sinkProcess(environment, sink, ds)
- })
- // }
- }
- }
-
-}
-
-object SparkBatchExecution {
-
- private[seatunnel] val sourceTableName = "source_table_name"
- private[seatunnel] val resultTableName = "result_table_name"
-
- private[seatunnel] def registerTempView(tableName: String, ds: Dataset[Row]): Unit = {
- ds.createOrReplaceTempView(tableName)
- }
-
- private[seatunnel] def registerInputTempView(source: BaseSparkSource[Dataset[Row]], environment: SparkEnvironment): Unit = {
- val conf = source.getConfig
- conf.hasPath(SparkBatchExecution.resultTableName) match {
- case true =>
- val tableName = conf.getString(SparkBatchExecution.resultTableName)
- registerTempView(tableName, source.getData(environment))
- case false =>
- throw new ConfigRuntimeException(
- "Plugin[" + source.getClass.getName + "] must be registered as dataset/table, please set \"result_table_name\" config")
- }
- }
-
- private[seatunnel] def transformProcess(environment: SparkEnvironment, transform: BaseSparkTransform, ds: Dataset[Row]): Dataset[Row] = {
- val config = transform.getConfig()
- val fromDs = config.hasPath(SparkBatchExecution.sourceTableName) match {
- case true =>
- val sourceTableName = config.getString(SparkBatchExecution.sourceTableName)
- environment.getSparkSession.read.table(sourceTableName)
- case false => ds
- }
-
- transform.process(fromDs, environment)
- }
-
- private[seatunnel] def registerTransformTempView(plugin: BaseSparkTransform, ds: Dataset[Row]): Unit = {
- val config = plugin.getConfig()
- if (config.hasPath(SparkBatchExecution.resultTableName)) {
- val tableName = config.getString(SparkBatchExecution.resultTableName)
- registerTempView(tableName, ds)
- }
- }
-
- private[seatunnel] def sinkProcess(environment: SparkEnvironment, sink: BaseSparkSink[_], ds: Dataset[Row]): Unit = {
- val config = sink.getConfig()
- val fromDs = config.hasPath(SparkBatchExecution.sourceTableName) match {
- case true =>
- val sourceTableName = config.getString(SparkBatchExecution.sourceTableName)
- environment.getSparkSession.read.table(sourceTableName)
- case false => ds
- }
-
- sink.output(fromDs, environment)
- }
-}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index e5f702b..16dfe69 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -44,9 +44,9 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
sparkEnvironment,
dataset => {
val conf = source.getConfig
- if (conf.hasPath(SparkBatchExecution.resultTableName)) {
+ if (conf.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) {
SparkBatchExecution.registerTempView(
- conf.getString(SparkBatchExecution.resultTableName),
+ conf.getString(SparkBatchExecution.RESULT_TABLE_NAME),
dataset)
}
var ds = dataset
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
deleted file mode 100644
index 8d92637..0000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.spark.structuredstream
-
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.seatunnel.env.Execution
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import java.util.{List => JList}
-
-
-class StructuredStreamingExecution(environment: SparkEnvironment)
- extends Execution[StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink] {
-
- private var config = ConfigFactory.empty()
-
- override def setConfig(config: Config): Unit = this.config = config
-
- override def getConfig: Config = config
-
- override def checkConfig(): CheckResult = CheckResult.success()
-
- override def prepare(void: Void): Unit = {}
-
- override def start(sources: JList[StructuredStreamingSource], transforms: JList[BaseSparkTransform], sinks: JList[StructuredStreamingSink]): Unit = {}
-}