Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/17 20:17:59 UTC

[flink] branch master updated: [FLINK-26687][Connectors][NiFi] Remove Apache NiFi connector

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 328b7d1  [FLINK-26687][Connectors][NiFi] Remove Apache NiFi connector
328b7d1 is described below

commit 328b7d195cb1a4d5ab86a1b891723e77c0e63140
Author: MartijnVisser <ma...@apache.org>
AuthorDate: Thu Mar 17 09:45:27 2022 +0100

    [FLINK-26687][Connectors][NiFi] Remove Apache NiFi connector
---
 docs/content.zh/docs/connectors/datastream/nifi.md | 132 ------------------
 .../docs/connectors/datastream/overview.md         |   1 -
 docs/content/docs/connectors/datastream/nifi.md    | 141 -------------------
 .../content/docs/connectors/datastream/overview.md |   1 -
 .../flink-architecture-tests-production/pom.xml    |   5 -
 flink-architecture-tests/pom.xml                   |   7 -
 flink-connectors/flink-connector-nifi/pom.xml      |  82 -----------
 .../streaming/connectors/nifi/NiFiDataPacket.java  |  33 -----
 .../connectors/nifi/NiFiDataPacketBuilder.java     |  34 -----
 .../flink/streaming/connectors/nifi/NiFiSink.java  |  79 -----------
 .../streaming/connectors/nifi/NiFiSource.java      | 154 ---------------------
 .../connectors/nifi/StandardNiFiDataPacket.java    |  44 ------
 .../nifi/examples/NiFiSinkTopologyExample.java     |  63 ---------
 .../nifi/examples/NiFiSourceTopologyExample.java   |  60 --------
 .../src/test/resources/NiFi_Flink.xml              |  16 ---
 flink-connectors/pom.xml                           |   1 -
 tools/ci/stage.sh                                  |   1 -
 17 files changed, 854 deletions(-)
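
For readers unfamiliar with the API this commit removes, here is a minimal, dependency-free sketch of the packet/builder contract shown in the deleted `NiFiDataPacket.java` and `NiFiDataPacketBuilder.java` below. It is an illustration under stated assumptions, not the connector's code: the nested types `DataPacket` and `PacketBuilder` are local stand-ins so the example compiles with only the JDK, the removed `createNiFiDataPacket` method also received a Flink `RuntimeContext` (omitted here), and `STRING_BUILDER` and the `source` attribute are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/** Stand-alone sketch; none of these types are the actual Flink or NiFi classes. */
public class NiFiPacketSketch {

    /** Local stand-in mirroring the shape of the removed NiFiDataPacket interface. */
    interface DataPacket {
        byte[] getContent();

        Map<String, String> getAttributes();
    }

    /** Simplified stand-in for NiFiDataPacketBuilder<T> (no RuntimeContext parameter). */
    interface PacketBuilder<T> {
        DataPacket createPacket(T value);
    }

    /** Hypothetical builder: UTF-8 bytes as content plus one illustrative attribute. */
    static final PacketBuilder<String> STRING_BUILDER = value -> new DataPacket() {
        @Override
        public byte[] getContent() {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Map<String, String> getAttributes() {
            return Collections.singletonMap("source", "flink");
        }
    };

    public static void main(String[] args) {
        // Convert one record into a packet, as a sink would do once per element.
        DataPacket packet = STRING_BUILDER.createPacket("hello");
        System.out.println(new String(packet.getContent(), StandardCharsets.UTF_8));
        System.out.println(packet.getAttributes());
    }
}
```

In the removed `NiFiSink`, a builder of this kind was invoked once per record, and the resulting content and attributes were sent in a single Site-to-Site transaction.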

diff --git a/docs/content.zh/docs/connectors/datastream/nifi.md b/docs/content.zh/docs/connectors/datastream/nifi.md
deleted file mode 100644
index d8e53e6..0000000
--- a/docs/content.zh/docs/connectors/datastream/nifi.md
+++ /dev/null
@@ -1,132 +0,0 @@
----
-title: NiFi
-weight: 8
-type: docs
-aliases:
-  - /zh/dev/connectors/nifi.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Apache NiFi Connector
-
-{{< hint warning >}}
-The NiFi connector is deprecated and will be removed with Flink 1.16.
-{{< /hint >}}
-
-The [Apache NiFi](https://nifi.apache.org/) connector provides a Source and a Sink for reading from and writing to NiFi.
-To use this connector, add the following dependency to your project:
-
-{{< artifact flink-connector-nifi >}}
-
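-Note that these connectors are currently not included in the binary distribution. For information on adding dependencies, packaging, and running on a cluster, see [here]({{< ref "docs/dev/datastream/project-configuration" >}}).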
-
-#### Installing Apache NiFi
-
-For instructions on installing an Apache NiFi cluster, see [here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
-
-#### Apache NiFi Source
-
-The connector provides a Source for reading data from Apache NiFi into Apache Flink.
-
-The `NiFiSource(…)` class has two constructors.
-
-- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` from the given SiteToSiteConfig, using the default wait time of 1000 ms.
-
-- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` from the given SiteToSiteConfig and the specified wait time (in milliseconds).
-
-Example:
-
-{{< tabs "44ccc35b-83c3-464f-9464-995d4981f4d9" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data for Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data for Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-
-val nifiSource = new NiFiSource(clientConfig)       
-```       
-{{< /tab >}}
-{{< /tabs >}}
-
-Data is read from the Apache NiFi Output Port named "Data for Flink", which is part of the Apache NiFi Site-to-site protocol configuration.
-
-#### Apache NiFi Sink
-
-The connector provides a Sink for writing data from Apache Flink to Apache NiFi.
-
-The `NiFiSink(…)` class has a single constructor.
-
-- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` from the given `SiteToSiteConfig` and `NiFiDataPacketBuilder`; the `NiFiDataPacketBuilder` converts Flink data into a `NiFiDataPacket` that NiFi can ingest.
-
-Example:
-
-{{< tabs "599dbd31-e2a4-4203-a428-0a4c95c8fd07" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data from Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
-
-streamExecEnv.addSink(nifiSink);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data from Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-
-val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder[NiFiDataPacket]() {...})
-
-streamExecEnv.addSink(nifiSink)
-```       
-{{< /tab >}}
-{{< /tabs >}}      
-
-For more information about the [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol, see [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site).
-
-{{< top >}}
diff --git a/docs/content.zh/docs/connectors/datastream/overview.md b/docs/content.zh/docs/connectors/datastream/overview.md
index 6d08762..911559f 100644
--- a/docs/content.zh/docs/connectors/datastream/overview.md
+++ b/docs/content.zh/docs/connectors/datastream/overview.md
@@ -45,7 +45,6 @@ under the License.
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
  * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
- * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink)
  * [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source)
  * [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink)
 
diff --git a/docs/content/docs/connectors/datastream/nifi.md b/docs/content/docs/connectors/datastream/nifi.md
deleted file mode 100644
index 1c68ba2..0000000
--- a/docs/content/docs/connectors/datastream/nifi.md
+++ /dev/null
@@ -1,141 +0,0 @@
----
-title: NiFi
-weight: 8
-type: docs
-aliases:
-  - /dev/connectors/nifi.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Apache NiFi Connector
-
-{{< hint warning >}}
-The NiFi connector is deprecated and will be removed with Flink 1.16.
-{{< /hint >}}
-
-This connector provides a Source and Sink that can read from and write to
-[Apache NiFi](https://nifi.apache.org/). To use this connector, add the
-following dependency to your project:
-
-{{< artifact flink-connector-nifi >}}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{< ref "docs/dev/configuration/overview" >}})
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Apache NiFi
-
-Instructions for setting up an Apache NiFi cluster can be found
-[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
-
-#### Apache NiFi Source
-
-The connector provides a Source for reading data from Apache NiFi to Apache Flink.
-
-The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi.
-
-- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` given the client's SiteToSiteConfig and a
-     default wait time of 1000 ms.
-
-- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's
-     SiteToSiteConfig and the specified wait time (in milliseconds).
-
-Example:
-
-{{< tabs "14bce3dd-fcc4-4c98-bdd8-ed7819b7f0c4" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data for Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data for Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-
-val nifiSource = new NiFiSource(clientConfig)       
-```       
-{{< /tab >}}
-{{< /tabs >}}
-
-Here data is read from the Apache NiFi Output Port called "Data for Flink" which is part of Apache NiFi
-Site-to-site protocol configuration.
-
-#### Apache NiFi Sink
-
-The connector provides a Sink for writing data from Apache Flink to Apache NiFi.
-
-The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`.
-
-- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi.
-
-Example:
-
-{{< tabs "bcf46513-edfb-4b41-b588-51009eb9f59a" >}}
-{{< tab "Java" >}}
-```java
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data from Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
-
-streamExecEnv.addSink(nifiSink);
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data from Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-
-val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder[NiFiDataPacket]() {...})
-
-streamExecEnv.addSink(nifiSink)
-```       
-{{< /tab >}}
-{{< /tabs >}}      
-
-More information about the [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site).
-
-{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/overview.md b/docs/content/docs/connectors/datastream/overview.md
index 42cf647..591cd89 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -46,7 +46,6 @@ Connectors provide code for interfacing with various third-party systems. Curren
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
  * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
- * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink)
  * [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source)
  * [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink)
 
diff --git a/flink-architecture-tests/flink-architecture-tests-production/pom.xml b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
index ce5d53c..2b858b0 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml
+++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
@@ -184,11 +184,6 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-nifi</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-pulsar</artifactId>
 		</dependency>
 
diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml
index 243afe7..6fb1e7f 100644
--- a/flink-architecture-tests/pom.xml
+++ b/flink-architecture-tests/pom.xml
@@ -233,13 +233,6 @@ under the License.
 
 			<dependency>
 				<groupId>org.apache.flink</groupId>
-				<artifactId>flink-connector-nifi</artifactId>
-				<version>${project.version}</version>
-				<scope>test</scope>
-			</dependency>
-
-			<dependency>
-				<groupId>org.apache.flink</groupId>
 				<artifactId>flink-connector-pulsar</artifactId>
 				<version>${project.version}</version>
 				<scope>test</scope>
diff --git a/flink-connectors/flink-connector-nifi/pom.xml b/flink-connectors/flink-connector-nifi/pom.xml
deleted file mode 100644
index 0a284d9..0000000
--- a/flink-connectors/flink-connector-nifi/pom.xml
+++ /dev/null
@@ -1,82 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-connectors</artifactId>
-		<version>1.16-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-nifi</artifactId>
-	<name>Flink : Connectors : Nifi</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<nifi.version>1.14.0</nifi.version>
-	</properties>
-
-	<dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-site-to-site-client</artifactId>
-            <version>${nifi.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
-            <version>${project.version}</version>
-			<scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<rerunFailingTestsCount>3</rerunFailingTestsCount>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
deleted file mode 100644
index 01e77d33..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.util.Map;
-
-/**
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
- * content and its attributes so that they can be processed by Flink.
- */
-public interface NiFiDataPacket {
-
-    /** @return the contents of a NiFi FlowFile */
-    byte[] getContent();
-
-    /** @return a Map of attributes that are associated with the NiFi FlowFile */
-    Map<String, String> getAttributes();
-}
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
deleted file mode 100644
index 492b0ec..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-
-/**
- * A function that can create a NiFiDataPacket from an incoming instance of the given type.
- *
- * @param <T>
- */
-public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
-
-    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
-}
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
deleted file mode 100644
index 0921c28..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-/**
- * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires a
- * NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
- *
- * @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release.
- */
-@Deprecated
-public class NiFiSink<T> extends RichSinkFunction<T> {
-
-    private SiteToSiteClient client;
-    private SiteToSiteClientConfig clientConfig;
-    private NiFiDataPacketBuilder<T> builder;
-
-    /**
-     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
-     *
-     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
-     * @param builder a builder to produce NiFiDataPackets from incoming data
-     */
-    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
-        this.clientConfig = clientConfig;
-        this.builder = builder;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-    }
-
-    @Override
-    public void invoke(T value) throws Exception {
-        final NiFiDataPacket niFiDataPacket =
-                builder.createNiFiDataPacket(value, getRuntimeContext());
-
-        final Transaction transaction = client.createTransaction(TransferDirection.SEND);
-        if (transaction == null) {
-            throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
-        }
-
-        transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
-        transaction.confirm();
-        transaction.complete();
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        client.close();
-    }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
deleted file mode 100644
index a2ec6c7..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
- * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
- *
- * @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release.
- */
-@Deprecated
-public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
-
-    private static final long DEFAULT_WAIT_TIME_MS = 1000;
-
-    // ------------------------------------------------------------------------
-
-    private final SiteToSiteClientConfig clientConfig;
-
-    private final long waitTimeMs;
-
-    private transient SiteToSiteClient client;
-
-    private volatile boolean isRunning = true;
-
-    /**
-     * Constructs a new NiFiSource using the given client config and the default wait time of 1000
-     * ms.
-     *
-     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
-     */
-    public NiFiSource(SiteToSiteClientConfig clientConfig) {
-        this(clientConfig, DEFAULT_WAIT_TIME_MS);
-    }
-
-    /**
-     * Constructs a new NiFiSource using the given client config and wait time.
-     *
-     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
-     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to
-     *     pull from NiFi
-     */
-    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
-        this.clientConfig = clientConfig;
-        this.waitTimeMs = waitTimeMs;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-    }
-
-    @Override
-    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
-        while (isRunning) {
-            final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
-            if (transaction == null) {
-                LOG.warn("A transaction could not be created, waiting and will try again...");
-                try {
-                    Thread.sleep(waitTimeMs);
-                } catch (InterruptedException ignored) {
-
-                }
-                continue;
-            }
-
-            DataPacket dataPacket = transaction.receive();
-            if (dataPacket == null) {
-                transaction.confirm();
-                transaction.complete();
-
-                LOG.debug("No data available to pull, waiting and will try again...");
-                try {
-                    Thread.sleep(waitTimeMs);
-                } catch (InterruptedException ignored) {
-
-                }
-                continue;
-            }
-
-            final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
-            do {
-                // Read the data into a byte array and wrap it along with the attributes
-                // into a NiFiDataPacket.
-                final InputStream inStream = dataPacket.getData();
-                final byte[] data = new byte[(int) dataPacket.getSize()];
-                StreamUtils.fillBuffer(inStream, data);
-
-                final Map<String, String> attributes = dataPacket.getAttributes();
-
-                niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
-                dataPacket = transaction.receive();
-            } while (dataPacket != null);
-
-            // Confirm transaction to verify the data
-            transaction.confirm();
-
-            for (NiFiDataPacket dp : niFiDataPackets) {
-                ctx.collect(dp);
-            }
-
-            transaction.complete();
-        }
-    }
-
-    @Override
-    public void cancel() {
-        isRunning = false;
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        client.close();
-    }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
deleted file mode 100644
index c460f12..0000000
--- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/** An implementation of NiFiDataPacket. */
-public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
-    private static final long serialVersionUID = 6364005260220243322L;
-
-    private final byte[] content;
-    private final Map<String, String> attributes;
-
-    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
-        this.content = content;
-        this.attributes = attributes;
-    }
-
-    @Override
-    public byte[] getContent() {
-        return content;
-    }
-
-    @Override
-    public Map<String, String> getAttributes() {
-        return attributes;
-    }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
deleted file mode 100644
index 9927c69..0000000
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
-import org.apache.flink.streaming.connectors.nifi.NiFiSink;
-import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
-
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.util.HashMap;
-
-/** An example topology that sends data to a NiFi input port named "Data from Flink". */
-public class NiFiSinkTopologyExample {
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        SiteToSiteClientConfig clientConfig =
-                new SiteToSiteClient.Builder()
-                        .url("http://localhost:8080/nifi")
-                        .portName("Data from Flink")
-                        .buildConfig();
-
-        DataStreamSink<String> dataStream =
-                env.fromElements("one", "two", "three", "four", "five", "q")
-                        .addSink(
-                                new NiFiSink<>(
-                                        clientConfig,
-                                        new NiFiDataPacketBuilder<String>() {
-                                            @Override
-                                            public NiFiDataPacket createNiFiDataPacket(
-                                                    String s, RuntimeContext ctx) {
-                                                return new StandardNiFiDataPacket(
-                                                        s.getBytes(ConfigConstants.DEFAULT_CHARSET),
-                                                        new HashMap<String, String>());
-                                            }
-                                        }));
-
-        env.execute();
-    }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
deleted file mode 100644
index 4d8dbfb..0000000
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiSource;
-
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.nio.charset.Charset;
-
-/** An example topology that pulls data from a NiFi output port named "Data for Flink". */
-public class NiFiSourceTopologyExample {
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        SiteToSiteClientConfig clientConfig =
-                new SiteToSiteClient.Builder()
-                        .url("http://localhost:8080/nifi")
-                        .portName("Data for Flink")
-                        .requestBatchCount(5)
-                        .buildConfig();
-
-        SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-        DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
-
-        DataStream<String> dataStream =
-                streamSource.map(
-                        new MapFunction<NiFiDataPacket, String>() {
-                            @Override
-                            public String map(NiFiDataPacket value) throws Exception {
-                                return new String(value.getContent(), Charset.defaultCharset());
-                            }
-                        });
-
-        dataStream.print();
-        env.execute();
-    }
-}
diff --git a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
deleted file mode 100644
index d373d63..0000000
--- a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements. See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License. You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0  [...]
\ No newline at end of file
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 1ac2273..0d2ca2b 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -47,7 +47,6 @@ under the License.
 		<module>flink-connector-hive</module>
 		<module>flink-connector-jdbc</module>
 		<module>flink-connector-rabbitmq</module>
-		<module>flink-connector-nifi</module>
 		<module>flink-connector-cassandra</module>
 		<module>flink-connector-kafka</module>
 		<module>flink-connector-gcp-pubsub</module>
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 78562d8..164b7db 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -104,7 +104,6 @@ flink-connectors/flink-connector-elasticsearch7,\
 flink-connectors/flink-sql-connector-elasticsearch6,\
 flink-connectors/flink-sql-connector-elasticsearch7,\
 flink-connectors/flink-connector-elasticsearch-base,\
-flink-connectors/flink-connector-nifi,\
 flink-connectors/flink-connector-rabbitmq,\
 flink-connectors/flink-connector-kinesis,\
 flink-connectors/flink-connector-aws-kinesis-streams,\