You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/06 06:11:07 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add sentry sink connector #2244 (#2584)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9fd40390a [Feature][Connector-V2] Add sentry sink connector #2244 (#2584)
9fd40390a is described below
commit 9fd40390a74f028738e0afaa87e4863291e42083
Author: Saintyang <sh...@foxmail.com>
AuthorDate: Tue Sep 6 14:11:02 2022 +0800
[Feature][Connector-V2] Add sentry sink connector #2244 (#2584)
* [Feature][Connector-V2] Add sentry sink connector #2244
* [Feature][Connector-V2] Add sentry sink connector: modify pom.xml and doc, remove example class
Co-authored-by: yangshengjie <ya...@analysys.com.cn>
---
docs/en/connector-v2/sink/Sentry.md | 59 +++++++++++++++++
plugin-mapping.properties | 2 +-
seatunnel-connectors-v2-dist/pom.xml | 5 ++
seatunnel-connectors-v2/connector-sentry/pom.xml | 55 ++++++++++++++++
.../seatunnel/sentry/sink/SentryConfig.java | 32 +++++++++
.../seatunnel/sentry/sink/SentrySink.java | 73 ++++++++++++++++++++
.../seatunnel/sentry/sink/SentrySinkState.java | 23 +++++++
.../seatunnel/sentry/sink/SentrySinkWriter.java | 77 ++++++++++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
.../seatunnel-flink-connector-v2-example/pom.xml | 1 -
10 files changed, 326 insertions(+), 2 deletions(-)
diff --git a/docs/en/connector-v2/sink/Sentry.md b/docs/en/connector-v2/sink/Sentry.md
new file mode 100644
index 000000000..1e64e8aab
--- /dev/null
+++ b/docs/en/connector-v2/sink/Sentry.md
@@ -0,0 +1,59 @@
+# Sentry
+
+## Description
+
+Write message to Sentry.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|---------|----------| ------------- |
+| dsn | string | yes | - |
+| env | string | no | - |
+| release | string | no | - |
+| cacheDirPath | string | no | - |
+| enableExternalConfiguration | boolean | no | - |
+| maxCacheItems | number | no | - |
+| flushTimeoutMillis | number | no | - |
+| maxQueueSize | number | no | - |
+### dsn [string]
+
+The DSN tells the SDK where to send the events to.
+
+### env [string]
+specify the environment
+
+### release [string]
+specify the release
+
+### cacheDirPath [string]
+the cache dir path for caching offline events
+
+### enableExternalConfiguration [boolean]
+Whether loading properties from external sources is enabled.
+
+### maxCacheItems [number]
+The max cache items for capping the number of events. Default is 30.
+
+### flushTimeoutMillis [number]
+Controls how many milliseconds to wait before flushing down. Sentry SDKs cache events from a background queue and this queue is given a certain amount of time to drain pending events. Default is 15000 (15 seconds).
+
+### maxQueueSize [number]
+Max queue size before flushing events/envelopes to the disk
+
+## Example
+```
+ Sentry {
+ dsn = "https://xxx@sentry.xxx.com:9999/6"
+ enableExternalConfiguration = true
+ maxCacheItems = 1000
+ env = prod
+ }
+
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index ada6b9e45..0107a87df 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -124,4 +124,4 @@ seatunnel.sink.FtpFile = connector-file-ftp
seatunnel.sink.Socket = connector-socket
seatunnel.source.Redis = connector-redis
seatunnel.sink.DataHub = connector-datahub
-
+seatunnel.sink.Sentry = connector-sentry
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index ce13e8adb..baf154db1 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -156,6 +156,11 @@
<artifactId>connector-datahub</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-sentry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-connectors-v2/connector-sentry/pom.xml b/seatunnel-connectors-v2/connector-sentry/pom.xml
new file mode 100644
index 000000000..c2234a66e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- Parent module: seatunnel-connectors-v2. -->
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-sentry</artifactId>
+
+    <properties>
+        <!-- Version applied to the io.sentry dependency declared below. -->
+        <sentry.version>5.0.1</sentry.version>
+    </properties>
+
+    <!-- Declaration order of the dependencies is kept as committed. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!--
+            NOTE(review): the connector sources import only io.sentry.Sentry and
+            io.sentry.SentryOptions; confirm whether the logback integration
+            artifact is required rather than the core SDK artifact.
+        -->
+        <dependency>
+            <groupId>io.sentry</groupId>
+            <artifactId>sentry-logback</artifactId>
+            <version>${sentry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
new file mode 100644
index 000000000..421495b96
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+public class SentryConfig {
+
+ public static final String SENTRY = "sentry";
+ public static final String DSN = "dsn";
+ public static final String ENV = "env";
+ public static final String RELEASE = "release";
+ public static final String CACHE_DIRPATH = "cacheDirPath";
+ public static final String ENABLE_EXTERNAL_CONFIGURATION = "enableExternalConfiguration";
+ public static final String MAX_CACHEITEMS = "maxCacheItems";
+ public static final String FLUSH_TIMEOUTMILLIS = "flushTimeoutMillis";
+ public static final String MAX_QUEUESIZE = "maxQueueSize";
+
+}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
new file mode 100644
index 000000000..e4f1ada70
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * Sentry sink plugin. It validates that the {@code dsn} option is present and
+ * creates a {@link SentrySinkWriter} from the stored row type and configuration.
+ */
+@AutoService(SeaTunnelSink.class)
+public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, SentrySinkState> {
+
+    private SeaTunnelRowType seaTunnelRowType;
+    private Config pluginConfig;
+
+    /**
+     * @return the plugin name constant {@link SentryConfig#SENTRY}
+     */
+    @Override
+    public String getPluginName() {
+        return SentryConfig.SENTRY;
+    }
+
+    /**
+     * Stores the configuration after checking the mandatory option.
+     *
+     * @param pluginConfig configuration of this sink
+     * @throws PrepareFailException when the {@code dsn} path is absent
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        final boolean dsnPresent = pluginConfig.hasPath(SentryConfig.DSN);
+        if (dsnPresent) {
+            this.pluginConfig = pluginConfig;
+            return;
+        }
+        final String reason = String.format("Config must include column : %s", SentryConfig.DSN);
+        throw new PrepareFailException(getPluginName(), PluginType.SINK, reason);
+    }
+
+    /**
+     * @param seaTunnelRowType row type remembered for {@link #getConsumedType()} and the writer
+     */
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * @return the row type previously passed to {@link #setTypeInfo(SeaTunnelRowType)}
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    /**
+     * @param context writer context, forwarded to the writer
+     * @return a new {@link SentrySinkWriter}
+     */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, SentrySinkState> createWriter(Context context) throws IOException {
+        final SentrySinkWriter writer = new SentrySinkWriter(seaTunnelRowType, context, pluginConfig);
+        return writer;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java
new file mode 100644
index 000000000..b5ad101c3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+import java.io.Serializable;
+
+/**
+ * State type used as the state type argument of {@code SentrySink} and
+ * {@code SentrySinkWriter}. It is serializable and declares no members.
+ */
+public class SentrySinkState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
new file mode 100644
index 000000000..4f94055bd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import io.sentry.Sentry;
+import io.sentry.SentryOptions;
+
+import java.io.IOException;
+
+/**
+ * Writer of the Sentry sink. The constructor turns the plugin configuration into
+ * {@link SentryOptions} and calls {@link Sentry#init(SentryOptions)}; each row given
+ * to {@link #write(SeaTunnelRow)} is then sent with {@link Sentry#captureMessage(String)}.
+ *
+ * <p>NOTE(review): only the static {@link Sentry} API is used, so the SDK state is
+ * presumably shared by every writer in the same JVM - confirm the behaviour when
+ * several writers are created or closed independently.
+ */
+public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, SentrySinkState> {
+
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    /**
+     * Builds the Sentry options from the configuration and initialises the SDK.
+     *
+     * @param seaTunnelRowType row type of the rows handed to this writer; kept on the instance
+     * @param context          writer context; not used by this writer
+     * @param pluginConfig     sink configuration; {@code dsn} is read unconditionally,
+     *                         every other option only when its path exists
+     */
+    public SentrySinkWriter(SeaTunnelRowType seaTunnelRowType,
+                            SinkWriter.Context context,
+                            Config pluginConfig) {
+        // Assign the field so it is never left at its null default.
+        this.seaTunnelRowType = seaTunnelRowType;
+        Sentry.init(buildOptions(pluginConfig));
+    }
+
+    /** Copies the configured values into a fresh {@link SentryOptions} instance. */
+    private static SentryOptions buildOptions(Config pluginConfig) {
+        SentryOptions options = new SentryOptions();
+        options.setDsn(pluginConfig.getString(SentryConfig.DSN));
+        if (pluginConfig.hasPath(SentryConfig.ENV)) {
+            options.setEnvironment(pluginConfig.getString(SentryConfig.ENV));
+        }
+        if (pluginConfig.hasPath(SentryConfig.RELEASE)) {
+            options.setRelease(pluginConfig.getString(SentryConfig.RELEASE));
+        }
+        if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH)) {
+            options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH));
+        }
+        if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS)) {
+            options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS));
+        }
+        if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE)) {
+            options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE));
+        }
+        if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS)) {
+            options.setFlushTimeoutMillis(pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS));
+        }
+        if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION)) {
+            options.setEnableExternalConfiguration(pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION));
+        }
+        return options;
+    }
+
+    /**
+     * Sends the string form of the row to Sentry as a message.
+     *
+     * @param element row to report; its {@code toString()} value is the message text
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Sentry.captureMessage(element.toString());
+    }
+
+    /** Closes the Sentry SDK through the static {@link Sentry#close()} call. */
+    @Override
+    public void close() throws IOException {
+        Sentry.close();
+    }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 42b50f6b0..e1c5b683d 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -52,6 +52,7 @@
<module>connector-neo4j</module>
<module>connector-redis</module>
<module>connector-datahub</module>
+ <module>connector-sentry</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 52db44100..bc5bcd470 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -77,7 +77,6 @@
<version>${project.version}</version>
</dependency>
<!-- seatunnel connectors -->
-
<!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>