Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/17 20:17:59 UTC
[flink] branch master updated: [FLINK-26687][Connectors][NiFi] Remove Apache NiFi connector
This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 328b7d1 [FLINK-26687][Connectors][NiFi] Remove Apache NiFi connector
328b7d1 is described below
commit 328b7d195cb1a4d5ab86a1b891723e77c0e63140
Author: MartijnVisser <ma...@apache.org>
AuthorDate: Thu Mar 17 09:45:27 2022 +0100
[FLINK-26687][Connectors][NiFi] Remove Apache NiFi connector
---
docs/content.zh/docs/connectors/datastream/nifi.md | 132 ------------------
.../docs/connectors/datastream/overview.md | 1 -
docs/content/docs/connectors/datastream/nifi.md | 141 -------------------
.../content/docs/connectors/datastream/overview.md | 1 -
.../flink-architecture-tests-production/pom.xml | 5 -
flink-architecture-tests/pom.xml | 7 -
flink-connectors/flink-connector-nifi/pom.xml | 82 -----------
.../streaming/connectors/nifi/NiFiDataPacket.java | 33 -----
.../connectors/nifi/NiFiDataPacketBuilder.java | 34 -----
.../flink/streaming/connectors/nifi/NiFiSink.java | 79 -----------
.../streaming/connectors/nifi/NiFiSource.java | 154 ---------------------
.../connectors/nifi/StandardNiFiDataPacket.java | 44 ------
.../nifi/examples/NiFiSinkTopologyExample.java | 63 ---------
.../nifi/examples/NiFiSourceTopologyExample.java | 60 --------
.../src/test/resources/NiFi_Flink.xml | 16 ---
flink-connectors/pom.xml | 1 -
tools/ci/stage.sh | 1 -
17 files changed, 854 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/nifi.md b/docs/content.zh/docs/connectors/datastream/nifi.md
deleted file mode 100644
index d8e53e6..0000000
--- a/docs/content.zh/docs/connectors/datastream/nifi.md
+++ /dev/null
@@ -1,132 +0,0 @@
----
-title: NiFi
-weight: 8
-type: docs
-aliases:
- - /zh/dev/connectors/nifi.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Apache NiFi 连接器
-
-{{< hint warning >}}
-The NiFi connector is deprecated and will be removed with Flink 1.16.
-{{< /hint >}}
-
-[Apache NiFi](https://nifi.apache.org/) 连接器提供了可以读取和写入的 Source 和 Sink。
-使用这个连接器,需要在工程中添加下面的依赖:
-
-{{< artifact flink-connector-nifi >}}
-
-注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 [这里]({{< ref "docs/dev/datastream/project-configuration" >}})。
-
-#### 安装 Apache NiFi
-
-安装 Apache NiFi 集群请参考 [这里](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi)。
-
-#### Apache NiFi Source
-
-该连接器提供了一个 Source 可以用来从 Apache NiFi 读取数据到 Apache Flink。
-
-`NiFiSource(…)` 类有两个构造方法。
-
-- `NiFiSource(SiteToSiteConfig config)` - 构造一个 `NiFiSource(…)` ,需要指定参数 SiteToSiteConfig ,采用默认的等待时间 1000 ms。
-
-- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - 构造一个 `NiFiSource(…)`,需要指定参数 SiteToSiteConfig 和等待时间(单位为毫秒)。
-
-示例:
-
-{{< tabs "44ccc35b-83c3-464f-9464-995d4981f4d9" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data for Flink")
- .requestBatchCount(5)
- .buildConfig();
-
-SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data for Flink")
- .requestBatchCount(5)
- .buildConfig()
-
-val nifiSource = new NiFiSource(clientConfig)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-数据从 Apache NiFi Output Port 读取,Apache NiFi Output Port 也被称为 "Data for Flink",是 Apache NiFi Site-to-site 协议配置的一部分。
-
-#### Apache NiFi Sink
-
-该连接器提供了一个 Sink 可以用来把 Apache Flink 的数据写入到 Apache NiFi。
-
-`NiFiSink(…)` 类只有一个构造方法。
-
-- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` 构造一个 `NiFiSink(…)`,需要指定 `SiteToSiteConfig` 和 `NiFiDataPacketBuilder` 参数 ,`NiFiDataPacketBuilder` 可以将Flink数据转化成可以被NiFi识别的 `NiFiDataPacket`.
-
-示例:
-
-{{< tabs "599dbd31-e2a4-4203-a428-0a4c95c8fd07" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data from Flink")
- .requestBatchCount(5)
- .buildConfig();
-
-SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
-
-streamExecEnv.addSink(nifiSink);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data from Flink")
- .requestBatchCount(5)
- .buildConfig()
-
-val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
-
-streamExecEnv.addSink(nifiSink)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-更多关于 [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol 的信息请参考 [这里](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site)。
-
-{{< top >}}
diff --git a/docs/content.zh/docs/connectors/datastream/overview.md b/docs/content.zh/docs/connectors/datastream/overview.md
index 6d08762..911559f 100644
--- a/docs/content.zh/docs/connectors/datastream/overview.md
+++ b/docs/content.zh/docs/connectors/datastream/overview.md
@@ -45,7 +45,6 @@ under the License.
* [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
* [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
* [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
- * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink)
* [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source)
* [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink)
diff --git a/docs/content/docs/connectors/datastream/nifi.md b/docs/content/docs/connectors/datastream/nifi.md
deleted file mode 100644
index 1c68ba2..0000000
--- a/docs/content/docs/connectors/datastream/nifi.md
+++ /dev/null
@@ -1,141 +0,0 @@
----
-title: NiFi
-weight: 8
-type: docs
-aliases:
- - /dev/connectors/nifi.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Apache NiFi Connector
-
-{{< hint warning >}}
-The NiFi connector is deprecated and will be removed with Flink 1.16.
-{{< /hint >}}
-
-This connector provides a Source and Sink that can read from and write to
-[Apache NiFi](https://nifi.apache.org/). To use this connector, add the
-following dependency to your project:
-
-{{< artifact flink-connector-nifi >}}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{< ref "docs/dev/configuration/overview" >}})
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Apache NiFi
-
-Instructions for setting up a Apache NiFi cluster can be found
-[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
-
-#### Apache NiFi Source
-
-The connector provides a Source for reading data from Apache NiFi to Apache Flink.
-
-The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi.
-
-- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` given the client's SiteToSiteConfig and a
- default wait time of 1000 ms.
-
-- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's
- SiteToSiteConfig and the specified wait time (in milliseconds).
-
-Example:
-
-{{< tabs "14bce3dd-fcc4-4c98-bdd8-ed7819b7f0c4" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data for Flink")
- .requestBatchCount(5)
- .buildConfig();
-
-SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data for Flink")
- .requestBatchCount(5)
- .buildConfig()
-
-val nifiSource = new NiFiSource(clientConfig)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-Here data is read from the Apache NiFi Output Port called "Data for Flink" which is part of Apache NiFi
-Site-to-site protocol configuration.
-
-#### Apache NiFi Sink
-
-The connector provides a Sink for writing data from Apache Flink to Apache NiFi.
-
-The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`.
-
-- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi.
-
-Example:
-
-{{< tabs "bcf46513-edfb-4b41-b588-51009eb9f59a" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data from Flink")
- .requestBatchCount(5)
- .buildConfig();
-
-SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
-
-streamExecEnv.addSink(nifiSink);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data from Flink")
- .requestBatchCount(5)
- .buildConfig()
-
-val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
-
-streamExecEnv.addSink(nifiSink)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-More information about [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site)
-
-{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/overview.md b/docs/content/docs/connectors/datastream/overview.md
index 42cf647..591cd89 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -46,7 +46,6 @@ Connectors provide code for interfacing with various third-party systems. Curren
* [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
* [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
* [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
- * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink)
* [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source)
* [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink)
diff --git a/flink-architecture-tests/flink-architecture-tests-production/pom.xml b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
index ce5d53c..2b858b0 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml
+++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
@@ -184,11 +184,6 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-nifi</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
</dependency>
diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml
index 243afe7..6fb1e7f 100644
--- a/flink-architecture-tests/pom.xml
+++ b/flink-architecture-tests/pom.xml
@@ -233,13 +233,6 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-nifi</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>${project.version}</version>
<scope>test</scope>
diff --git a/flink-connectors/flink-connector-nifi/pom.xml b/flink-connectors/flink-connector-nifi/pom.xml
deleted file mode 100644
index 0a284d9..0000000
--- a/flink-connectors/flink-connector-nifi/pom.xml
+++ /dev/null
@@ -1,82 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connectors</artifactId>
- <version>1.16-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-nifi</artifactId>
- <name>Flink : Connectors : Nifi</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <nifi.version>1.14.0</nifi.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-site-to-site-client</artifactId>
- <version>${nifi.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <rerunFailingTestsCount>3</rerunFailingTestsCount>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
deleted file mode 100644
index 01e77d33..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.util.Map;
-
-/**
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
- * content and its attributes so that they can be processed by Flink.
- */
-public interface NiFiDataPacket {
-
- /** @return the contents of a NiFi FlowFile */
- byte[] getContent();
-
- /** @return a Map of attributes that are associated with the NiFi FlowFile */
- Map<String, String> getAttributes();
-}
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
deleted file mode 100644
index 492b0ec..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-
-/**
- * A function that can create a NiFiDataPacket from an incoming instance of the given type.
- *
- * @param <T>
- */
-public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
-
- NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
-}
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
deleted file mode 100644
index 0921c28..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-/**
- * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires a
- * NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
- *
- * @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release.
- */
-@Deprecated
-public class NiFiSink<T> extends RichSinkFunction<T> {
-
- private SiteToSiteClient client;
- private SiteToSiteClientConfig clientConfig;
- private NiFiDataPacketBuilder<T> builder;
-
- /**
- * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
- *
- * @param clientConfig the configuration for building a NiFi SiteToSiteClient
- * @param builder a builder to produce NiFiDataPackets from incoming data
- */
- public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
- this.clientConfig = clientConfig;
- this.builder = builder;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
- }
-
- @Override
- public void invoke(T value) throws Exception {
- final NiFiDataPacket niFiDataPacket =
- builder.createNiFiDataPacket(value, getRuntimeContext());
-
- final Transaction transaction = client.createTransaction(TransferDirection.SEND);
- if (transaction == null) {
- throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
- }
-
- transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
- transaction.confirm();
- transaction.complete();
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- client.close();
- }
-}
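Note on the removed `NiFiSink`: as the deleted `invoke` method shows, the sink created one Site-to-Site transaction per record and called send, confirm and complete on it, failing with an `IllegalStateException` when no transaction could be created. The following is a minimal sketch of that call order against in-memory stand-ins, not the connector itself. `SendTransaction`, `PerRecordSendSketch`, `sendOne` and `runDemo` are hypothetical names introduced here for illustration; they are not part of the NiFi or Flink APIs, and the attribute map passed to the real `send` call is left out for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for a NiFi send transaction; NOT the NiFi API. */
interface SendTransaction {
    void send(byte[] content);

    void confirm();

    void complete();
}

final class PerRecordSendSketch {

    /** Sends a single record in its own transaction, mirroring the removed invoke(). */
    static <T> void sendOne(
            T value, Function<T, byte[]> builder, Supplier<SendTransaction> transactions) {
        // Convert the record first, as the removed sink did with its packet builder.
        byte[] content = builder.apply(value);

        SendTransaction tx = transactions.get();
        if (tx == null) {
            throw new IllegalStateException("Unable to create a transaction to send data");
        }

        tx.send(content);
        tx.confirm();
        tx.complete();
    }

    /** Sends two records and returns the order of calls seen by the stand-in. */
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        Supplier<SendTransaction> transactions =
                () ->
                        new SendTransaction() {
                            @Override
                            public void send(byte[] content) {
                                log.add("send:" + new String(content, StandardCharsets.UTF_8));
                            }

                            @Override
                            public void confirm() {
                                log.add("confirm");
                            }

                            @Override
                            public void complete() {
                                log.add("complete");
                            }
                        };
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);

        sendOne("one", utf8, transactions);
        sendOne("two", utf8, transactions);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Each record gets its own send, confirm, complete triple in the log, so the number of transactions grows with the number of records. That is worth keeping in mind when looking for a replacement for this connector.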
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
deleted file mode 100644
index a2ec6c7..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
- * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
- *
- * @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release.
- */
-@Deprecated
-public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
-
- private static final long DEFAULT_WAIT_TIME_MS = 1000;
-
- // ------------------------------------------------------------------------
-
- private final SiteToSiteClientConfig clientConfig;
-
- private final long waitTimeMs;
-
- private transient SiteToSiteClient client;
-
- private volatile boolean isRunning = true;
-
- /**
- * Constructs a new NiFiSource using the given client config and the default wait time of 1000
- * ms.
- *
- * @param clientConfig the configuration for building a NiFi SiteToSiteClient
- */
- public NiFiSource(SiteToSiteClientConfig clientConfig) {
- this(clientConfig, DEFAULT_WAIT_TIME_MS);
- }
-
- /**
- * Constructs a new NiFiSource using the given client config and wait time.
- *
- * @param clientConfig the configuration for building a NiFi SiteToSiteClient
- * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to
- * pull from NiFi
- */
- public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
- this.clientConfig = clientConfig;
- this.waitTimeMs = waitTimeMs;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
- }
-
- @Override
- public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
- while (isRunning) {
- final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
- if (transaction == null) {
- LOG.warn("A transaction could not be created, waiting and will try again...");
- try {
- Thread.sleep(waitTimeMs);
- } catch (InterruptedException ignored) {
-
- }
- continue;
- }
-
- DataPacket dataPacket = transaction.receive();
- if (dataPacket == null) {
- transaction.confirm();
- transaction.complete();
-
- LOG.debug("No data available to pull, waiting and will try again...");
- try {
- Thread.sleep(waitTimeMs);
- } catch (InterruptedException ignored) {
-
- }
- continue;
- }
-
- final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
- do {
- // Read the data into a byte array and wrap it along with the attributes
- // into a NiFiDataPacket.
- final InputStream inStream = dataPacket.getData();
- final byte[] data = new byte[(int) dataPacket.getSize()];
- StreamUtils.fillBuffer(inStream, data);
-
- final Map<String, String> attributes = dataPacket.getAttributes();
-
- niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
- dataPacket = transaction.receive();
- } while (dataPacket != null);
-
- // Confirm transaction to verify the data
- transaction.confirm();
-
- for (NiFiDataPacket dp : niFiDataPackets) {
- ctx.collect(dp);
- }
-
- transaction.complete();
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- client.close();
- }
-}
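Note on the removed `NiFiSource`: the deleted `run` loop buffers every packet of a transaction, confirms the transaction, emits the buffered packets to Flink, and only then completes the transaction. The sketch below replays that ordering with in-memory stand-ins so it can be run without NiFi or Flink on the classpath. `ReceiveTransaction`, `InMemoryTransaction`, `DrainLoopSketch`, `drain` and `runDemo` are hypothetical names introduced for illustration, packets are plain strings rather than `NiFiDataPacket` instances, and the wait time and cancellation handling of the real source are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a NiFi receive transaction; NOT the NiFi API. */
interface ReceiveTransaction {
    /** Returns the next packet, or null once the transaction is drained. */
    String receive();

    void confirm();

    void complete();
}

/** In-memory transaction that records the order in which it is called. */
final class InMemoryTransaction implements ReceiveTransaction {
    private final Deque<String> pending;
    final List<String> calls = new ArrayList<>();

    InMemoryTransaction(String... packets) {
        this.pending = new ArrayDeque<>(Arrays.asList(packets));
    }

    @Override
    public String receive() {
        calls.add("receive");
        return pending.poll(); // null when nothing is left
    }

    @Override
    public void confirm() {
        calls.add("confirm");
    }

    @Override
    public void complete() {
        calls.add("complete");
    }
}

final class DrainLoopSketch {

    /** Drains one transaction: buffer all packets, confirm, emit, then complete. */
    static void drain(ReceiveTransaction tx, Consumer<String> out) {
        List<String> buffered = new ArrayList<>();
        for (String packet = tx.receive(); packet != null; packet = tx.receive()) {
            buffered.add(packet);
        }

        tx.confirm();

        // Stands in for ctx.collect(...) in the removed source.
        for (String packet : buffered) {
            out.accept(packet);
        }

        tx.complete();
    }

    /** Drains a two-packet transaction and returns the recorded call order. */
    static List<String> runDemo() {
        InMemoryTransaction tx = new InMemoryTransaction("a", "b");
        drain(tx, packet -> tx.calls.add("emit:" + packet));
        return tx.calls;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In the recorded call order, both `emit` entries fall between `confirm` and `complete`, which matches the sequence in the removed `run` method.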
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
deleted file mode 100644
index c460f12..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/** An implementation of NiFiDataPacket. */
-public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
- private static final long serialVersionUID = 6364005260220243322L;
-
- private final byte[] content;
- private final Map<String, String> attributes;
-
- public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
- this.content = content;
- this.attributes = attributes;
- }
-
- @Override
- public byte[] getContent() {
- return content;
- }
-
- @Override
- public Map<String, String> getAttributes() {
- return attributes;
- }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
deleted file mode 100644
index 9927c69..0000000
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
-import org.apache.flink.streaming.connectors.nifi.NiFiSink;
-import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
-
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.util.HashMap;
-
-/** An example topology that sends data to a NiFi input port named "Data from Flink". */
-public class NiFiSinkTopologyExample {
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        SiteToSiteClientConfig clientConfig =
-                new SiteToSiteClient.Builder()
-                        .url("http://localhost:8080/nifi")
-                        .portName("Data from Flink")
-                        .buildConfig();
-
-        DataStreamSink<String> dataStream =
-                env.fromElements("one", "two", "three", "four", "five", "q")
-                        .addSink(
-                                new NiFiSink<>(
-                                        clientConfig,
-                                        new NiFiDataPacketBuilder<String>() {
-                                            @Override
-                                            public NiFiDataPacket createNiFiDataPacket(
-                                                    String s, RuntimeContext ctx) {
-                                                return new StandardNiFiDataPacket(
-                                                        s.getBytes(ConfigConstants.DEFAULT_CHARSET),
-                                                        new HashMap<String, String>());
-                                            }
-                                        }));
-
-        env.execute();
-    }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
deleted file mode 100644
index 4d8dbfb..0000000
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiSource;
-
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.nio.charset.Charset;
-
-/** An example topology that pulls data from a NiFi output port named "Data for Flink". */
-public class NiFiSourceTopologyExample {
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        SiteToSiteClientConfig clientConfig =
-                new SiteToSiteClient.Builder()
-                        .url("http://localhost:8080/nifi")
-                        .portName("Data for Flink")
-                        .requestBatchCount(5)
-                        .buildConfig();
-
-        SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-        DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
-
-        DataStream<String> dataStream =
-                streamSource.map(
-                        new MapFunction<NiFiDataPacket, String>() {
-                            @Override
-                            public String map(NiFiDataPacket value) throws Exception {
-                                return new String(value.getContent(), Charset.defaultCharset());
-                            }
-                        });
-
-        dataStream.print();
-        env.execute();
-    }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
deleted file mode 100644
index d373d63..0000000
--- a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0 [...]
\ No newline at end of file
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 1ac2273..0d2ca2b 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -47,7 +47,6 @@ under the License.
<module>flink-connector-hive</module>
<module>flink-connector-jdbc</module>
<module>flink-connector-rabbitmq</module>
- <module>flink-connector-nifi</module>
<module>flink-connector-cassandra</module>
<module>flink-connector-kafka</module>
<module>flink-connector-gcp-pubsub</module>
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 78562d8..164b7db 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -104,7 +104,6 @@ flink-connectors/flink-connector-elasticsearch7,\
flink-connectors/flink-sql-connector-elasticsearch6,\
flink-connectors/flink-sql-connector-elasticsearch7,\
flink-connectors/flink-connector-elasticsearch-base,\
-flink-connectors/flink-connector-nifi,\
flink-connectors/flink-connector-rabbitmq,\
flink-connectors/flink-connector-kinesis,\
flink-connectors/flink-connector-aws-kinesis-streams,\