You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/26 12:46:20 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-v2-Fake]Supports direct definition of data values(row) (#2839)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b7d9dde6c [Improve][Connector-v2-Fake]Supports direct definition of data values(row) (#2839)
b7d9dde6c is described below

commit b7d9dde6c8803bb72b4943020f4d9a7d0b52c1d4
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Mon Sep 26 20:46:14 2022 +0800

    [Improve][Connector-v2-Fake]Supports direct definition of data values(row) (#2839)
    
    * [Improve][Connector-v2]Supports direct definition of data values(row)
---
 docs/en/connector-v2/source/FakeSource.md          |  8 +++++
 .../seatunnel/fake/source/FakeOptions.java         | 40 ++++++++++++++++++++++
 .../seatunnel/fake/source/FakeSource.java          |  4 ++-
 .../seatunnel/fake/source/FakeSourceReader.java    |  6 ++--
 .../resources/assertion/fakesource_to_assert.conf  |  1 +
 .../main/resources/examples/fake_to_console.conf   |  1 +
 .../src/main/resources/examples/spark.batch.conf   |  1 +
 7 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md
index 3c66ce679..c4bc5057c 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -22,15 +22,21 @@ just for testing, such as type conversion and feature testing
 |-------------------|--------|----------|---------------|
 | result_table_name | string | yes      | -             |
 | schema            | config | yes      | -             |
+| row.num           | long   | no       | 10            |
 
 ### result_table_name [string]
 
 The table name.
 
 ### schema [config]
+
 Table structure description. You should set the schema option to tell the connector how to parse data into the row you want.  
 **Tips**: Most unstructured data sources, such as LocalFile and HdfsFile, take this parameter.  
 **Example**:
+
+### row.num [long]
+Number of rows of data to generate; defaults to 10 when not set.
+
 ```hocon
 schema = {
       fields {
@@ -55,7 +61,9 @@ schema = {
 ```
 
 ## Example
+
 Simple source for FakeSource which contains enough datatype
+
 ```hocon
 source {
   FakeSource {
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
new file mode 100644
index 000000000..96dc5e8ac
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+
+public class FakeOptions implements Serializable { // Serializable holder for the FakeSource plugin options
+
+    private static final String ROW_NUM = "row.num"; // config path looked up by parse()
+    private static final Long DEFAULT_ROW_NUM = 10L; // used when the config has no "row.num" path
+    @Getter
+    @Setter
+    private Long rowNum; // accessors are generated by Lombok's @Getter/@Setter
+
+    public static FakeOptions parse(Config config) { // builds a FakeOptions instance from the given config
+        FakeOptions fakeOptions = new FakeOptions();
+        fakeOptions.setRowNum(config.hasPath(ROW_NUM) ? config.getLong(ROW_NUM) : DEFAULT_ROW_NUM); // fall back to the default when the path is absent
+        return fakeOptions;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index fd9387f2d..eeede4439 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -38,6 +38,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private Config pluginConfig;
     private JobContext jobContext;
     private SeaTunnelSchema schema;
+    private FakeOptions fakeOptions;
 
     @Override
     public Boundedness getBoundedness() {
@@ -51,7 +52,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new FakeSourceReader(readerContext, new FakeRandomData(schema));
+        return new FakeSourceReader(readerContext, new FakeRandomData(schema), fakeOptions);
     }
 
     @Override
@@ -64,6 +65,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
         this.pluginConfig = pluginConfig;
         assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
         this.schema = SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
+        this.fakeOptions = FakeOptions.parse(pluginConfig);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index d67e1f4b2..6301a284f 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -33,10 +33,12 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     private final SingleSplitReaderContext context;
 
     private final FakeRandomData fakeRandomData;
+    private final FakeOptions options;
 
-    public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) {
+    public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData, FakeOptions options) {
         this.context = context;
         this.fakeRandomData = randomData;
+        this.options = options;
     }
 
     @Override
@@ -53,7 +55,7 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
         // Emit the configured number of randomly generated rows.
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < options.getRowNum(); i++) {
             SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
             output.collect(seaTunnelRow);
         }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 15da3772b..9b628ba46 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -24,6 +24,7 @@ env {
 
 source {
     FakeSource {
+      row.num = 16
       result_table_name = "fake"
       schema = {
         fields {
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 6a89b64a6..674274f5b 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -30,6 +30,7 @@ source {
   # This is an example source plugin **only for testing and demonstrating the source plugin feature**
     FakeSource {
       result_table_name = "fake"
+      row.num = 16
       schema = {
         fields {
           name = "string"
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index e04880dd2..83f57c829 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -33,6 +33,7 @@ env {
 source {
   # This is an example input plugin **only for testing and demonstrating the input plugin feature**
   FakeSource {
+    row.num = 16
     schema = {
       fields {
         c_map = "map<string, string>"