You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/20 06:46:23 UTC
[incubator-seatunnel] branch api-draft updated: Add SeaTunnel runtime environment (#1933)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 51fd0f7f Add SeaTunnel runtime environment (#1933)
51fd0f7f is described below
commit 51fd0f7fe002fb1e8e7837f5ec1a9b70f4fcd1e9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri May 20 14:46:17 2022 +0800
Add SeaTunnel runtime environment (#1933)
---
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 3 +-
.../api/source/SeaTunnelRuntimeEnvironment.java | 35 ++++++++++++++++++++++
.../seatunnel/api/source/SeaTunnelSource.java | 8 +++--
.../seatunnel/fake/source/FakeSource.java | 11 -------
4 files changed, 43 insertions(+), 14 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 1178c604..63636a17 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.api.sink;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelRuntimeEnvironment;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import java.io.IOException;
@@ -41,7 +42,7 @@ import java.util.Optional;
* {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
*/
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelRuntimeEnvironment {
/**
* Set the row type info of sink row data. This method will be automatically called by translation.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
new file mode 100644
index 00000000..11f69e5a
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+
+/**
+ * This interface defines the runtime environment of the SeaTunnel application.
+ */
+public interface SeaTunnelRuntimeEnvironment {
+
+ /**
+ * Returns the SeaTunnel runtime context.
+ *
+ * @return seaTunnelContext
+ */
+ default SeaTunnelContext getSeaTunnelContext() {
+ return SeaTunnelContext.getContext();
+ }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 7d1522f0..a246425a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.constants.JobMode;
import java.io.Serializable;
@@ -33,14 +34,17 @@ import java.io.Serializable;
* @param <StateT> The type of checkpoint states.
*/
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelRuntimeEnvironment {
/**
* Get the boundedness of this source.
*
* @return the boundedness of this source.
*/
- Boundedness getBoundedness();
+ default Boundedness getBoundedness() {
+ return JobMode.BATCH.equals(getSeaTunnelContext().getJobMode()) ?
+ Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ }
/**
* Get the row type information of the records produced by this source.
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 91a87064..64585430 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,10 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -28,7 +26,6 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -39,12 +36,6 @@ import com.google.auto.service.AutoService;
public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
private Config pluginConfig;
- private Boundedness boundedness;
-
- @Override
- public Boundedness getBoundedness() {
- return boundedness;
- }
@Override
public SeaTunnelRowTypeInfo getRowTypeInfo() {
@@ -89,7 +80,5 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
@Override
public void prepare(Config pluginConfig) {
this.pluginConfig = pluginConfig;
- this.boundedness = JobMode.STREAMING.equals(SeaTunnelContext.getContext().getJobMode()) ?
- Boundedness.UNBOUNDED : Boundedness.BOUNDED;
}
}