You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/20 06:46:23 UTC

[incubator-seatunnel] branch api-draft updated: Add SeaTunnel runtime environment (#1933)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 51fd0f7f Add SeaTunnel runtime environment (#1933)
51fd0f7f is described below

commit 51fd0f7fe002fb1e8e7837f5ec1a9b70f4fcd1e9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri May 20 14:46:17 2022 +0800

    Add SeaTunnel runtime environment (#1933)
---
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   |  3 +-
 .../api/source/SeaTunnelRuntimeEnvironment.java    | 35 ++++++++++++++++++++++
 .../seatunnel/api/source/SeaTunnelSource.java      |  8 +++--
 .../seatunnel/fake/source/FakeSource.java          | 11 -------
 4 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 1178c604..63636a17 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.api.sink;
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelRuntimeEnvironment;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 
 import java.io.IOException;
@@ -41,7 +42,7 @@ import java.util.Optional;
  *                                {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
  */
 public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
-    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelRuntimeEnvironment {
 
     /**
      * Set the row type info of sink row data. This method will be automatically called by translation.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
new file mode 100644
index 00000000..11f69e5a
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+
+/**
+ * This interface defines the runtime environment of the SeaTunnel application.
+ */
+public interface SeaTunnelRuntimeEnvironment {
+
+    /**
+     * Returns the SeaTunnel runtime context.
+     *
+     * @return seaTunnelContext
+     */
+    default SeaTunnelContext getSeaTunnelContext() {
+        return SeaTunnelContext.getContext();
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 7d1522f0..a246425a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.constants.JobMode;
 
 import java.io.Serializable;
 
@@ -33,14 +34,17 @@ import java.io.Serializable;
  * @param <StateT> The type of checkpoint states.
  */
 public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
-    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelRuntimeEnvironment {
 
     /**
      * Get the boundedness of this source.
      *
      * @return the boundedness of this source.
      */
-    Boundedness getBoundedness();
+    default Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(getSeaTunnelContext().getJobMode()) ?
+            Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+    }
 
     /**
      * Get the row type information of the records produced by this source.
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 91a87064..64585430 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,10 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -28,7 +26,6 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
-import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -39,12 +36,6 @@ import com.google.auto.service.AutoService;
 public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
 
     private Config pluginConfig;
-    private Boundedness boundedness;
-
-    @Override
-    public Boundedness getBoundedness() {
-        return boundedness;
-    }
 
     @Override
     public SeaTunnelRowTypeInfo getRowTypeInfo() {
@@ -89,7 +80,5 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
     @Override
     public void prepare(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
-        this.boundedness = JobMode.STREAMING.equals(SeaTunnelContext.getContext().getJobMode()) ?
-            Boundedness.UNBOUNDED : Boundedness.BOUNDED;
     }
 }