You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/06 06:11:07 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add sentry sink connector #2244 (#2584)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9fd40390a [Feature][Connector-V2] Add sentry sink connector #2244 (#2584)
9fd40390a is described below

commit 9fd40390a74f028738e0afaa87e4863291e42083
Author: Saintyang <sh...@foxmail.com>
AuthorDate: Tue Sep 6 14:11:02 2022 +0800

    [Feature][Connector-V2] Add sentry sink connector #2244 (#2584)
    
    * [Feature][Connector-V2] Add sentry sink connector #2244
    
    * [Feature][Connector-V2] Add sentry sink connector: modify pom.xml and doc, remove example class
    
    Co-authored-by: yangshengjie <ya...@analysys.com.cn>
---
 docs/en/connector-v2/sink/Sentry.md                | 59 +++++++++++++++++
 plugin-mapping.properties                          |  2 +-
 seatunnel-connectors-v2-dist/pom.xml               |  5 ++
 seatunnel-connectors-v2/connector-sentry/pom.xml   | 55 ++++++++++++++++
 .../seatunnel/sentry/sink/SentryConfig.java        | 32 +++++++++
 .../seatunnel/sentry/sink/SentrySink.java          | 73 ++++++++++++++++++++
 .../seatunnel/sentry/sink/SentrySinkState.java     | 23 +++++++
 .../seatunnel/sentry/sink/SentrySinkWriter.java    | 77 ++++++++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |  1 +
 .../seatunnel-flink-connector-v2-example/pom.xml   |  1 -
 10 files changed, 326 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/sink/Sentry.md b/docs/en/connector-v2/sink/Sentry.md
new file mode 100644
index 000000000..1e64e8aab
--- /dev/null
+++ b/docs/en/connector-v2/sink/Sentry.md
@@ -0,0 +1,59 @@
+# Sentry
+
+## Description
+
+Write message to Sentry.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------| ------------- |
+| dsn                        | string  | yes      | -             |
+| env                        | string  | no       | -             |
+| release                    | string  | no       | -             |
+| cacheDirPath               | string  | no       | -             |
+| enableExternalConfiguration | boolean | no       | -             |
+| maxCacheItems              | number  | no       | -             |
+| flushTimeoutMills          | number  | no       | -             |
+| maxQueueSize               | number  | no       | -             |
+### dsn [string]
+
+The DSN tells the SDK where to send the events to.
+
+### env [string]
+specify the environment
+
+### release [string]
+specify the release
+
+### cacheDirPath [string]
+the cache dir path for caching offline events
+
+### enableExternalConfiguration [boolean]
+if loading properties from external sources is enabled.
+
+### maxCacheItems [number]
+The max cache items for capping the number of events Default is 30
+
+### flushTimeoutMillis [number]
+Controls how many seconds to wait before flushing down. Sentry SDKs cache events from a background queue and this queue is given a certain amount to drain pending events Default is 15000 = 15s
+
+### maxQueueSize [number]
+Max queue size before flushing events/envelopes to the disk
+
+## Example
+```
+  Sentry {
+    dsn = "https://xxx@sentry.xxx.com:9999/6"
+    enableExternalConfiguration = true
+    maxCacheItems = 1000
+    env = prod
+  }
+
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index ada6b9e45..0107a87df 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -124,4 +124,4 @@ seatunnel.sink.FtpFile = connector-file-ftp
 seatunnel.sink.Socket = connector-socket
 seatunnel.source.Redis = connector-redis
 seatunnel.sink.DataHub = connector-datahub
-
+seatunnel.sink.Sentry = connector-sentry
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index ce13e8adb..baf154db1 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -156,6 +156,11 @@
             <artifactId>connector-datahub</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-sentry</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors-v2/connector-sentry/pom.xml b/seatunnel-connectors-v2/connector-sentry/pom.xml
new file mode 100644
index 000000000..c2234a66e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-sentry</artifactId>
+
+    <properties>
+        <sentry.version>5.0.1</sentry.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.sentry</groupId>
+            <artifactId>sentry-logback</artifactId>
+            <version>${sentry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
new file mode 100644
index 000000000..421495b96
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+public class SentryConfig {
+
+    public static final String SENTRY = "sentry";
+    public static final String DSN = "dsn";
+    public static final String ENV = "env";
+    public static final String RELEASE = "release";
+    public static final String CACHE_DIRPATH = "cacheDirPath";
+    public static final String ENABLE_EXTERNAL_CONFIGURATION = "enableExternalConfiguration";
+    public static final String MAX_CACHEITEMS = "maxCacheItems";
+    public static final String FLUSH_TIMEOUTMILLIS = "flushTimeoutMillis";
+    public static final String MAX_QUEUESIZE = "maxQueueSize";
+
+}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
new file mode 100644
index 000000000..e4f1ada70
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * @description: SentrySink class
+ */
+@AutoService(SeaTunnelSink.class)
+public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, SentrySinkState> {
+
+    private SeaTunnelRowType seaTunnelRowType;
+    private Config pluginConfig;
+    @Override
+    public String getPluginName() {
+        return SentryConfig.SENTRY;
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (!pluginConfig.hasPath(SentryConfig.DSN)) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK,
+                    String.format("Config must include column : %s", SentryConfig.DSN));
+        }
+
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, SentrySinkState> createWriter(Context context) throws IOException {
+        return new SentrySinkWriter(seaTunnelRowType, context, pluginConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java
new file mode 100644
index 000000000..b5ad101c3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+import java.io.Serializable;
+
+public class SentrySinkState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
new file mode 100644
index 000000000..4f94055bd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import io.sentry.Sentry;
+import io.sentry.SentryOptions;
+
+import java.io.IOException;
+
+/**
+ * @description: SentrySinkWriter class
+ */
+
+public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, SentrySinkState> {
+    private SeaTunnelRowType seaTunnelRowType;
+    public SentrySinkWriter(SeaTunnelRowType seaTunnelRowType,
+                            SinkWriter.Context context,
+                            Config pluginConfig) {
+        SentryOptions options = new SentryOptions();
+        options.setDsn(pluginConfig.getString(SentryConfig.DSN));
+        if (pluginConfig.hasPath(SentryConfig.ENV)){
+            options.setEnvironment(pluginConfig.getString(SentryConfig.ENV));
+        }
+        if (pluginConfig.hasPath(SentryConfig.RELEASE)){
+            options.setRelease(pluginConfig.getString(SentryConfig.RELEASE));
+        }
+        if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH)){
+            options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH));
+        }
+        if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS)){
+            options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS));
+        }
+        if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE)){
+            options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE));
+        }
+        if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS)){
+            options.setFlushTimeoutMillis(pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS));
+        }
+        if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION)){
+            options.setEnableExternalConfiguration(pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION));
+        }
+        Sentry.init(options);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Sentry.captureMessage(element.toString());
+    }
+
+    @Override
+    public void close() throws IOException {
+        Sentry.close();
+    }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 42b50f6b0..e1c5b683d 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -52,6 +52,7 @@
         <module>connector-neo4j</module>
         <module>connector-redis</module>
         <module>connector-datahub</module>
+        <module>connector-sentry</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 52db44100..bc5bcd470 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -77,7 +77,6 @@
             <version>${project.version}</version>
         </dependency>
         <!--   seatunnel connectors   -->
-
         <!--flink-->
         <dependency>
             <groupId>org.apache.flink</groupId>