You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/06/28 11:28:58 UTC

[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2071: add assert sink to Api draft

Hisoka-X commented on code in PR #2071:
URL: https://github.com/apache/incubator-seatunnel/pull/2071#discussion_r908359326


##########
docs/en/connector/sink/Assert.md:
##########
@@ -0,0 +1,105 @@
+# Assert
+
+> # Sink plugin: Assert [Flink]

Review Comment:
   In fact, the new SeaTunnel API supports both Spark and Flink, so you can also run the Spark example to make sure your connector executes normally on both Spark and Flink.



##########
docs/en/connector/sink/Assert.md:
##########
@@ -0,0 +1,105 @@
+# Assert
+
+> # Sink plugin: Assert [Flink]
+
+## Description
+
+A flink sink plugin which can assert illegal data by user defined rules
+
+:::tip
+
+Engine Supported and plugin name
+
+* [ ] Spark
+* [x] Flink: AssertSink
+
+:::
+
+## Options
+
+| name                    | type     | required | default value |
+| ----------------------- | -------- | -------- | ------------- |
+| rules         		  | ConfigList | yes      | -             |
+| &ensp;field_name        | `String` | yes       | -     |

Review Comment:
   Please remove the stray `&ensp;` string.



##########
docs/en/connector/sink/Assert.md:
##########
@@ -0,0 +1,105 @@
+# Assert
+
+> # Sink plugin: Assert [Flink]
+
+## Description
+
+A flink sink plugin which can assert illegal data by user defined rules
+
+:::tip
+
+Engine Supported and plugin name
+
+* [ ] Spark
+* [x] Flink: AssertSink
+
+:::
+
+## Options
+
+| name                    | type     | required | default value |
+| ----------------------- | -------- | -------- | ------------- |
+| rules         		  | ConfigList | yes      | -             |
+| &ensp;field_name        | `String` | yes       | -     |
+| &ensp;field_type        | `String` | no       | -          |
+| &ensp;field_value | ConfigList | no       | -             |
+| &ensp;&ensp;rule_type         | `String`    | no       | -             |
+| &ensp;&ensp;rule_value         | double    | no       | -             |
+
+
+### rules
+
+Rule definition of user's available data.  Each rule represents one field validation.
+
+### field_name
+
+field name(string)
+
+### field_type
+
+field type (string),  e.g. `string,boolean,byte,short,int,long,float,double,char,void,BigInteger,BigDecimal,Instant`
+
+### field_value
+
+A list value rule define the data value validation
+
+### rule_type
+
+The following rules are supported for now
+`
+NOT_NULL,  

Review Comment:
   Can you briefly describe each rule?



##########
docs/en/connector/sink/Assert.md:
##########
@@ -0,0 +1,105 @@
+# Assert
+
+> # Sink plugin: Assert [Flink]
+
+## Description
+
+A flink sink plugin which can assert illegal data by user defined rules
+
+:::tip
+
+Engine Supported and plugin name
+
+* [ ] Spark
+* [x] Flink: AssertSink
+
+:::
+
+## Options
+
+| name                    | type     | required | default value |
+| ----------------------- | -------- | -------- | ------------- |
+| rules         		  | ConfigList | yes      | -             |
+| &ensp;field_name        | `String` | yes       | -     |
+| &ensp;field_type        | `String` | no       | -          |
+| &ensp;field_value | ConfigList | no       | -             |
+| &ensp;&ensp;rule_type         | `String`    | no       | -             |
+| &ensp;&ensp;rule_value         | double    | no       | -             |
+
+
+### rules

Review Comment:
   Please add the type after each option name, as the other connector docs do.



##########
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Throwables;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+@AutoService(SeaTunnelSink.class)
+public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private static final String RULES = "rules";
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType seaTunnelRowType;
+    private List<AssertFieldRule> assertFieldRules;
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new AssertSinkWriter(seaTunnelRowType, assertFieldRules);
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) {
+        if (!pluginConfig.hasPath(RULES)) {
+            Throwables.propagateIfPossible(new ConfigException.Missing(RULES));
+        }
+
+        List<? extends Config> configList = pluginConfig.getConfigList(RULES);
+        if (CollectionUtils.isEmpty(configList)) {
+            Throwables.propagateIfPossible(new ConfigException.BadValue(RULES, "Assert rule config is empty, please add rule config."));
+        }
+        assertFieldRules = new AssertRuleParser().parseRules(configList);
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public String getPluginName() {
+        return "AssertSink";

Review Comment:
   ```suggestion
           return "Assert";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org