You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/03 21:59:05 UTC
[pulsar-adapters] branch master updated: Removing Flink in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar (#34)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new 195d125 Removing Flink in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar (#34)
195d125 is described below
commit 195d12552f9d168072a2ed830a69f5373e9655af
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Feb 3 13:59:01 2022 -0800
Removing Flink in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar (#34)
### Motivation
Removing Flink adapter in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar
This one is based on an old version of Flink which brings in dependencies with various CVEs, since that version Flink added pulsar connector in their project.
### Modifications
Removed Flink adapter, tests, examples, and dependencies.
---
.gitignore | 2 -
README.md | 2 +-
examples/flink/pom.xml | 141 -----
.../example/FlinkPulsarBatchAvroSinkExample.java | 89 ---
.../example/FlinkPulsarBatchCsvSinkExample.java | 100 ----
.../example/FlinkPulsarBatchJsonSinkExample.java | 124 ----
.../example/FlinkPulsarBatchSinkExample.java | 128 ----
.../batch/connectors/pulsar/example/README.md | 220 -------
.../example/PulsarConsumerSourceWordCount.java | 128 ----
...lsarConsumerSourceWordCountToAvroTableSink.java | 121 ----
...lsarConsumerSourceWordCountToJsonTableSink.java | 134 -----
.../streaming/connectors/pulsar/example/README.md | 209 -------
.../flink/src/main/resources/avro/NasaMission.avsc | 20 -
examples/flink/src/main/resources/log4j2.xml | 37 --
.../FlinkPulsarBatchAvroSinkScalaExample.scala | 88 ---
.../FlinkPulsarBatchCsvSinkScalaExample.scala | 94 ---
.../FlinkPulsarBatchJsonSinkScalaExample.scala | 96 ---
.../example/FlinkPulsarBatchSinkScalaExample.scala | 102 ----
examples/pom.xml | 1 -
pom.xml | 45 --
pulsar-flink/README.md | 27 -
pulsar-flink/pom.xml | 196 ------
.../connectors/pulsar/BasePulsarOutputFormat.java | 133 -----
.../connectors/pulsar/PulsarAvroOutputFormat.java | 44 --
.../connectors/pulsar/PulsarCsvOutputFormat.java | 45 --
.../connectors/pulsar/PulsarJsonOutputFormat.java | 43 --
.../connectors/pulsar/PulsarOutputFormat.java | 49 --
.../batch/connectors/pulsar/package-info.java | 22 -
.../serialization/AvroSerializationSchema.java | 60 --
.../serialization/CsvSerializationSchema.java | 53 --
.../serialization/JsonSerializationSchema.java | 41 --
.../pulsar/serialization/package-info.java | 22 -
.../connectors/pulsar/CachedPulsarClient.java | 110 ----
.../connectors/pulsar/FlinkPulsarProducer.java | 351 -----------
.../connectors/pulsar/PulsarAvroTableSink.java | 197 ------
.../connectors/pulsar/PulsarConsumerSource.java | 200 -------
.../connectors/pulsar/PulsarJsonTableSink.java | 69 ---
.../connectors/pulsar/PulsarProduceMode.java | 37 --
.../connectors/pulsar/PulsarSourceBase.java | 31 -
.../connectors/pulsar/PulsarSourceBuilder.java | 332 -----------
.../connectors/pulsar/PulsarTableSink.java | 178 ------
.../streaming/connectors/pulsar/package-info.java | 22 -
.../pulsar/partitioner/PulsarKeyExtractor.java | 38 --
.../partitioner/PulsarPropertiesExtractor.java | 40 --
.../pulsar/partitioner/package-info.java | 22 -
.../pulsar/PulsarAvroOutputFormatTest.java | 117 ----
.../pulsar/PulsarCsvOutputFormatTest.java | 116 ----
.../pulsar/PulsarJsonOutputFormatTest.java | 116 ----
.../connectors/pulsar/PulsarOutputFormatTest.java | 195 ------
.../serialization/AvroSerializationSchemaTest.java | 66 ---
.../serialization/CsvSerializationSchemaTest.java | 53 --
.../serialization/JsonSerializationSchemaTest.java | 94 ---
.../connectors/pulsar/CachedPulsarClientTest.java | 128 ----
.../connectors/pulsar/PulsarAvroTableSinkTest.java | 122 ----
.../pulsar/PulsarConsumerSourceTests.java | 659 ---------------------
.../connectors/pulsar/PulsarJsonTableSinkTest.java | 118 ----
.../connectors/pulsar/PulsarSourceBuilderTest.java | 237 --------
.../src/test/resources/avro/NasaMission.avsc | 10 -
58 files changed, 1 insertion(+), 6273 deletions(-)
diff --git a/.gitignore b/.gitignore
index 297f31d..eecdc23 100644
--- a/.gitignore
+++ b/.gitignore
@@ -84,7 +84,5 @@ docker.debug-info
**/website/brodocs/documents/*.md
# Avro
-examples/flink/src/main/java/org/apache/flink/avro/generated
-pulsar-flink/src/test/java/org/apache/flink/avro/generated
pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
/build/
diff --git a/README.md b/README.md
index 00fccc8..3099f5d 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@
This repository is used for hosting all the adapters maintained and supported by Apache Pulsar PMC.
-
+[Apache Flink adapter](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar) is supported and maintained by Apache Flink Community.
## Building
diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml
deleted file mode 100644
index 0ab275d..0000000
--- a/examples/flink/pom.xml
+++ /dev/null
@@ -1,141 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.pulsar.examples</groupId>
- <artifactId>pulsar-adapters-examples</artifactId>
- <version>2.8.0-SNAPSHOT</version>
- </parent>
-
- <groupId>org.apache.pulsar.examples</groupId>
- <artifactId>flink</artifactId>
- <name>Pulsar Examples :: Flink</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_${scala.binary.version}</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-flink</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- <filtering>true</filtering>
- </resource>
- </resources>
-
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>pulsar-flink-examples</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <shadeTestJar>false</shadeTestJar>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
- </transformer>
- </transformers>
- <finalName>pulsar-flink-examples</finalName>
- <filters>
- <filter>
- <artifact>*</artifact>
- <includes>
- <include>org/apache/flink/streaming/examples/kafka/**</include>
- <include>org/apache/flink/streaming/**</include>
- <include>org/apache/pulsar/**</include>
- <include>org/bouncycastle/**</include>
- <include>org/apache/flink/batch/**</include>
- <include>net/jpountz/**</include>
- <include>com/scurrilous/circe/**</include>
- <include>org/apache/commons/csv/**</include>
- <include>org/apache/flink/avro/generated/**</include>
- <include>org/apache/avro/**</include>
- <include>org/codehaus/jackson/**</include>
- <include>avro/shaded/com/google/common/**</include>
- <include>org/apache/flink/formats/avro/**</include>
- </includes>
- </filter>
- </filters>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Scala Plugin to compile Scala Files -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>4.3.0</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>add-source</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!-- Generate Test class from avro schema -->
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <version>${avro.version}</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>schema</goal>
- </goals>
- <configuration>
- <testSourceDirectory>${project.basedir}/src/main/resources/avro</testSourceDirectory>
- <testOutputDirectory>${project.basedir}/src/main/java/</testOutputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
deleted file mode 100644
index 6d077f9..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.avro.generated.NasaMission;
-import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Implements a batch program on Pulsar topic by writing Flink DataSet as Avro.
- */
-public class FlinkPulsarBatchAvroSinkExample {
-
- private static final List<NasaMission> nasaMissions = Arrays.asList(
- NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
- NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
- NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
- NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
- NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());
-
- public static void main(String[] args) throws Exception {
-
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 2) {
- System.out.println("Missing parameters!");
- System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
- return;
- }
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setGlobalJobParameters(parameterTool);
-
- String serviceUrl = parameterTool.getRequired("service-url");
- String topic = parameterTool.getRequired("topic");
-
- System.out.println("Parameters:");
- System.out.println("\tServiceUrl:\t" + serviceUrl);
- System.out.println("\tTopic:\t" + topic);
-
- // create PulsarAvroOutputFormat instance
- final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
-
- // create DataSet
- DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission -> new NasaMission(
- nasaMission.getId(),
- nasaMission.getName(),
- nasaMission.getStartYear(),
- nasaMission.getEndYear()))
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.getStartYear() > 1970)
- // write batch data to Pulsar
- .output(pulsarAvroOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch Avro");
- }
-
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
deleted file mode 100644
index 4abb0a4..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Implements a batch program on Pulsar topic by writing Flink DataSet as Csv.
- */
-public class FlinkPulsarBatchCsvSinkExample {
-
- private static final List<Tuple4<Integer, String, Integer, Integer>> nasaMissions = Arrays.asList(
- new Tuple4(1, "Mercury program", 1959, 1963),
- new Tuple4(2, "Apollo program", 1961, 1972),
- new Tuple4(3, "Gemini program", 1963, 1966),
- new Tuple4(4, "Skylab", 1973, 1974),
- new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
- public static void main(String[] args) throws Exception {
-
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 2) {
- System.out.println("Missing parameters!");
- System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
- return;
- }
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setGlobalJobParameters(parameterTool);
-
- String serviceUrl = parameterTool.getRequired("service-url");
- String topic = parameterTool.getRequired("topic");
-
- System.out.println("Parameters:");
- System.out.println("\tServiceUrl:\t" + serviceUrl);
- System.out.println("\tTopic:\t" + topic);
-
- // create PulsarCsvOutputFormat instance
- final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
-
- // create DataSet
- DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(
- new MapFunction<Tuple4<Integer, String, Integer, Integer>, Tuple4<Integer, String, Integer, Integer>>() {
- @Override
- public Tuple4<Integer, String, Integer, Integer> map(
- Tuple4<Integer, String, Integer, Integer> nasaMission) throws Exception {
- return new Tuple4(
- nasaMission.f0,
- nasaMission.f1.toUpperCase(),
- nasaMission.f2,
- nasaMission.f3);
- }
- }
- )
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.f2 > 1970)
- // write batch data to Pulsar
- .output(pulsarCsvOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch Csv");
-
- }
-
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
deleted file mode 100644
index dc56364..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Implements a batch program on Pulsar topic by writing Flink DataSet as Json.
- */
-public class FlinkPulsarBatchJsonSinkExample {
-
- private static final List<NasaMission> nasaMissions = Arrays.asList(
- new NasaMission(1, "Mercury program", 1959, 1963),
- new NasaMission(2, "Apollo program", 1961, 1972),
- new NasaMission(3, "Gemini program", 1963, 1966),
- new NasaMission(4, "Skylab", 1973, 1974),
- new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
- public static void main(String[] args) throws Exception {
-
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 2) {
- System.out.println("Missing parameters!");
- System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
- return;
- }
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setGlobalJobParameters(parameterTool);
-
- String serviceUrl = parameterTool.getRequired("service-url");
- String topic = parameterTool.getRequired("topic");
-
- System.out.println("Parameters:");
- System.out.println("\tServiceUrl:\t" + serviceUrl);
- System.out.println("\tTopic:\t" + topic);
-
- // create PulsarJsonOutputFormat instance
- final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
-
- // create DataSet
- DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission -> new NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase(),
- nasaMission.startYear,
- nasaMission.endYear))
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.startYear > 1970)
- // write batch data to Pulsar
- .output(pulsarJsonOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch Json");
- }
-
- /**
- * NasaMission data model
- *
- * Note: Properties should be public or have getter functions to be visible
- */
- private static class NasaMission {
-
- private int id;
- private String missionName;
- private int startYear;
- private int endYear;
-
- public NasaMission(int id, String missionName, int startYear, int endYear) {
- this.id = id;
- this.missionName = missionName;
- this.startYear = startYear;
- this.endYear = endYear;
- }
-
- public int getId() {
- return id;
- }
-
- public String getMissionName() {
- return missionName;
- }
-
- public int getStartYear() {
- return startYear;
- }
-
- public int getEndYear() {
- return endYear;
- }
- }
-
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
deleted file mode 100644
index 2c89579..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
-import org.apache.flink.util.Collector;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a batch word-count program on Pulsar topic by writing Flink DataSet.
- */
-public class FlinkPulsarBatchSinkExample {
-
- private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
- "Knowledge is limited. Imagination encircles the world.";
-
- public static void main(String[] args) throws Exception {
-
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 2) {
- System.out.println("Missing parameters!");
- System.out.println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>");
- return;
- }
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setGlobalJobParameters(parameterTool);
-
- String serviceUrl = parameterTool.getRequired("service-url");
- String topic = parameterTool.getRequired("topic");
-
- System.out.println("Parameters:");
- System.out.println("\tServiceUrl:\t" + serviceUrl);
- System.out.println("\tTopic:\t" + topic);
-
- // create PulsarOutputFormat instance
- final OutputFormat pulsarOutputFormat =
- new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes());
-
- // create DataSet
- DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
-
- // convert sentences to words
- textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
- @Override
- public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
- String[] words = value.toLowerCase().split(" ");
- for(String word: words) {
- out.collect(new WordWithCount(word.replace(".", ""), 1));
- }
- }
- })
-
- // filter words which length is bigger than 4
- .filter(wordWithCount -> wordWithCount.word.length() > 4)
-
- // group the words
- .groupBy(new KeySelector<WordWithCount, String>() {
- @Override
- public String getKey(WordWithCount wordWithCount) throws Exception {
- return wordWithCount.word;
- }
- })
-
- // sum the word counts
- .reduce(new ReduceFunction<WordWithCount>() {
- @Override
- public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {
- return new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
- }
- })
-
- // write batch data to Pulsar
- .output(pulsarOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch WordCount");
-
- }
-
- /**
- * Data type for words with count.
- */
- private static class WordWithCount {
-
- public String word;
- public long count;
-
- public WordWithCount(String word, long count) {
- this.word = word;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return "WordWithCount { word = " + word + ", count = " + count + " }";
- }
- }
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
deleted file mode 100644
index 29adc34..0000000
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,220 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
-
-# Prerequisites
-
-To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.
-
-# Maven
-
-If you're using Maven, add this to your `pom.xml`:
-
-```xml
-<!-- in your <properties> block -->
-<pulsar.version>{{pulsar:version}}</pulsar.version>
-
-<!-- in your <dependencies> block -->
-<dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-flink</artifactId>
- <version>${pulsar.version}</version>
-</dependency>
-```
-
-# Gradle
-
-If you're using Gradle, add this to your `build.gradle` file:
-
-```groovy
-def pulsarVersion = "{{pulsar:version}}"
-
-dependencies {
- compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
-}
-```
-
-# Example
-
-### PulsarOutputFormat
-
-In this example, Flink DataSet is processed as word-count and being written to Pulsar. Please find a complete example for PulsarOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala)
-
-The steps to run the example:
-
-1. Start Pulsar Standalone.
-
- You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
-
- ```shell
- $ bin/pulsar standalone
- ```
-
-2. Start Flink locally.
-
- You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
-
- ```shell
- $ ./bin/start-cluster.sh
- ```
-
-3. Build the examples.
-
- ```shell
- $ cd ${PULSAR_HOME}
- $ mvn clean install -DskipTests
- ```
-
-4. Run the word count example to print results to stdout.
-
- ```shell
- # java
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
- # scala
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
- ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
-WordWithCount { word = important, count = 1 }
-WordWithCount { word = encircles, count = 1 }
-WordWithCount { word = imagination, count = 2 }
-WordWithCount { word = knowledge, count = 2 }
-WordWithCount { word = limited, count = 1 }
-WordWithCount { word = world, count = 1 }
-```
-
-
-### PulsarCsvOutputFormat
-
-In this example, Flink DataSet is processed and written to Pulsar in Csv format. Please find a complete example for PulsarCsvOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala)
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
- ```shell
- # java
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
- # scala
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
- ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
-
-
-### PulsarJsonOutputFormat
-
-In this example, Flink DataSet is processed and written to Pulsar in Json format. Please find a complete example for PulsarJsonOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala)
-
-**Note:** Property definitions of the model should be public or have getter functions to be visible.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
- ```shell
- # java
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
- # scala
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
- ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
-{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
-{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
-```
-
-
-### PulsarAvroOutputFormat
-
-In this example, Flink DataSet is processed and written to Pulsar in Json format. Please find a complete example for PulsarAvroOutputFormat as follows:
-[java](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java)
-[scala](https://github.com/apache/pulsar/tree/master/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala)
-
-**Note:** NasaMission class are automatically generated by Avro.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
- ```shell
- # java
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
-
- # scala
- $ ./bin/flink run -c org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkScalaExample ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --topic test_flink_topic
- ```
-
-5. Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_flink_topic
-```
-
-6. Please find sample output for above linked application as follows:
-```
- ----- got message -----
-
- Skylab��
- ----- got message -----
-
- 6Apollo–Soyuz Test Project��
-```
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
deleted file mode 100644
index 2240a4d..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.example;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import lombok.AllArgsConstructor;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
-import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a streaming wordcount program on pulsar topics.
- *
- * <p>Example usage:
- * --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
- */
-public class PulsarConsumerSourceWordCount {
-
- public static void main(String[] args) throws Exception {
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 2) {
- System.out.println("Missing parameters!");
- System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().disableSysoutLogging();
- env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
- env.enableCheckpointing(5000);
- env.getConfig().setGlobalJobParameters(parameterTool);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
- String serviceUrl = parameterTool.getRequired("service-url");
- String inputTopic = parameterTool.getRequired("input-topic");
- String subscription = parameterTool.get("subscription", "flink-examples");
- String outputTopic = parameterTool.get("output-topic", null);
- int parallelism = parameterTool.getInt("parallelism", 1);
-
- System.out.println("Parameters:");
- System.out.println("\tServiceUrl:\t" + serviceUrl);
- System.out.println("\tInputTopic:\t" + inputTopic);
- System.out.println("\tSubscription:\t" + subscription);
- System.out.println("\tOutputTopic:\t" + outputTopic);
- System.out.println("\tParallelism:\t" + parallelism);
-
- PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
- .serviceUrl(serviceUrl)
- .topic(inputTopic)
- .subscriptionName(subscription);
- SourceFunction<String> src = builder.build();
- DataStream<String> input = env.addSource(src);
-
- DataStream<WordWithCount> wc = input
- .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
- for (String word : line.split("\\s")) {
- collector.collect(new WordWithCount(word, 1));
- }
- })
- .returns(WordWithCount.class)
- .keyBy("word")
- .timeWindow(Time.seconds(5))
- .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
- new WordWithCount(c1.word, c1.count + c2.count));
-
- if (null != outputTopic) {
- wc.addSink(new FlinkPulsarProducer<>(
- serviceUrl,
- outputTopic,
- new AuthenticationDisabled(),
- wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
- wordWithCount -> wordWithCount.word,
- null
- )).setParallelism(parallelism);
- } else {
- // print the results with a single thread, rather than in parallel
- wc.print().setParallelism(1);
- }
-
- env.execute("Pulsar Stream WordCount");
- }
-
- /**
- * Data type for words with count.
- */
- @AllArgsConstructor
- @NoArgsConstructor
- @ToString
- public static class WordWithCount {
-
- public String word;
- public long count;
-
- }
-
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
deleted file mode 100644
index 84e85e8..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.avro.generated.WordWithCount;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.pulsar.PulsarAvroTableSink;
-import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.sinks.CsvTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a streaming wordcount program on pulsar topics.
- *
- * <p>Example usage:
- * --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
- * or
- * --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_sub
- */
-public class PulsarConsumerSourceWordCountToAvroTableSink {
- private static final String ROUTING_KEY = "word";
-
- public static void main(String[] args) throws Exception {
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 3) {
- System.out.println("Missing parameters!");
- System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().disableSysoutLogging();
- env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
- env.enableCheckpointing(5000);
- env.getConfig().setGlobalJobParameters(parameterTool);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
- StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
-
- String serviceUrl = parameterTool.getRequired("service-url");
- String inputTopic = parameterTool.getRequired("input-topic");
- String subscription = parameterTool.get("subscription", "flink-examples");
- String outputTopic = parameterTool.get("output-topic", null);
- int parallelism = parameterTool.getInt("parallelism", 1);
-
- System.out.println("Parameters:");
- System.out.println("\tServiceUrl:\t" + serviceUrl);
- System.out.println("\tInputTopic:\t" + inputTopic);
- System.out.println("\tSubscription:\t" + subscription);
- System.out.println("\tOutputTopic:\t" + outputTopic);
- System.out.println("\tParallelism:\t" + parallelism);
-
- PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
- .serviceUrl(serviceUrl)
- .topic(inputTopic)
- .subscriptionName(subscription);
- SourceFunction<String> src = builder.build();
- DataStream<String> input = env.addSource(src);
-
-
- DataStream<WordWithCount> wc = input
- .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
- for (String word : line.split("\\s")) {
- collector.collect(
- WordWithCount.newBuilder().setWord(word).setCount(1).build()
- );
- }
- })
- .returns(WordWithCount.class)
- .keyBy(ROUTING_KEY)
- .timeWindow(Time.seconds(5))
- .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
- WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
- );
-
- tableEnvironment.registerDataStream("wc",wc);
- Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
- table.printSchema();
- TableSink sink = null;
- if (null != outputTopic) {
- sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY, WordWithCount.class);
- } else {
- // print the results with a csv file
- sink = new CsvTableSink("./examples/file", "|");
- }
- table.writeToSink(sink);
-
- env.execute("Pulsar Stream WordCount");
- }
-
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
deleted file mode 100644
index a4f9c3c..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.example;
-
-import lombok.AllArgsConstructor;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
-import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.sinks.CsvTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-
-/**
- * Implements a streaming wordcount program on pulsar topics.
- *
- * <p>Example usage:
- * --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
- * or
- * --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_sub
- */
-public class PulsarConsumerSourceWordCountToJsonTableSink {
- private static final String ROUTING_KEY = "word";
-
- public static void main(String[] args) throws Exception {
- // parse input arguments
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- if (parameterTool.getNumberOfParameters() < 3) {
- System.out.println("Missing parameters!");
- System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().disableSysoutLogging();
- env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
- env.enableCheckpointing(5000);
- env.getConfig().setGlobalJobParameters(parameterTool);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
- StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
-
- String serviceUrl = parameterTool.getRequired("service-url");
- String inputTopic = parameterTool.getRequired("input-topic");
- String subscription = parameterTool.get("subscription", "flink-examples");
- String outputTopic = parameterTool.get("output-topic", null);
- int parallelism = parameterTool.getInt("parallelism", 1);
-
- System.out.println("Parameters:");
- System.out.println("\tServiceUrl:\t" + serviceUrl);
- System.out.println("\tInputTopic:\t" + inputTopic);
- System.out.println("\tSubscription:\t" + subscription);
- System.out.println("\tOutputTopic:\t" + outputTopic);
- System.out.println("\tParallelism:\t" + parallelism);
-
- PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
- .serviceUrl(serviceUrl)
- .topic(inputTopic)
- .subscriptionName(subscription);
- SourceFunction<String> src = builder.build();
- DataStream<String> input = env.addSource(src);
-
-
- DataStream<WordWithCount> wc = input
- .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
- for (String word : line.split("\\s")) {
- collector.collect(
- new WordWithCount(word, 1)
- );
- }
- })
- .returns(WordWithCount.class)
- .keyBy(ROUTING_KEY)
- .timeWindow(Time.seconds(5))
- .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
- new WordWithCount(c1.word, c1.count + c2.count));
-
- tableEnvironment.registerDataStream("wc",wc);
- Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
- table.printSchema();
- TableSink sink = null;
- if (null != outputTopic) {
- sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);
- } else {
- // print the results with a csv file
- sink = new CsvTableSink("./examples/file", "|");
- }
- table.writeToSink(sink);
-
- env.execute("Pulsar Stream WordCount");
- }
-
- /**
- * Data type for words with count.
- */
- @AllArgsConstructor
- @NoArgsConstructor
- @ToString
- public static class WordWithCount {
-
- public String word;
- public long count;
-
- }
-}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
deleted file mode 100644
index ac36eb5..0000000
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,209 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-## Apache Flink Connectors for Pulsar
-
-This page describes how to use the connectors to read and write Pulsar topics with [Apache Flink](https://flink.apache.org/) stream processing applications.
-
-Build end-to-end stream processing pipelines that use Pulsar as the stream storage and message bus, and Apache Flink for computation over the streams.
-See the [Pulsar Concepts](https://pulsar.apache.org/docs/en/concepts-overview/) page for more information.
-
-## Example
-
-### PulsarConsumerSourceWordCount
-
-This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
-to stdout or another Pulsar topic.
-
-The steps to run the example:
-
-1. Start Pulsar Standalone.
-
- You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
-
- ```shell
- $ bin/pulsar standalone
- ```
-
-2. Start Flink locally.
-
- You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
-
- ```shell
- $ ./bin/start-cluster.sh
- ```
-
-3. Build the examples.
-
- ```shell
- $ cd ${PULSAR_HOME}
- $ mvn clean install -DskipTests
- ```
-
-4. Run the word count example to print results to stdout.
-
- ```shell
- $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
- ```
-
-5. Produce messages to topic `test_src`.
-
- ```shell
- $ bin/pulsar-client produce -m "hello world test again" -n 100 test_src
- ```
-
-6. You can check the flink taskexecutor `.out` file. The `.out` file will print the counts at the end of each time window as long as words are floating in, e.g.:
-
- ```shell
- PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
- PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
- PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
- PulsarConsumerSourceWordCount.WordWithCount(word=world, count=100)
- ```
-
-Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
-
-```shell
-$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
-```
-
-Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_dest
-```
-
-You will see similar results as what you see at step 6 when running the word count example to print results to stdout.
-
-
-### PulsarConsumerSourceWordCountToAvroTableSink
-
-This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
-to csv file or another Pulsar topic for avro format.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
- ```shell
- $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
- ```
-
-5. Produce messages to topic `test_src`.
-
- ```shell
- $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
- ```
-
-6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:
-
- ```file
- hello|100
- again|100
- test|100
- world|100
- ```
-
-Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
-
-```shell
-$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
-```
-
-Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_dest
-```
-
-You will see sample output for above linked application as follows:.
-```
------ got message -----
-
-hello�
------ got message -----
-
-again�
------ got message -----
-test�
------ got message -----
-
-world�
-
-```
-
-### PulsarConsumerSourceWordCountToJsonTableSink
-
-This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
-to csv file or another Pulsar topic for json format.
-
-The steps to run the example:
-
-Step 1, 2 and 3 are same as above.
-
-4. Run the word count example to print results to stdout.
-
- ```shell
- $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
- ```
-
-If java.lang.ClassNotFoundException: org.apache.flink.table.sinks.TableSink and java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonRowSerializationSchema, you need build Apache Flink from source, then copy flink-table_{version}.jar, flink-json_{version}.jar to ${FLINK_HOME}/lib and restart flink cluster.
-
-5. Produce messages to topic `test_src`.
-
- ```shell
- $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
- ```
-
-6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:
-
- ```file
- hello|100
- again|100
- test|100
- world|100
- ```
-
-Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
-
-```shell
-$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
-```
-
-Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
-
-```shell
-$ bin/pulsar-client consume -n 0 -s test test_dest
-```
-
-You will see sample output for above linked application as follows:.
-```
------ got message -----
-{"word":"hello","count":100}
------ got message -----
-{"word":"again","count":100}
------ got message -----
-{"word":"test","count":100}
------ got message -----
-{"word":"world","count":100}
-```
diff --git a/examples/flink/src/main/resources/avro/NasaMission.avsc b/examples/flink/src/main/resources/avro/NasaMission.avsc
deleted file mode 100644
index 45adc98..0000000
--- a/examples/flink/src/main/resources/avro/NasaMission.avsc
+++ /dev/null
@@ -1,20 +0,0 @@
-[
-{"namespace": "org.apache.flink.avro.generated",
- "type": "record",
- "name": "NasaMission",
- "fields": [
- {"name": "id", "type": "int"},
- {"name": "name", "type": "string"},
- {"name": "start_year", "type": ["int", "null"]},
- {"name": "end_year", "type": ["int", "null"]}
- ]
-},
-{"namespace": "org.apache.flink.avro.generated",
- "type": "record",
- "name": "WordWithCount",
- "fields": [
- {"name": "word", "type": "string"},
- {"name": "count", "type": "long"}
- ]
-}
-]
diff --git a/examples/flink/src/main/resources/log4j2.xml b/examples/flink/src/main/resources/log4j2.xml
deleted file mode 100644
index 2fdc2d0..0000000
--- a/examples/flink/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<Configuration status="INFO">
- <Appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t:%C@%L] %-5level %logger{36} - %msg%n" />
- </Console>
- </Appenders>
- <Loggers>
- <Root level="warn">
- <AppenderRef ref="Console" />
- </Root>
- <Logger name="org.eclipse.jetty" level="info"/>
- <Logger name="org.apache.pulsar" level="info"/>
- <Logger name="org.apache.bookkeeper" level="info"/>
- <Logger name="org.apache.kafka" level="info"/>
- </Loggers>
-</Configuration>
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
deleted file mode 100644
index a64e656..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.avro.generated.NasaMission
-import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
- * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
- */
-object FlinkPulsarBatchAvroSinkScalaExample {
-
- private val nasaMissions = List(
- NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
- NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
- NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
- NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
- NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)
-
- def main(args: Array[String]): Unit = {
-
- // parse input arguments
- val parameterTool = ParameterTool.fromArgs(args)
-
- if (parameterTool.getNumberOfParameters < 2) {
- println("Missing parameters!")
- println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
- return
- }
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- env.getConfig.setGlobalJobParameters(parameterTool)
-
- val serviceUrl = parameterTool.getRequired("service-url")
- val topic = parameterTool.getRequired("topic")
-
- println("Parameters:")
- println("\tServiceUrl:\t" + serviceUrl)
- println("\tTopic:\t" + topic)
-
- // create PulsarCsvOutputFormat instance
- val pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
-
- // create DataSet
- val textDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- textDS.map(nasaMission => new NasaMission(
- nasaMission.getId,
- nasaMission.getName,
- nasaMission.getStartYear,
- nasaMission.getEndYear))
-
- // filter missions which started after 1970
- .filter(_.getStartYear > 1970)
-
- // write batch data to Pulsar as Avro
- .output(pulsarAvroOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Avro")
- }
-
-}
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
deleted file mode 100644
index 302d0ab..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.java.tuple.Tuple4
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
- * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Csv.
- */
-object FlinkPulsarBatchCsvSinkScalaExample {
-
- /**
- * NasaMission Model
- */
- private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
- extends Tuple4(id, missionName, startYear, endYear)
-
- private val nasaMissions = List(
- NasaMission(1, "Mercury program", 1959, 1963),
- NasaMission(2, "Apollo program", 1961, 1972),
- NasaMission(3, "Gemini program", 1963, 1966),
- NasaMission(4, "Skylab", 1973, 1974),
- NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
- def main(args: Array[String]): Unit = {
-
- // parse input arguments
- val parameterTool = ParameterTool.fromArgs(args)
-
- if (parameterTool.getNumberOfParameters < 2) {
- println("Missing parameters!")
- println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
- return
- }
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- env.getConfig.setGlobalJobParameters(parameterTool)
-
- val serviceUrl = parameterTool.getRequired("service-url")
- val topic = parameterTool.getRequired("topic")
-
- println("Parameters:")
- println("\tServiceUrl:\t" + serviceUrl)
- println("\tTopic:\t" + topic)
-
- // create PulsarCsvOutputFormat instance
- val pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
-
- // create DataSet
- val textDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- textDS.map(nasaMission => NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase,
- nasaMission.startYear,
- nasaMission.endYear))
-
- // filter missions which started after 1970
- .filter(_.startYear > 1970)
-
- // write batch data to Pulsar as Csv
- .output(pulsarCsvOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Csv")
- }
-
-}
\ No newline at end of file
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
deleted file mode 100644
index 9518751..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
-import scala.beans.BeanProperty
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
- * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Json.
- */
-object FlinkPulsarBatchJsonSinkScalaExample {
-
- /**
- * NasaMission Model
- */
- private case class NasaMission(@BeanProperty id: Int,
- @BeanProperty missionName: String,
- @BeanProperty startYear: Int,
- @BeanProperty endYear: Int)
-
- private val nasaMissions = List(
- NasaMission(1, "Mercury program", 1959, 1963),
- NasaMission(2, "Apollo program", 1961, 1972),
- NasaMission(3, "Gemini program", 1963, 1966),
- NasaMission(4, "Skylab", 1973, 1974),
- NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
- def main(args: Array[String]): Unit = {
-
- // parse input arguments
- val parameterTool = ParameterTool.fromArgs(args)
-
- if (parameterTool.getNumberOfParameters < 2) {
- println("Missing parameters!")
- println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
- return
- }
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- env.getConfig.setGlobalJobParameters(parameterTool)
-
- val serviceUrl = parameterTool.getRequired("service-url")
- val topic = parameterTool.getRequired("topic")
-
- println("Parameters:")
- println("\tServiceUrl:\t" + serviceUrl)
- println("\tTopic:\t" + topic)
-
- // create PulsarJsonOutputFormat instance
- val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())
-
- // create DataSet
- val nasaMissionDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission =>
- NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase,
- nasaMission.startYear,
- nasaMission.endYear))
-
- // filter missions which started after 1970
- .filter(_.startYear > 1970)
-
- // write batch data to Pulsar
- .output(pulsarJsonOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Json")
- }
-
-}
diff --git a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
deleted file mode 100644
index 369e56d..0000000
--- a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.example
-
-import org.apache.flink.api.common.serialization.SerializationSchema
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
-import org.apache.flink.util.Collector
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
-
-/**
- * Data type for words with count.
- */
-case class WordWithCount(word: String, count: Long) {
- override def toString: String = "WordWithCount { word = " + word + ", count = " + count + " }"
-}
-
-/**
- * Implements a batch word-count Scala program on Pulsar topic by writing Flink DataSet.
- */
-object FlinkPulsarBatchSinkScalaExample {
-
- private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
- "Knowledge is limited. Imagination encircles the world."
-
- def main(args: Array[String]): Unit = {
-
- // parse input arguments
- val parameterTool = ParameterTool.fromArgs(args)
-
- if (parameterTool.getNumberOfParameters < 2) {
- println("Missing parameters!")
- println("Usage: pulsar --service-url <pulsar-service-url> --topic <topic>")
- return
- }
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- env.getConfig.setGlobalJobParameters(parameterTool)
-
- val serviceUrl = parameterTool.getRequired("service-url")
- val topic = parameterTool.getRequired("topic")
-
- println("Parameters:")
- println("\tServiceUrl:\t" + serviceUrl)
- println("\tTopic:\t" + topic)
-
- // create PulsarOutputFormat instance
- val pulsarOutputFormat =
- new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new AuthenticationDisabled(), new SerializationSchema[WordWithCount] {
- override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
- })
-
- // create DataSet
- val textDS = env.fromElements[String](EINSTEIN_QUOTE)
-
- // convert sentence to words
- textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
- val words = value.toLowerCase.split(" ")
- for (word <- words) {
- out.collect(new WordWithCount(word.replace(".", ""), 1))
- }
- })
-
- // filter words which length is bigger than 4
- .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
-
- // group the words
- .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
-
- // sum the word counts
- .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
- new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))
-
- // write batch data to Pulsar
- .output(pulsarOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch WordCount")
- }
-
-}
\ No newline at end of file
diff --git a/examples/pom.xml b/examples/pom.xml
index 55382c4..f567c6b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -33,7 +33,6 @@
<name>Pulsar Adapter Examples :: Parent</name>
<modules>
- <module>flink</module>
<module>spark</module>
<module>kafka-streams</module>
</modules>
diff --git a/pom.xml b/pom.xml
index 6ccf5f0..6683198 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,6 @@
<properties>
<pulsar.version>2.8.0</pulsar.version>
- <flink.version>1.7.2</flink.version>
<storm.version>2.0.0</storm.version>
<kafka-client.version>2.7.0</kafka-client.version>
<kafka_0_8.version>0.8.1.1</kafka_0_8.version>
@@ -253,7 +252,6 @@
<modules>
<module>pulsar-storm</module>
- <module>pulsar-flink</module>
<module>pulsar-spark</module>
<module>pulsar-client-kafka-compat</module>
<module>pulsar-log4j2-appender</module>
@@ -363,49 +361,6 @@
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.11</artifactId>
- <type>test-jar</type>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <type>test-jar</type>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>${kafka_0_8.version}</version>
diff --git a/pulsar-flink/README.md b/pulsar-flink/README.md
deleted file mode 100644
index f970a55..0000000
--- a/pulsar-flink/README.md
+++ /dev/null
@@ -1,27 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-# Pulsar Flink Connector
-
-This Pulsar Flink connector is superseded by a newer version of [pulsar-flink](https://flink-packages.org/packages/pulsar-flink-connector)
-in Flink Packages.
-
-Details: https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector
\ No newline at end of file
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
deleted file mode 100644
index bfa989f..0000000
--- a/pulsar-flink/pom.xml
+++ /dev/null
@@ -1,196 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-adapters</artifactId>
- <version>2.8.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>pulsar-flink</artifactId>
- <!-- This Pulsar Flink connector is superseded by a new version of `pulsar-flink` connector.
- https://flink-packages.org/packages/pulsar-flink-connector
- https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector -->
- <name>Pulsar Flink Connectors (Deprecated)</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_${scala.binary.version}</artifactId>
- <scope>provided</scope>
- <!-- Projects depending on this project, won't depend on flink-table. -->
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-protobuf</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-csv</artifactId>
- <version>1.6</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.javassist</groupId>
- <artifactId>javassist</artifactId>
- <version>3.20.0-GA</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.yaml</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-testng</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- <filtering>true</filtering>
- </resource>
- <resource>
- <directory>src/test/resources</directory>
- <filtering>true</filtering>
- </resource>
- </resources>
-
- <plugins>
- <!-- Generate Test class from avro schema -->
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <version>${avro.version}</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>schema</goal>
- </goals>
- <configuration>
- <testSourceDirectory>${project.basedir}/src/test/resources/avro/</testSourceDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
deleted file mode 100644
index d061559..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base Pulsar Output Format to write Flink DataSets into a Pulsar topic.
- */
-public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
-
- private static final Logger LOG = LoggerFactory.getLogger(BasePulsarOutputFormat.class);
- private static final long serialVersionUID = 2304601727522060427L;
-
- private transient Function<Throwable, MessageId> failureCallback;
- private static volatile Producer<byte[]> producer;
-
- protected SerializationSchema<T> serializationSchema;
-
- private ClientConfigurationData clientConf;
- private ProducerConfigurationData producerConf;
-
-
- protected BasePulsarOutputFormat(final String serviceUrl, final String topicName,
- final Authentication authentication) {
- Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
- Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");
-
- clientConf = new ClientConfigurationData();
- producerConf = new ProducerConfigurationData();
-
- this.clientConf.setServiceUrl(serviceUrl);
- this.clientConf.setAuthentication(authentication);
- this.producerConf.setTopicName(topicName);
-
- LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
- this.producerConf.getTopicName());
- }
-
- protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerConfigurationData producerConf) {
- this.clientConf = Preconditions.checkNotNull(clientConf, "client config data should not be null");
- this.producerConf = Preconditions.checkNotNull(producerConf, "producer config data should not be null");
-
- Preconditions.checkArgument(StringUtils.isNotBlank(clientConf.getServiceUrl()), "serviceUrl cannot be blank.");
- Preconditions.checkArgument(StringUtils.isNotBlank(producerConf.getTopicName()), "topicName cannot be blank.");
-
- LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
- this.producerConf.getTopicName());
- }
-
- @Override
- public void configure(Configuration configuration) {
-
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- this.producer = getProducerInstance();
-
- this.failureCallback = cause -> {
- LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
- return null;
- };
- }
-
- @Override
- public void writeRecord(T t) throws IOException {
- byte[] data = this.serializationSchema.serialize(t);
- this.producer.sendAsync(data)
- .exceptionally(this.failureCallback);
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- private Producer<byte[]> getProducerInstance()
- throws PulsarClientException {
- if (producer == null){
- synchronized (PulsarOutputFormat.class) {
- if (producer == null){
- producer = Preconditions.checkNotNull(createPulsarProducer(),
- "Pulsar producer cannot be null.");
- }
- }
- }
- return producer;
- }
-
- private Producer<byte[]> createPulsarProducer()
- throws PulsarClientException {
- try {
- PulsarClientImpl client = new PulsarClientImpl(clientConf);
- return client.createProducerAsync(producerConf).get();
- } catch (PulsarClientException | InterruptedException | ExecutionException e) {
- LOG.error("Pulsar producer cannot be created.", e);
- throw new PulsarClientException(e);
- }
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
deleted file mode 100644
index cc2acab..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
- */
-public class PulsarAvroOutputFormat<T extends SpecificRecord> extends BasePulsarOutputFormat<T> {
-
- private static final long serialVersionUID = -6794070714728773530L;
-
- public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
- super(serviceUrl, topicName, authentication);
- this.serializationSchema = new AvroSerializationSchema();
- }
-
- public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData,
- ProducerConfigurationData producerConfigurationData) {
- super(clientConfigurationData, producerConfigurationData);
- this.serializationSchema = new AvroSerializationSchema();
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
deleted file mode 100644
index 74680d3..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
- */
-public class PulsarCsvOutputFormat<T extends Tuple> extends BasePulsarOutputFormat<T> {
-
- private static final long serialVersionUID = -4461671510903404196L;
-
- public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
- super(serviceUrl, topicName, authentication);
- this.serializationSchema = new CsvSerializationSchema<>();
- }
-
- public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData,
- ProducerConfigurationData producerConfigurationData) {
- super(clientConfigurationData, producerConfigurationData);
- this.serializationSchema = new CsvSerializationSchema<>();
- }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
deleted file mode 100644
index 837f743..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
- */
-public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
-
- private static final long serialVersionUID = 8499620770848461958L;
-
- public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
- super(serviceUrl, topicName, authentication);
- this.serializationSchema = new JsonSerializationSchema();
- }
-
- public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData,
- ProducerConfigurationData producerConfigurationData) {
- super(clientConfigurationData, producerConfigurationData);
- this.serializationSchema = new JsonSerializationSchema();
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
deleted file mode 100644
index 14e12e4..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
- */
-public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
-
- private static final long serialVersionUID = 2997027580167793000L;
-
- public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication,
- final SerializationSchema<T> serializationSchema) {
- super(serviceUrl, topicName, authentication);
- Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
- this.serializationSchema = serializationSchema;
- }
-
- public PulsarOutputFormat(final ClientConfigurationData clientConfigurationData,
- final ProducerConfigurationData producerConfigurationData,
- final SerializationSchema<T> serializationSchema) {
- super(clientConfigurationData, producerConfigurationData);
- Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
- this.serializationSchema = serializationSchema;
- }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
deleted file mode 100644
index 79a8213..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Implementations of different output formats.
- */
-package org.apache.flink.batch.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
deleted file mode 100644
index ea71d4d..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-
-/**
- * Avro Serialization Schema to serialize Dataset records to Avro.
- */
-public class AvroSerializationSchema<T extends SpecificRecord> implements SerializationSchema<T> {
-
- private static final long serialVersionUID = -6691140169413760919L;
-
- @Override
- public byte[] serialize(T t) {
- if (null == t) {
- return null;
- }
-
- // Writer to serialize Avro record into a byte array.
- DatumWriter<T> writer = new SpecificDatumWriter<>(t.getSchema());
- // Output stream to serialize records into byte array.
- ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
- // Low-level class for serialization of Avro values.
- Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
- arrayOutputStream.reset();
- try {
- writer.write(t, encoder);
- encoder.flush();
- } catch (IOException e) {
- throw new RuntimeException("Error while serializing the record to Avro", e);
- }
-
- return arrayOutputStream.toByteArray();
- }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
deleted file mode 100644
index 4ee6f22..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Csv Serialization Schema to serialize Tuples to Csv.
- */
-public class CsvSerializationSchema<T extends Tuple> implements SerializationSchema<T> {
-
- private static final long serialVersionUID = -3379119592495232636L;
- private static final int STRING_WRITER_INITIAL_BUFFER_SIZE = 256;
-
- @Override
- public byte[] serialize(T t) {
- StringWriter stringWriter;
- try {
- Object[] fieldsValues = new Object[t.getArity()];
- for (int index = 0; index < t.getArity(); index++) {
- fieldsValues[index] = (t.getField(index));
- }
-
- stringWriter = new StringWriter(STRING_WRITER_INITIAL_BUFFER_SIZE);
- CSVFormat.DEFAULT.withRecordSeparator("").printRecord(stringWriter, fieldsValues);
- } catch (IOException e) {
- throw new RuntimeException("Error while serializing the record to Csv", e);
- }
-
- return stringWriter.toString().getBytes();
- }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
deleted file mode 100644
index b7a56c5..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Json Serialization Schema to serialize Dataset records to Json.
- */
-public class JsonSerializationSchema<T> implements SerializationSchema<T> {
-
- private static final long serialVersionUID = -6938065355389311385L;
- private ObjectMapper mapper = new ObjectMapper();
-
- @Override
- public byte[] serialize(T t) {
- try {
- return mapper.writeValueAsBytes(t);
- } catch (JsonProcessingException e) {
- throw new RuntimeException("Error while serializing the record to Json", e);
- }
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
deleted file mode 100644
index e63f03f..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Implementations of the serialization schemas.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
deleted file mode 100644
index 4de5145..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Pulsar Client cache that enables client sharing among different flink tasks in same process.
- */
-public class CachedPulsarClient {
- private static final Logger LOG = LoggerFactory.getLogger(CachedPulsarClient.class);
-
- private static int cacheSize = 5;
-
- public static void setCacheSize(int size) {
- cacheSize = size;
- }
-
- private static CacheLoader<ClientConfigurationData, PulsarClientImpl> cacheLoader =
- new CacheLoader<ClientConfigurationData, PulsarClientImpl>() {
- @Override
- public PulsarClientImpl load(ClientConfigurationData key) throws Exception {
- return createPulsarClient(key);
- }
- };
-
- private static RemovalListener<ClientConfigurationData, PulsarClientImpl> removalListener = notification -> {
- ClientConfigurationData config = notification.getKey();
- PulsarClientImpl client = notification.getValue();
- LOG.debug("Evicting pulsar client {} with config {}, due to {}",
- client.toString(), config.toString(), notification.getCause().toString());
- close(config, client);
- };
-
- private static LoadingCache<ClientConfigurationData, PulsarClientImpl> guavaCache =
- CacheBuilder.newBuilder().maximumSize(cacheSize).removalListener(removalListener).build(cacheLoader);
-
- private static PulsarClientImpl createPulsarClient(
- ClientConfigurationData clientConfig) throws PulsarClientException {
- PulsarClientImpl client;
- try {
- client = new PulsarClientImpl(clientConfig);
- LOG.debug("Created a new instance of PulsarClientImpl for clientConf = {}", clientConfig.toString());
- } catch (PulsarClientException e) {
- LOG.error("Failed to create PulsarClientImpl for clientConf = {}", clientConfig.toString());
- throw e;
- }
- return client;
- }
-
- public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
- PulsarClientImpl instance = guavaCache.get(config);
- if (instance.getState().get() == PulsarClientImpl.State.Open) {
- return instance;
- } else {
- guavaCache.invalidate(config);
- return guavaCache.get(config);
- }
- }
-
- private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
- try {
- LOG.info("Closing the Pulsar client with config {}", clientConfig.toString());
- client.close();
- } catch (PulsarClientException e) {
- LOG.warn("Error while closing the Pulsar client with config {}", clientConfig.toString(), e);
- }
- }
-
- static void close(ClientConfigurationData clientConfig) {
- guavaCache.invalidate(clientConfig);
- }
-
- static void clear() {
- LOG.info("Cleaning up guava cache.");
- guavaCache.invalidateAll();
- }
-
- static ConcurrentMap<ClientConfigurationData, PulsarClientImpl> getAsMap() {
- return guavaCache.asMap();
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
deleted file mode 100644
index cf3c657..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.util.function.Function;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.util.SerializableObject;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flink Sink to produce data into a Pulsar topic.
- */
-public class FlinkPulsarProducer<T>
- extends RichSinkFunction<T>
- implements CheckpointedFunction {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
-
- private ClientConfigurationData clientConf;
- private ProducerConfigurationData producerConf;
-
- /**
- * (Serializable) SerializationSchema for turning objects used with Flink into.
- * byte[] for Pulsar.
- */
- protected final SerializationSchema<T> schema;
-
- /**
- * User-provided key extractor for assigning a key to a pulsar message.
- */
- protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;
-
- /**
- * User-provided properties extractor for assigning a key to a pulsar message.
- */
- protected final PulsarPropertiesExtractor<T> flinkPulsarPropertiesExtractor;
-
- /**
- * Produce Mode.
- */
- protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
-
- /**
- * If true, the producer will wait until all outstanding records have been send to the broker.
- */
- protected boolean flushOnCheckpoint;
-
- // -------------------------------- Runtime fields ------------------------------------------
-
- /**
- * Pulsar Producer instance.
- */
- protected transient Producer<byte[]> producer;
-
- /**
- * The callback than handles error propagation or logging callbacks.
- */
- protected transient Function<MessageId, MessageId> successCallback;
-
- protected transient Function<Throwable, MessageId> failureCallback;
-
- /**
- * Errors encountered in the async producer are stored here.
- */
- protected transient volatile Exception asyncException;
-
- /**
- * Lock for accessing the pending records.
- */
- protected final SerializableObject pendingRecordsLock = new SerializableObject();
-
- /**
- * Number of unacknowledged records.
- */
- protected long pendingRecords;
-
- public FlinkPulsarProducer(String serviceUrl,
- String defaultTopicName,
- Authentication authentication,
- SerializationSchema<T> serializationSchema,
- PulsarKeyExtractor<T> keyExtractor,
- PulsarPropertiesExtractor<T> propertiesExtractor) {
- checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
- checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
- checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
-
- clientConf = new ClientConfigurationData();
- producerConf = new ProducerConfigurationData();
-
- this.clientConf.setServiceUrl(serviceUrl);
- this.clientConf.setAuthentication(authentication);
- this.producerConf.setTopicName(defaultTopicName);
- this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
- this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
- this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
- ClosureCleaner.ensureSerializable(serializationSchema);
- }
-
- public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
- ProducerConfigurationData producerConfigurationData,
- SerializationSchema<T> serializationSchema,
- PulsarKeyExtractor<T> keyExtractor,
- PulsarPropertiesExtractor<T> propertiesExtractor) {
- this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
- this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
- this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
- this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
- this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
- ClosureCleaner.ensureSerializable(serializationSchema);
- }
-
- // ---------------------------------- Properties --------------------------
-
-
- /**
- * @return pulsar key extractor.
- */
- public PulsarKeyExtractor<T> getKeyExtractor() {
- return flinkPulsarKeyExtractor;
- }
-
- /**
- * @return pulsar properties extractor.
- */
- public PulsarPropertiesExtractor<T> getPulsarPropertiesExtractor() {
- return flinkPulsarPropertiesExtractor;
- }
-
- /**
- * Gets this producer's operating mode.
- */
- public PulsarProduceMode getProduceMode() {
- return this.produceMode;
- }
-
- /**
- * Sets this producer's operating mode.
- *
- * @param produceMode The mode of operation.
- */
- public void setProduceMode(PulsarProduceMode produceMode) {
- this.produceMode = checkNotNull(produceMode);
- }
-
- /**
- * If set to true, the Flink producer will wait for all outstanding messages in the Pulsar buffers
- * to be acknowledged by the Pulsar producer on a checkpoint.
- * This way, the producer can guarantee that messages in the Pulsar buffers are part of the checkpoint.
- *
- * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
- */
- public void setFlushOnCheckpoint(boolean flush) {
- this.flushOnCheckpoint = flush;
- }
-
- // ----------------------------------- Sink Methods --------------------------
-
- @SuppressWarnings("unchecked")
- private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
- if (null == extractor) {
- return PulsarKeyExtractor.NULL;
- } else {
- return extractor;
- }
- }
-
- @SuppressWarnings("unchecked")
- private static <T> PulsarPropertiesExtractor<T> getOrNullPropertiesExtractor(
- PulsarPropertiesExtractor<T> extractor) {
- if (null == extractor) {
- return PulsarPropertiesExtractor.EMPTY;
- } else {
- return extractor;
- }
- }
-
- private Producer<byte[]> createProducer() throws Exception {
- PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
- return client.createProducerAsync(producerConf).get();
- }
-
- /**
- * Initializes the connection to pulsar.
- *
- * @param parameters configuration used for initialization
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- this.producer = createProducer();
-
- RuntimeContext ctx = getRuntimeContext();
-
- LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}",
- ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), producerConf.getTopicName());
-
- if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
- LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
- flushOnCheckpoint = false;
- }
-
- this.successCallback = msgId -> {
- acknowledgeMessage();
- return msgId;
- };
-
- if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
- this.failureCallback = cause -> {
- LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
- return null;
- };
- } else if (PulsarProduceMode.AT_LEAST_ONCE == produceMode) {
- this.failureCallback = cause -> {
- if (null == asyncException) {
- if (cause instanceof Exception) {
- asyncException = (Exception) cause;
- } else {
- asyncException = new Exception(cause);
- }
- }
- return null;
- };
- } else {
- throw new UnsupportedOperationException("Unsupported produce mode " + produceMode);
- }
- }
-
- @Override
- public void invoke(T value, Context context) throws Exception {
- checkErroneous();
-
- byte[] serializedValue = schema.serialize(value);
-
- TypedMessageBuilder<byte[]> msgBuilder = producer.newMessage();
- if (null != context.timestamp()) {
- msgBuilder = msgBuilder.eventTime(context.timestamp());
- }
- String msgKey = flinkPulsarKeyExtractor.getKey(value);
- if (null != msgKey) {
- msgBuilder = msgBuilder.key(msgKey);
- }
-
- if (flushOnCheckpoint) {
- synchronized (pendingRecordsLock) {
- pendingRecords++;
- }
- }
- msgBuilder.value(serializedValue)
- .properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
- .sendAsync()
- .thenApply(successCallback)
- .exceptionally(failureCallback);
- }
-
- @Override
- public void close() throws Exception {
- if (producer != null) {
- producer.close();
- }
-
- // make sure we propagate pending errors
- checkErroneous();
- }
-
- // ------------------- Logic for handling checkpoint flushing -------------------------- //
-
- private void acknowledgeMessage() {
- if (flushOnCheckpoint) {
- synchronized (pendingRecordsLock) {
- pendingRecords--;
- if (pendingRecords == 0) {
- pendingRecordsLock.notifyAll();
- }
- }
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- // check for asynchronous errors and fail the checkpoint if necessary
- checkErroneous();
-
- if (flushOnCheckpoint) {
- // wait until all the messages are acknowledged
- synchronized (pendingRecordsLock) {
- while (pendingRecords > 0) {
- pendingRecordsLock.wait(100);
- }
- }
-
- // if the flushed requests has errors, we should propagate it also and fail the checkpoint
- checkErroneous();
- }
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- // nothing to do
- }
-
- // ----------------------------------- Utilities --------------------------
-
- protected void checkErroneous() throws Exception {
- Exception e = asyncException;
- if (e != null) {
- // prevent double throwing
- asyncException = null;
- throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
- }
- }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
deleted file mode 100644
index 51b3572..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.avro.AvroRowSerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format.
- */
-public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
-
- protected ClientConfigurationData clientConfigurationData;
- protected ProducerConfigurationData producerConfigurationData;
- protected final String routingKeyFieldName;
- protected SerializationSchema<Row> serializationSchema;
- protected String[] fieldNames;
- protected TypeInformation[] fieldTypes;
- protected PulsarKeyExtractor<Row> keyExtractor;
- protected PulsarPropertiesExtractor<Row> propertiesExtractor;
- private Class<? extends SpecificRecord> recordClazz;
-
- /**
- * Create PulsarAvroTableSink.
- *
- * @param serviceUrl pulsar service url
- * @param topic topic in pulsar to which table is written
- * @param routingKeyFieldName routing key field name
- */
- public PulsarAvroTableSink(
- String serviceUrl,
- String topic,
- Authentication authentication,
- String routingKeyFieldName,
- Class<? extends SpecificRecord> recordClazz) {
- checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url not set");
- checkArgument(StringUtils.isNotBlank(topic), "Topic is null");
- checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
-
- clientConfigurationData = new ClientConfigurationData();
- producerConfigurationData = new ProducerConfigurationData();
-
- clientConfigurationData.setServiceUrl(serviceUrl);
- clientConfigurationData.setAuthentication(authentication);
- producerConfigurationData.setTopicName(topic);
- this.routingKeyFieldName = routingKeyFieldName;
- this.recordClazz = recordClazz;
- }
-
- public PulsarAvroTableSink(
- ClientConfigurationData clientConfigurationData,
- ProducerConfigurationData producerConfigurationData,
- String routingKeyFieldName,
- Class<? extends SpecificRecord> recordClazz) {
- this.clientConfigurationData = checkNotNull(clientConfigurationData, "client config can not be null");
- this.producerConfigurationData = checkNotNull(producerConfigurationData, "producer config can not be null");
-
- checkArgument(StringUtils.isNotBlank(clientConfigurationData.getServiceUrl()), "Service url not set");
- checkArgument(StringUtils.isNotBlank(producerConfigurationData.getTopicName()), "Topic is null");
-
- this.routingKeyFieldName = routingKeyFieldName;
- this.recordClazz = recordClazz;
- }
-
- /**
- * Returns the low-level producer.
- */
- protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
- serializationSchema = new AvroRowSerializationSchema(recordClazz);
- return new FlinkPulsarProducer<Row>(
- clientConfigurationData,
- producerConfigurationData,
- serializationSchema,
- keyExtractor,
- propertiesExtractor);
- }
-
- @Override
- public void emitDataStream(DataStream<Row> dataStream) {
- checkState(fieldNames != null, "Table sink is not configured");
- checkState(fieldTypes != null, "Table sink is not configured");
- checkState(serializationSchema != null, "Table sink is not configured");
- checkState(keyExtractor != null, "Table sink is not configured");
- FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
- dataStream.addSink(producer);
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
- return rowTypeInfo;
- }
-
- @Override
- public String[] getFieldNames() {
- return fieldNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
- }
-
- @Override
- public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- PulsarAvroTableSink sink = new PulsarAvroTableSink(
- clientConfigurationData, producerConfigurationData, routingKeyFieldName, recordClazz);
-
- sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
- sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
- checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types do not match");
-
- sink.serializationSchema = new AvroRowSerializationSchema(recordClazz);
- sink.keyExtractor = new AvroKeyExtractor(
- routingKeyFieldName,
- fieldNames,
- fieldTypes,
- recordClazz);
- sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
-
- return sink;
- }
-
-
- /**
- * A key extractor that extracts the routing key from a {@link Row} by field name.
- */
- private static class AvroKeyExtractor implements PulsarKeyExtractor<Row> {
- private final int keyIndex;
-
- public AvroKeyExtractor(
- String keyFieldName,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes,
- Class<? extends SpecificRecord> recordClazz) {
-
- checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types does not match.");
-
- Schema schema = SpecificData.get().getSchema(recordClazz);
- Schema.Field keyField = schema.getField(keyFieldName);
- Schema.Type keyType = keyField.schema().getType();
-
- int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
- checkArgument(keyIndex >= 0,
- "Key field '" + keyFieldName + "' not found");
-
- checkArgument(Schema.Type.STRING.equals(keyType),
- "Key field must be of type 'STRING'");
- this.keyIndex = keyIndex;
- }
-
- @Override
- public String getKey(Row event) {
- return event.getField(keyIndex).toString();
- }
- }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
deleted file mode 100644
index 30361a0..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.util.IOUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
- * When checkpointing is enabled, it guarantees at least once processing semantics.
- *
- * <p>When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has
- * received. In this mode messages may be dropped.
- */
-class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> {
-
- private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class);
-
- private int messageReceiveTimeoutMs;
-
- private ClientConfigurationData clientConfigurationData;
- private ConsumerConfigurationData<byte[]> consumerConfigurationData;
-
- private final DeserializationSchema<T> deserializer;
-
- private PulsarClient client;
- private Consumer<byte[]> consumer;
-
- private boolean isCheckpointingEnabled;
-
- private final long acknowledgementBatchSize;
- private long batchCount;
-
- private transient volatile boolean isRunning;
-
- PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
- super(MessageId.class);
-
- clientConfigurationData = new ClientConfigurationData();
- consumerConfigurationData = new ConsumerConfigurationData<>();
-
- this.clientConfigurationData = builder.clientConfigurationData;
- this.consumerConfigurationData = builder.consumerConfigurationData;
- this.deserializer = builder.deserializationSchema;
- this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
- this.messageReceiveTimeoutMs = builder.messageReceiveTimeoutMs;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- final RuntimeContext context = getRuntimeContext();
- if (context instanceof StreamingRuntimeContext) {
- isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled();
- }
-
- client = getClient();
- consumer = createConsumer(client);
-
- isRunning = true;
- }
-
- @Override
- protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
- if (consumer == null) {
- LOG.error("null consumer unable to acknowledge messages");
- throw new RuntimeException("null pulsar consumer unable to acknowledge messages");
- }
-
- if (messageIds.isEmpty()) {
- LOG.info("no message ids to acknowledge");
- return;
- }
-
- Map<String, CompletableFuture<Void>> futures = new HashMap<>(messageIds.size());
- for (MessageId id : messageIds) {
- futures.put(id.toString(), consumer.acknowledgeAsync(id));
- }
-
- futures.forEach((k, f) -> {
- try {
- f.get();
- } catch (Exception e) {
- LOG.error("failed to acknowledge messageId " + k, e);
- throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
- }
- });
- }
-
- @Override
- public void run(SourceContext<T> context) throws Exception {
- Message message;
- while (isRunning) {
- message = consumer.receive(messageReceiveTimeoutMs, TimeUnit.MILLISECONDS);
- if (message == null) {
- continue;
- }
-
- if (isCheckpointingEnabled) {
- emitCheckpointing(context, message);
- } else {
- emitAutoAcking(context, message);
- }
- }
- }
-
- private void emitCheckpointing(SourceContext<T> context, Message message) throws IOException {
- synchronized (context.getCheckpointLock()) {
- if (!addId(message.getMessageId())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("messageId=" + message.getMessageId().toString() + " already processed.");
- }
- return;
- }
- context.collect(deserialize(message));
- }
- }
-
- private void emitAutoAcking(SourceContext<T> context, Message message) throws IOException {
- context.collect(deserialize(message));
- batchCount++;
- if (batchCount >= acknowledgementBatchSize) {
- LOG.info("processed {} messages acknowledging messageId {}", batchCount, message.getMessageId());
- consumer.acknowledgeCumulative(message.getMessageId());
- batchCount = 0;
- }
- }
-
- protected T deserialize(Message message) throws IOException {
- return deserializer.deserialize(message.getData());
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- IOUtils.cleanup(LOG, consumer);
- IOUtils.cleanup(LOG, client);
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return deserializer.getProducedType();
- }
-
- boolean isCheckpointingEnabled() {
- return isCheckpointingEnabled;
- }
-
- PulsarClient getClient() throws ExecutionException {
- return CachedPulsarClient.getOrCreate(clientConfigurationData);
- }
-
- Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
- return ((PulsarClientImpl) client).subscribeAsync(consumerConfigurationData).join();
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
deleted file mode 100644
index b6f82a5..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Base class for {@link PulsarTableSink} that serializes data in JSON format.
- */
-public class PulsarJsonTableSink extends PulsarTableSink {
-
- /**
- * Create PulsarJsonTableSink.
- *
- * @param serviceUrl pulsar service url
- * @param topic topic in pulsar to which table is written
- * @param authentication authetication info required by pulsar client
- * @param routingKeyFieldName routing key field name
- */
- public PulsarJsonTableSink(
- String serviceUrl,
- String topic,
- Authentication authentication,
- String routingKeyFieldName) {
- super(serviceUrl, topic, authentication, routingKeyFieldName);
- }
-
- public PulsarJsonTableSink(
- ClientConfigurationData clientConfigurationData,
- ProducerConfigurationData producerConfigurationData,
- String routingKeyFieldName) {
- super(clientConfigurationData, producerConfigurationData, routingKeyFieldName);
- }
-
- @Override
- protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
- return new JsonRowSerializationSchema(rowSchema);
- }
-
- @Override
- protected PulsarTableSink createSink() {
- return new PulsarJsonTableSink(
- clientConfigurationData,
- producerConfigurationData,
- routingKeyFieldName);
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
deleted file mode 100644
index d42f5c3..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-/**
- * The supported producing modes of operation for flink's pulsar producer.
- */
-public enum PulsarProduceMode {
-
- /**
- * Any produce failures will be ignored hence there could be data loss.
- */
- AT_MOST_ONCE,
-
- /**
- * The producer will ensure that all the events are persisted in pulsar.
- * There could be duplicate events written though.
- */
- AT_LEAST_ONCE,
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java
deleted file mode 100644
index 9d44215..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-
-/**
- * Base class for pulsar sources.
- * @param <T>
- */
-@PublicEvolving
-interface PulsarSourceBase<T> extends ParallelSourceFunction<T>, ResultTypeQueryable<T> {
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
deleted file mode 100644
index 67690be..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-
-/**
- * A class for building a pulsar source.
- */
-@PublicEvolving
-public class PulsarSourceBuilder<T> {
-
- private static final String SERVICE_URL = "pulsar://localhost:6650";
- private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
- private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
- private static final int DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS = 100;
- private static final String SUBSCRIPTION_NAME = "flink-sub";
-
- final DeserializationSchema<T> deserializationSchema;
-
- ClientConfigurationData clientConfigurationData;
- ConsumerConfigurationData<byte[]> consumerConfigurationData;
-
- long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
- //
- int messageReceiveTimeoutMs = DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS;
-
- private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
- this.deserializationSchema = deserializationSchema;
-
- clientConfigurationData = new ClientConfigurationData();
- consumerConfigurationData = new ConsumerConfigurationData<>();
- clientConfigurationData.setServiceUrl(SERVICE_URL);
- consumerConfigurationData.setTopicNames(new TreeSet<>());
- consumerConfigurationData.setSubscriptionName(SUBSCRIPTION_NAME);
- consumerConfigurationData.setSubscriptionInitialPosition(SubscriptionInitialPosition.Latest);
- }
-
- /**
- * Sets the pulsar service url to connect to. Defaults to pulsar://localhost:6650.
- *
- * @param serviceUrl service url to connect to
- * @return this builder
- */
- public PulsarSourceBuilder<T> serviceUrl(String serviceUrl) {
- Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank");
- this.clientConfigurationData.setServiceUrl(serviceUrl);
- return this;
- }
-
- /**
- * Sets topics to consumer from. This is required.
- *
- * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
- * are in the following format:
- * {persistent|non-persistent}://tenant/namespace/topic
- *
- * @param topics the topic to consumer from
- * @return this builder
- */
- public PulsarSourceBuilder<T> topic(String... topics) {
- Preconditions.checkArgument(topics != null && topics.length > 0,
- "topics cannot be blank");
- for (String topic : topics) {
- Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic");
- }
- this.consumerConfigurationData.getTopicNames().addAll(Arrays.asList(topics));
- return this;
- }
-
- /**
- * Sets topics to consumer from. This is required.
- *
- * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
- * are in the following format:
- * {persistent|non-persistent}://tenant/namespace/topic
- *
- * @param topics the topic to consumer from
- * @return this builder
- */
- public PulsarSourceBuilder<T> topics(List<String> topics) {
- Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank");
- topics.forEach(topicName ->
- Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
- this.consumerConfigurationData.getTopicNames().addAll(topics);
- return this;
- }
-
- /**
- * Use topic pattern to config sets of topics to consumer.
- *
- * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
- * are in the following format:
- * {persistent|non-persistent}://tenant/namespace/topic
- *
- * @param topicsPattern topic pattern to consumer from
- * @return this builder
- */
- public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
- Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
- Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
- "Pattern has already been set.");
- this.consumerConfigurationData.setTopicsPattern(topicsPattern);
- return this;
- }
-
- /**
- * Use topic pattern to config sets of topics to consumer.
- *
- * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
- * are in the following format:
- * {persistent|non-persistent}://tenant/namespace/topic
- *
- * @param topicsPattern topic pattern string to consumer from
- * @return this builder
- */
- public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
- Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
- Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
- "Pattern has already been set.");
- this.consumerConfigurationData.setTopicsPattern(Pattern.compile(topicsPattern));
- return this;
- }
-
- /**
- * Sets the subscription name for the topic consumer. Defaults to flink-sub.
- *
- * @param subscriptionName the subscription name for the topic consumer
- * @return this builder
- */
- public PulsarSourceBuilder<T> subscriptionName(String subscriptionName) {
- Preconditions.checkArgument(StringUtils.isNotBlank(subscriptionName),
- "subscriptionName cannot be blank");
- this.consumerConfigurationData.setSubscriptionName(subscriptionName);
- return this;
- }
-
- /**
- * Sets the subscription initial position for the topic consumer.
- * Default is {@link SubscriptionInitialPosition#Latest}
- *
- * @param initialPosition the subscription initial position.
- * @return this builder
- */
- public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
- Preconditions.checkNotNull(initialPosition, "subscription initial position cannot be null");
- this.consumerConfigurationData.setSubscriptionInitialPosition(initialPosition);
- return this;
- }
-
- /**
- * Sets the number of messages to receive before acknowledging. This defaults to 100. This
- * value is only used when checkpointing is disabled.
- *
- * @param size number of messages to receive before acknowledging
- * @return this builder
- */
- public PulsarSourceBuilder<T> acknowledgementBatchSize(long size) {
- if (size > 0 && size <= MAX_ACKNOWLEDGEMENT_BATCH_SIZE) {
- acknowledgementBatchSize = size;
- return this;
- }
- throw new IllegalArgumentException(
- "acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
- }
-
- /**
- * parameterize messageReceiveTimeoutMs for `PulsarConsumerSource`.
- * @param timeout timeout in ms, should be gt 0
- * @return this builder
- */
- public PulsarSourceBuilder<T> messageReceiveTimeoutMs(int timeout) {
- if (timeout <= 0) {
- throw new IllegalArgumentException("messageReceiveTimeoutMs can only take values > 0");
- }
- this.messageReceiveTimeoutMs = timeout;
- return this;
- }
-
- /**
- * Set the authentication provider to use in the Pulsar client instance.
- *
- * @param authentication an instance of the {@link Authentication} provider already constructed
- * @return this builder
- */
- public PulsarSourceBuilder<T> authentication(Authentication authentication) {
- Preconditions.checkArgument(authentication != null,
- "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication");
- this.clientConfigurationData.setAuthentication(authentication);
- return this;
- }
-
- /**
- * Configure the authentication provider to use in the Pulsar client instance.
- *
- * @param authPluginClassName
- * name of the Authentication-Plugin to use
- * @param authParamsString
- * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
- * @return this builder
- * @throws PulsarClientException.UnsupportedAuthenticationException
- * failed to instantiate specified Authentication-Plugin
- */
- public PulsarSourceBuilder<T> authentication(String authPluginClassName, String authParamsString)
- throws PulsarClientException.UnsupportedAuthenticationException {
- Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
- "Authentication-Plugin class name can not be blank");
- Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
- "Authentication-Plugin parameters can not be blank");
- this.clientConfigurationData
- .setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
- return this;
- }
-
- /**
- * Configure the authentication provider to use in the Pulsar client instance
- * using a config map.
- *
- * @param authPluginClassName
- * name of the Authentication-Plugin you want to use
- * @param authParams
- * map which represents parameters for the Authentication-Plugin
- * @return this builder
- * @throws PulsarClientException.UnsupportedAuthenticationException
- * failed to instantiate specified Authentication-Plugin
- */
- public PulsarSourceBuilder<T> authentication(String authPluginClassName, Map<String, String> authParams)
- throws PulsarClientException.UnsupportedAuthenticationException {
- Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
- "Authentication-Plugin class name can not be blank");
- Preconditions.checkArgument((authParams != null && !authParams.isEmpty()),
- "parameters to authentication plugin can not be null/empty");
- this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
- return this;
- }
-
- /**
- *
- * @param clientConfigurationData All client conf wrapped in a POJO
- * @return this builder
- */
- public PulsarSourceBuilder<T> pulsarAllClientConf(ClientConfigurationData clientConfigurationData) {
- Preconditions.checkNotNull(clientConfigurationData, "client conf should not be null");
- this.clientConfigurationData = clientConfigurationData;
- return this;
- }
-
- /**
- *
- * @param consumerConfigurationData All consumer conf wrapped in a POJO
- * @return this builder
- */
- public PulsarSourceBuilder<T> pulsarAllConsumerConf(ConsumerConfigurationData consumerConfigurationData) {
- Preconditions.checkNotNull(consumerConfigurationData, "consumer conf should not be null");
- this.consumerConfigurationData = consumerConfigurationData;
- return this;
- }
-
-
- public SourceFunction<T> build() throws PulsarClientException{
- Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()),
- "a service url is required");
- Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null
- && !this.consumerConfigurationData.getTopicNames().isEmpty())
- || this.consumerConfigurationData.getTopicsPattern() != null,
- "At least one topic or topics pattern is required");
- Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()),
- "a subscription name is required");
-
- setTransientFields();
-
- return new PulsarConsumerSource<>(this);
- }
-
- private void setTransientFields() throws PulsarClientException {
- setAuth();
- }
-
- private void setAuth() throws PulsarClientException{
- if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
- || StringUtils.isBlank(this.clientConfigurationData.getAuthParams())) {
- return;
- }
-
- clientConfigurationData.setAuthentication(
- AuthenticationFactory.create(
- this.clientConfigurationData.getAuthPluginClassName(),
- this.clientConfigurationData.getAuthParams()));
- }
-
- /**
- * Creates a PulsarSourceBuilder.
- *
- * @param deserializationSchema the deserializer used to convert between Pulsar's byte messages and Flink's objects.
- * @return a builder
- */
- public static <T> PulsarSourceBuilder<T> builder(DeserializationSchema<T> deserializationSchema) {
- Preconditions.checkNotNull(deserializationSchema, "deserializationSchema cannot be null");
- return new PulsarSourceBuilder<>(deserializationSchema);
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
deleted file mode 100644
index 6d7479c..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * An append-only table sink to emit a streaming table as a Pulsar stream.
- */
-public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
-
- protected ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
- protected ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
- protected SerializationSchema<Row> serializationSchema;
- protected PulsarKeyExtractor<Row> keyExtractor;
- protected PulsarPropertiesExtractor<Row> propertiesExtractor;
- protected String[] fieldNames;
- protected TypeInformation[] fieldTypes;
- protected final String routingKeyFieldName;
-
- public PulsarTableSink(
- String serviceUrl,
- String topic,
- Authentication authentication,
- String routingKeyFieldName) {
- checkNotNull(serviceUrl, "Service url not set");
- checkNotNull(topic, "Topic is null");
- this.clientConfigurationData.setServiceUrl(serviceUrl);
- this.clientConfigurationData.setAuthentication(authentication);
- this.producerConfigurationData.setTopicName(topic);
- this.routingKeyFieldName = routingKeyFieldName;
- }
-
- public PulsarTableSink(
- ClientConfigurationData clientConfigurationData,
- ProducerConfigurationData producerConfigurationData,
- String routingKeyFieldName) {
- this.clientConfigurationData = checkNotNull(clientConfigurationData, "client config is null");
- this.producerConfigurationData = checkNotNull(producerConfigurationData, "producer config is null");
- this.routingKeyFieldName = routingKeyFieldName;
- }
-
- /**
- * Create serialization schema for converting table rows into bytes.
- *
- * @param rowSchema the schema of the row to serialize.
- * @return Instance of serialization schema
- */
- protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
-
- /**
- * Create a deep copy of this sink.
- *
- * @return Deep copy of this sink
- */
- protected abstract PulsarTableSink createSink();
-
- /**
- * Returns the low-level producer.
- */
- protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
- return new FlinkPulsarProducer<>(
- clientConfigurationData,
- producerConfigurationData,
- serializationSchema,
- keyExtractor,
- propertiesExtractor);
- }
-
- @Override
- public void emitDataStream(DataStream<Row> dataStream) {
- checkState(fieldNames != null, "Table sink is not configured");
- checkState(fieldTypes != null, "Table sink is not configured");
- checkState(serializationSchema != null, "Table sink is not configured");
- checkState(keyExtractor != null, "Table sink is not configured");
-
- FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
- dataStream.addSink(producer);
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- return new RowTypeInfo(fieldTypes, fieldNames);
- }
-
- @Override
- public String[] getFieldNames() {
- return fieldNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
- }
-
- @Override
- public TableSink<Row> configure(String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- PulsarTableSink sink = createSink();
-
- sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
- sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
- checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types do not match");
-
- RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
- sink.serializationSchema = createSerializationSchema(rowSchema);
- sink.keyExtractor = new RowKeyExtractor(
- routingKeyFieldName,
- fieldNames,
- fieldTypes);
- sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
-
- return sink;
- }
-
- /**
- * A key extractor that extracts the routing key from a {@link Row} by field name.
- */
- private static class RowKeyExtractor implements PulsarKeyExtractor<Row> {
-
- private final int keyIndex;
-
- public RowKeyExtractor(
- String keyFieldName,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
- checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types does not match.");
- int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
- checkArgument(keyIndex >= 0,
- "Key field '" + keyFieldName + "' not found");
- checkArgument(Types.STRING.equals(fieldTypes[keyIndex]),
- "Key field must be of type 'STRING'");
- this.keyIndex = keyIndex;
- }
-
- @Override
- public String getKey(Row event) {
- return (String) event.getField(keyIndex);
- }
- }
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
deleted file mode 100644
index 6ed4a01..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for implementing pulsar flink connector.
- */
-package org.apache.flink.streaming.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
deleted file mode 100644
index c8a858d..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
-/**
- * Extract key from a value.
- */
-public interface PulsarKeyExtractor<T> extends Serializable {
-
- PulsarKeyExtractor NULL = in -> null;
-
- /**
- * Retrieve a key from the value.
- *
- * @param in the value to extract a key.
- * @return key.
- */
- String getKey(T in);
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
deleted file mode 100644
index 67e86f3..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Extract message properties from a value or others.
- */
-public interface PulsarPropertiesExtractor<T> extends Serializable {
-
- PulsarPropertiesExtractor EMPTY = in -> Collections.emptyMap();
-
- /**
- * Retrieve properties from the value or others.
- *
- * @param in the value to extract a key.
- * @return key.
- */
- Map<String, String> getProperties(T in);
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
deleted file mode 100644
index 7a25be1..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for implementing key extractors.
- */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
\ No newline at end of file
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
deleted file mode 100644
index 736c417..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-
-
-/**
- * Tests for Pulsar Avro Output Format
- */
-public class PulsarAvroOutputFormatTest {
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarAvroOutputFormat(null, "testTopic", new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarAvroOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarAvroOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarAvroOutputFormat(" ", "testTopic", new AuthenticationDisabled());
- }
-
- @Test
- public void testPulsarAvroOutputFormatConstructor() {
- PulsarAvroOutputFormat pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
- assertNotNull(pulsarAvroOutputFormat);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(null);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(null);
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(StringUtils.EMPTY);
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(StringUtils.EMPTY);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(StringUtils.EMPTY);
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test
- public void testPulsarAvroOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- PulsarAvroOutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat(clientConf, producerConf);
- assertNotNull(pulsarAvroOutputFormat);
- }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
deleted file mode 100644
index 713f867..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-
-/**
- * Tests for Pulsar Csv Output Format
- */
-public class PulsarCsvOutputFormatTest {
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarCsvOutputFormat(null, "testTopic", new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarCsvOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarCsvOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarCsvOutputFormat(" ", "testTopic", new AuthenticationDisabled());
- }
-
- @Test
- public void testPulsarCsvOutputFormatConstructor() {
- PulsarCsvOutputFormat pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
- assertNotNull(pulsarCsvOutputFormat);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(null);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(null);
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(StringUtils.EMPTY);
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(StringUtils.EMPTY);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test
- public void testPulsarCsvOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
- assertNotNull(pulsarCsvOutputFormat);
- }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
deleted file mode 100644
index d45d9b1..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-
-/**
- * Tests for Pulsar Json Output Format
- */
-public class PulsarJsonOutputFormatTest {
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarJsonOutputFormat(null, "testTopic", new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarJsonOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarJsonOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarJsonOutputFormat(" ", "testTopic", new AuthenticationDisabled());
- }
-
- @Test
- public void testPulsarJsonOutputFormatConstructor() {
- PulsarJsonOutputFormat pulsarJsonOutputFormat =
- new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
- assertNotNull(pulsarJsonOutputFormat);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(null);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonROutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(null);
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(StringUtils.EMPTY);
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(StringUtils.EMPTY);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- new PulsarAvroOutputFormat(clientConf, producerConf);
- }
-
- @Test
- public void testPulsarJsonOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- PulsarJsonOutputFormat pulsarJsonOutputFormat = new PulsarJsonOutputFormat(clientConf, producerConf);
- assertNotNull(pulsarJsonOutputFormat);
- }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
deleted file mode 100644
index 97b23b4..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-/**
- * Tests for Pulsar Output Format
- */
-public class PulsarOutputFormatTest {
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarOutputFormat(null, "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarOutputFormat("testServiceUrl", null, new AuthenticationDisabled(), text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarOutputFormat("testServiceUrl", " ", new AuthenticationDisabled(), text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarOutputFormat(" ", "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = NullPointerException.class)
- public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
- new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), null);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(null);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorV2WhenTopicNameIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(null);
-
- new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorV2WhenTopicNameIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(StringUtils.EMPTY);
-
- new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsBlank() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(StringUtils.EMPTY);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
- }
-
- @Test(expectedExceptions = NullPointerException.class)
- public void testPulsarOutputFormatConstructorV2WhenSerializationSchemaIsNull() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
- new PulsarOutputFormat(clientConf, producerConf, null);
- }
-
- @Test
- public void testPulsarOutputFormatConstructorV2() {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName("testTopic");
-
- PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
- assertNotNull(pulsarCsvOutputFormat);
- }
-
- @Test
- public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException {
- String input = "Wolfgang Amadeus Mozart";
- PulsarOutputFormat pulsarOutputFormat =
- new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
- text -> text.toString().getBytes());
- assertNotNull(pulsarOutputFormat);
- byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input);
- String resultString = IOUtils.toString(bytes, StandardCharsets.UTF_8.toString());
- assertEquals(input, resultString);
- }
-
- @Test
- public void testPulsarOutputFormatWithCustomSerializationSchema() throws IOException {
- Employee employee = new Employee(1, "Test Employee", "Test Department");
- PulsarOutputFormat pulsarOutputFormat =
- new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
- new EmployeeSerializationSchema());
- assertNotNull(pulsarOutputFormat);
-
- byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(employee);
- String resultString = IOUtils.toString(bytes, StandardCharsets.UTF_8.toString());
- assertEquals(employee.toString(), resultString);
- }
-
- /**
- * Employee Serialization Schema.
- */
- private class EmployeeSerializationSchema implements SerializationSchema<Employee> {
-
- @Override
- public byte[] serialize(Employee employee) {
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(employee.id);
- stringBuilder.append(" - ");
- stringBuilder.append(employee.name);
- stringBuilder.append(" - ");
- stringBuilder.append(employee.department);
-
- return stringBuilder.toString().getBytes();
- }
- }
-
- /**
- * Data type for Employee Model.
- */
- private class Employee {
-
- public long id;
- public String name;
- public String department;
-
- public Employee(long id, String name, String department) {
- this.id = id;
- this.name = name;
- this.department = department;
- }
-
- @Override
- public String toString() {
- return id + " - " + name + " - " + department;
- }
- }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
deleted file mode 100644
index c283d2f..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.avro.generated.NasaMission;
-import org.apache.flink.formats.avro.AvroDeserializationSchema;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
-/**
- * Tests for Avro Serialization Schema
- */
-public class AvroSerializationSchemaTest {
-
- @Test
- public void testAvroSerializationSchemaWithSuccessfulCase() throws IOException {
- NasaMission nasaMission = NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build();
- AvroSerializationSchema schema = new AvroSerializationSchema();
- byte[] rowBytes = schema.serialize(nasaMission);
-
- AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(nasaMission.getSchema());
- GenericRecord genericRecord = deserializationSchema.deserialize(rowBytes);
-
- assertEquals(nasaMission.getId(), genericRecord.get("id"));
- assertEquals(nasaMission.getName(), genericRecord.get("name").toString());
- assertEquals(nasaMission.getStartYear(), genericRecord.get("start_year"));
- assertEquals(nasaMission.getEndYear(), genericRecord.get("end_year"));
- }
-
- @Test
- public void testAvroSerializationSchemaWithEmptyRecord() throws IOException {
- NasaMission nasaMission = NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(null).setEndYear(null).build();
- AvroSerializationSchema schema = new AvroSerializationSchema();
- byte[] rowBytes = schema.serialize(nasaMission);
-
- AvroDeserializationSchema<GenericRecord> deserializationSchema = AvroDeserializationSchema.forGeneric(nasaMission.getSchema());
- GenericRecord genericRecord = deserializationSchema.deserialize(rowBytes);
-
- assertEquals(nasaMission.getId(), genericRecord.get("id"));
- assertEquals(nasaMission.getName(), genericRecord.get("name").toString());
- assertNull(genericRecord.get("start_year"));
- assertNull(genericRecord.get("end_year"));
- }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java
deleted file mode 100644
index 3666e54..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchemaTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * Tests for Csv Serialization Schema
- */
-public class CsvSerializationSchemaTest {
-
- @Test
- public void testCsvSerializationSchemaWithSuccessfulCase() throws IOException {
- Tuple3<Integer, String, String> employee = new Tuple3(1, "Wolfgang Amadeus", "Mozart");
- CsvSerializationSchema schema = new CsvSerializationSchema();
- byte[] rowBytes = schema.serialize(employee);
- String csvContent = IOUtils.toString(rowBytes, StandardCharsets.UTF_8.toString());
- assertEquals(csvContent, "1,Wolfgang Amadeus,Mozart");
- }
-
- @Test
- public void testCsvSerializationSchemaWithEmptyRecord() throws IOException {
- Tuple3<Integer, String, String> employee = new Tuple3();
- CsvSerializationSchema schema = new CsvSerializationSchema();
- byte[] employeeBytes = schema.serialize(employee);
- String str = IOUtils.toString(employeeBytes, StandardCharsets.UTF_8.toString());
- assertEquals(str, ",,");
- }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
deleted file mode 100644
index 80a1cd8..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.batch.connectors.pulsar.serialization;
-
-import org.apache.commons.io.IOUtils;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * Tests for Json Serialization Schema
- */
-public class JsonSerializationSchemaTest {
-
- @Test
- public void testJsonSerializationSchemaWithSuccessfulCase() throws IOException {
- Employee employee = new Employee(1, "Test Name");
- JsonSerializationSchema schema = new JsonSerializationSchema();
- byte[] rowBytes = schema.serialize(employee);
- String jsonContent = IOUtils.toString(rowBytes, StandardCharsets.UTF_8.toString());
- assertEquals(jsonContent, "{\"id\":1,\"name\":\"Test Name\"}");
- }
-
- @Test
- public void testJsonSerializationSchemaWithEmptyRecord() throws IOException {
- Employee employee = new Employee();
- JsonSerializationSchema schema = new JsonSerializationSchema();
- byte[] employeeBytes = schema.serialize(employee);
- String jsonContent = IOUtils.toString(employeeBytes, StandardCharsets.UTF_8.toString());
- assertEquals(jsonContent, "{\"id\":0,\"name\":null}");
- }
-
- @Test(expectedExceptions = RuntimeException.class)
- public void testJsonSerializationSchemaWithNotSerializableObject() {
- NotSerializableObject notSerializableObject = new NotSerializableObject();
- JsonSerializationSchema schema = new JsonSerializationSchema();
- schema.serialize(notSerializableObject);
- }
-
- /**
- * Employee data model
- */
- private static class Employee {
-
- private long id;
- private String name;
-
- public Employee() {
- }
-
- public Employee(long id, String name) {
- this.id = id;
- this.name = name;
- }
-
- public long getId() {
- return id;
- }
-
- public String getName() {
- return name;
- }
-
- }
-
- /**
- * Not Serializable Object due to not having any public property
- */
- private static class NotSerializableObject {
-
- private long id;
- private String name;
-
- }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
deleted file mode 100644
index 0c81c4a..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link CachedPulsarClient}.
- */
-public class CachedPulsarClientTest {
-
- private static final String SERVICE_URL = "pulsar://localhost:6650";
-
- @BeforeMethod
- public void clearCache() {
- CachedPulsarClient.clear();
- }
-
- @Test
- public void testShouldReturnSameInstanceWithSameParam() throws Exception {
- PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
- PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
-
- ClientConfigurationData conf1 = new ClientConfigurationData();
- conf1.setServiceUrl(SERVICE_URL);
-
- ClientConfigurationData conf2 = new ClientConfigurationData();
- conf2.setServiceUrl(SERVICE_URL);
-
- PowerMockito.whenNew(PulsarClientImpl.class)
- .withArguments(conf1).thenReturn(impl1);
- PowerMockito.whenNew(PulsarClientImpl.class)
- .withArguments(conf2).thenReturn(impl2);
-
- PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
- PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
- PulsarClientImpl client3 = CachedPulsarClient.getOrCreate(conf1);
-
- assertEquals(client1, client2);
- assertEquals(client1, client3);
-
- assertEquals(CachedPulsarClient.getAsMap().size(), 1);
- }
-
- @Test
- public void testShouldCloseTheCorrectClient() throws Exception {
- PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
- PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
-
- ClientConfigurationData conf1 = new ClientConfigurationData();
- conf1.setServiceUrl(SERVICE_URL);
-
- ClientConfigurationData conf2 = new ClientConfigurationData();
- conf2.setServiceUrl(SERVICE_URL);
- conf2.setNumIoThreads(5);
-
- PowerMockito.whenNew(PulsarClientImpl.class)
- .withArguments(conf1).thenReturn(impl1);
- PowerMockito.whenNew(PulsarClientImpl.class)
- .withArguments(conf2).thenReturn(impl2);
-
- PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
- PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
-
- assertNotEquals(client1, client2);
-
- ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
- assertEquals(map1.size(), 2);
-
- CachedPulsarClient.close(conf2);
-
- ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
- assertEquals(map2.size(), 1);
-
- assertEquals(map2.values().iterator().next(), client1);
- }
-
- @Test
- public void getClientFromCacheShouldAlwaysReturnAnOpenedInstance() throws Exception {
- PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
-
- ClientConfigurationData conf1 = new ClientConfigurationData();
- conf1.setServiceUrl(SERVICE_URL);
-
- PowerMockito.whenNew(PulsarClientImpl.class)
- .withArguments(conf1).thenReturn(impl1);
-
- PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
-
- ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
- assertEquals(map1.size(), 1);
-
- client1.getState().set(PulsarClientImpl.State.Closed);
-
- PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf1);
-
- assertNotEquals(client1, client2);
-
- ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
- assertEquals(map2.size(), 1);
- }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
deleted file mode 100644
index 49b50bb..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.avro.generated.NasaMission;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNotNull;
-import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
-
-/**
- * Unit test of {@link PulsarAvroTableSink}.
- */
-public class PulsarAvroTableSinkTest {
- private static final String SERVICE_URL = "pulsar://localhost:6650";
- private static final String TOPIC_NAME = "test_topic";
- private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
- private static final String ROUTING_KEY = "name";
-
- private final String[] fieldNames = {"id", "name","start_year","end_year"};
- private final TypeInformation[] typeInformations = {
- TypeInformation.of(Integer.class),
- TypeInformation.of(String.class),
- TypeInformation.of(Integer.class),
- TypeInformation.of(Integer.class)
- };
-
- /**
- * Test configure PulsarTableSink.
- *
- * @throws Exception
- */
- @Test
- public void testConfigure() throws Exception {
- PulsarAvroTableSink sink = spySink();
-
- TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
-
- assertArrayEquals(fieldNames, configuredSink.getFieldNames());
- assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
- assertNotNull(((PulsarAvroTableSink) configuredSink).keyExtractor);
- assertNotNull(((PulsarAvroTableSink) configuredSink).serializationSchema);
- }
-
-
- /**
- * Test emit data stream.
- *
- * @throws Exception
- */
- @Test
- public void testEmitDataStream() throws Exception {
- DataStream mockedDataStream = Mockito.mock(DataStream.class);
-
- PulsarAvroTableSink sink = spySink();
-
- sink.emitDataStream(mockedDataStream);
-
- Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
- }
-
-
- private PulsarAvroTableSink spySink() throws Exception {
-
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(SERVICE_URL);
-
- ProducerConfigurationData producerConf = new ProducerConfigurationData();
- producerConf.setTopicName(TOPIC_NAME);
-
- PulsarAvroTableSink sink =
- new PulsarAvroTableSink(clientConf, producerConf, ROUTING_KEY, NasaMission.class);
- FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
- PowerMockito.whenNew(
- FlinkPulsarProducer.class
- ).withArguments(
- Mockito.anyString(),
- Mockito.anyString(),
- Mockito.any(Authentication.class),
- Mockito.any(SerializationSchema.class),
- Mockito.any(PulsarKeyExtractor.class),
- Mockito.any(PulsarPropertiesExtractor.class)
- ).thenReturn(producer);
- FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
- FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
- FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
- FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
- FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
- return sink;
- }
-
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
deleted file mode 100644
index 4266008..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ /dev/null
@@ -1,659 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import java.util.function.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerStats;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * Tests for the PulsarConsumerSource. The source supports two operation modes.
- * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in
- * {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}..
- * 3) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages after
- * after it receives x number of messages.
- *
- * <p>This tests assumes that the MessageIds are increasing monotonously. That doesn't have to be the
- * case. The MessageId is used to uniquely identify messages.
- */
-public class PulsarConsumerSourceTests {
-
- private PulsarConsumerSource<String> source;
-
- private TestConsumer consumer;
-
- private TestSourceContext context;
-
- private Thread sourceThread;
-
- private Exception exception;
-
- @BeforeMethod
- public void before() {
- context = new TestSourceContext();
-
- sourceThread = new Thread(() -> {
- try {
- source.run(context);
- } catch (Exception e) {
- exception = e;
- }
- });
- }
-
- @AfterMethod
- public void after() throws Exception {
- if (source != null) {
- source.cancel();
- }
- if (sourceThread != null) {
- sourceThread.join();
- }
- }
-
- @Test
- public void testCheckpointing() throws Exception {
- final int numMessages = 5;
- consumer = new TestConsumer(numMessages);
-
- source = createSource(consumer, 1, true);
- source.open(new Configuration());
-
- final StreamSource<String, PulsarConsumerSource<String>> src = new StreamSource<>(source);
- final AbstractStreamOperatorTestHarness<String> testHarness =
- new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-
- testHarness.open();
-
- sourceThread.start();
-
- final Random random = new Random(System.currentTimeMillis());
- for (int i = 0; i < 3; ++i) {
-
- // wait and receive messages from the test consumer
- receiveMessages();
-
- final long snapshotId = random.nextLong();
- OperatorSubtaskState data;
- synchronized (context.getCheckpointLock()) {
- data = testHarness.snapshot(snapshotId, System.currentTimeMillis());
- }
-
- final TestPulsarConsumerSource sourceCopy =
- createSource(mock(Consumer.class), 1, true);
- final StreamSource<String, TestPulsarConsumerSource> srcCopy = new StreamSource<>(sourceCopy);
- final AbstractStreamOperatorTestHarness<String> testHarnessCopy =
- new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
-
- testHarnessCopy.setup();
- testHarnessCopy.initializeState(data);
- testHarnessCopy.open();
-
- final ArrayDeque<Tuple2<Long, Set<MessageId>>> deque = sourceCopy.getRestoredState();
- final Set<MessageId> messageIds = deque.getLast().f1;
-
- final int start = consumer.currentMessage.get() - numMessages;
- for (int mi = start; mi < (start + numMessages); ++mi) {
- assertTrue(messageIds.contains(consumer.messages.get(mi).getMessageId()));
- }
-
- // check if the messages are being acknowledged
- synchronized (context.getCheckpointLock()) {
- source.notifyCheckpointComplete(snapshotId);
-
- assertEquals(consumer.acknowledgedIds.keySet(), messageIds);
- // clear acknowledgements for the next snapshot comparison
- consumer.acknowledgedIds.clear();
- }
-
- final int lastMessageIndex = consumer.currentMessage.get();
- consumer.addMessages(createMessages(lastMessageIndex, 5));
- }
- }
-
- @Test
- public void testCheckpointingDuplicatedIds() throws Exception {
- consumer = new TestConsumer(5);
-
- source = createSource(consumer, 1, true);
- source.open(new Configuration());
-
- sourceThread.start();
-
- receiveMessages();
-
- assertEquals(5, context.elements.size());
-
- // try to reprocess the messages we should not collect any more elements
- consumer.reset();
-
- receiveMessages();
-
- assertEquals(5, context.elements.size());
- }
-
- @Test
- public void testCheckpointingDisabledMessagesEqualBatchSize() throws Exception {
-
- consumer = new TestConsumer(5);
-
- source = createSource(consumer, 5, false);
- source.open(new Configuration());
-
- sourceThread.start();
-
- receiveMessages();
-
- assertEquals(1, consumer.acknowledgedIds.size());
- }
-
- @Test
- public void testCheckpointingDisabledMoreMessagesThanBatchSize() throws Exception {
-
- consumer = new TestConsumer(6);
-
- source = createSource(consumer, 5, false);
- source.open(new Configuration());
-
- sourceThread.start();
-
- receiveMessages();
-
- assertEquals(1, consumer.acknowledgedIds.size());
- }
-
- @Test
- public void testCheckpointingDisabledLessMessagesThanBatchSize() throws Exception {
-
- consumer = new TestConsumer(4);
-
- source = createSource(consumer, 5, false);
- source.open(new Configuration());
-
- sourceThread.start();
-
- receiveMessages();
-
- assertEquals(0, consumer.acknowledgedIds.size());
- }
-
- @Test
- public void testCheckpointingDisabledMessages2XBatchSize() throws Exception {
-
- consumer = new TestConsumer(10);
-
- source = createSource(consumer, 5, false);
- source.open(new Configuration());
-
- sourceThread.start();
-
- receiveMessages();
-
- assertEquals(2, consumer.acknowledgedIds.size());
- }
-
- private void receiveMessages() throws InterruptedException {
- while (consumer.currentMessage.get() < consumer.messages.size()) {
- Thread.sleep(5);
- }
- }
-
- private TestPulsarConsumerSource createSource(Consumer<byte[]> testConsumer,
- long batchSize, boolean isCheckpointingEnabled) throws Exception {
- PulsarSourceBuilder<String> builder =
- PulsarSourceBuilder.builder(new SimpleStringSchema())
- .acknowledgementBatchSize(batchSize);
- TestPulsarConsumerSource source = new TestPulsarConsumerSource(builder, testConsumer, isCheckpointingEnabled);
-
- OperatorStateStore mockStore = mock(OperatorStateStore.class);
- FunctionInitializationContext mockContext = mock(FunctionInitializationContext.class);
- when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
- when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
-
- source.initializeState(mockContext);
-
- return source;
- }
-
- private static class TestPulsarConsumerSource extends PulsarConsumerSource<String> {
-
- private ArrayDeque<Tuple2<Long, Set<MessageId>>> restoredState;
-
- private Consumer<byte[]> testConsumer;
- private boolean isCheckpointingEnabled;
-
- TestPulsarConsumerSource(PulsarSourceBuilder<String> builder,
- Consumer<byte[]> testConsumer, boolean isCheckpointingEnabled) {
- super(builder);
- this.testConsumer = testConsumer;
- this.isCheckpointingEnabled = isCheckpointingEnabled;
- }
-
- @Override
- protected boolean addId(MessageId messageId) {
- assertTrue(isCheckpointingEnabled());
- return super.addId(messageId);
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
- when(context.isCheckpointingEnabled()).thenReturn(isCheckpointingEnabled);
- return context;
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- super.initializeState(context);
- this.restoredState = this.pendingCheckpoints;
- }
-
- public ArrayDeque<Tuple2<Long, Set<MessageId>>> getRestoredState() {
- return this.restoredState;
- }
-
- @Override
- PulsarClient getClient() {
- return mock(PulsarClient.class);
- }
-
- @Override
- Consumer<byte[]> createConsumer(PulsarClient client) {
- return testConsumer;
- }
- }
-
- private static class TestSourceContext implements SourceFunction.SourceContext<String> {
-
- private static final Object lock = new Object();
-
- private final List<String> elements = new ArrayList<>();
-
- @Override
- public void collect(String element) {
- elements.add(element);
- }
-
- @Override
- public void collectWithTimestamp(String element, long timestamp) {
-
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
-
- }
-
- @Override
- public void markAsTemporarilyIdle() {
-
- }
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {
-
- }
- }
-
- private static class TestConsumer implements Consumer<byte[]> {
-
- private final List<Message> messages = new ArrayList<>();
-
- private AtomicInteger currentMessage = new AtomicInteger();
-
- private final Map<MessageId, MessageId> acknowledgedIds = new ConcurrentHashMap<>();
-
- private TestConsumer(int numMessages) {
- messages.addAll(createMessages(0, numMessages));
- }
-
- private void reset() {
- currentMessage.set(0);
- }
-
- @Override
- public String getTopic() {
- return null;
- }
-
- @Override
- public String getSubscription() {
- return null;
- }
-
- @Override
- public void unsubscribe() throws PulsarClientException {
-
- }
-
- @Override
- public long getLastDisconnectedTimestamp() {
- return 0L;
- }
-
-
- @Override
- public CompletableFuture<Void> unsubscribeAsync() {
- return null;
- }
-
- @Override
- public Message<byte[]> receive() throws PulsarClientException {
- return null;
- }
-
- public synchronized void addMessages(List<Message> messages) {
- this.messages.addAll(messages);
- }
-
- @Override
- public CompletableFuture<Message<byte[]>> receiveAsync() {
- return null;
- }
-
- @Override
- public Message<byte[]> receive(int i, TimeUnit timeUnit) throws PulsarClientException {
- synchronized (this) {
- if (currentMessage.get() == messages.size()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- System.out.println("no more messages sleeping index: " + currentMessage.get());
- }
- return null;
- }
- return messages.get(currentMessage.getAndIncrement());
- }
- }
-
- @Override
- public Messages<byte[]> batchReceive() throws PulsarClientException {
- return null;
- }
-
- @Override
- public CompletableFuture<Messages<byte[]>> batchReceiveAsync() {
- return null;
- }
-
- @Override
- public void acknowledge(Message<?> message) throws PulsarClientException {
-
- }
-
- @Override
- public void acknowledge(MessageId messageId) throws PulsarClientException {
-
- }
-
- @Override
- public void acknowledge(Messages<?> messages) throws PulsarClientException {
-
- }
-
- @Override
- public void acknowledge(List<MessageId> messageIdList) throws PulsarClientException {
-
- }
-
- @Override
- public void negativeAcknowledge(Message<?> message) {
- }
-
- @Override
- public void negativeAcknowledge(MessageId messageId) {
- }
-
- @Override
- public void negativeAcknowledge(Messages<?> messages) {
-
- }
-
- @Override
- public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
-
- }
-
- @Override
- public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
- acknowledgedIds.put(messageId, messageId);
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction transaction) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
- acknowledgedIds.put(messageId, messageId);
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction transaction) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
- return null;
- }
-
- @Override
- public ConsumerStats getStats() {
- return null;
- }
-
- @Override
- public void close() throws PulsarClientException {
-
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return null;
- }
-
- @Override
- public boolean hasReachedEndOfTopic() {
- return false;
- }
-
- @Override
- public void redeliverUnacknowledgedMessages() {
-
- }
-
- @Override
- public void seek(MessageId messageId) throws PulsarClientException {
-
- }
-
- @Override
- public void seek(long timestamp) throws PulsarClientException {
-
- }
-
- @Override
- public void seek(Function<String, Object> function) throws PulsarClientException {
-
- }
-
- @Override
- public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> seekAsync(MessageId messageId) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> seekAsync(long timestamp) {
- return null;
- }
-
- @Override
- public boolean isConnected() {
- return true;
- }
-
- @Override
- public String getConsumerName() {
- return "test-consumer-0";
- }
-
- @Override
- public void pause() {
- }
-
- @Override
- public void resume() {
- }
-
- @Override
- public MessageId getLastMessageId() throws PulsarClientException {
- return null;
- }
-
- @Override
- public CompletableFuture<MessageId> getLastMessageIdAsync() {
- return null;
- }
-
- @Override
- public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
-
- }
-
- @Override
- public void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException {
-
- }
-
- @Override
- public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit)
- throws PulsarClientException {
-
- }
-
- @Override
- public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) {
- return null;
- }
-
- @Override
- public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime,
- TimeUnit unit) {
- return null;
- }
- }
-
- private static List<Message> createMessages(int startIndex, int numMessages) {
- final List<Message> messages = new ArrayList<>();
- for (int i = startIndex; i < (startIndex + numMessages); ++i) {
- String content = "message-" + i;
- messages.add(createMessage(content, createMessageId(1, i + 1, 1)));
- }
- return messages;
- }
-
- private static Message<byte[]> createMessage(String content, String messageId) {
- return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
- content.getBytes(), Schema.BYTES, new MessageMetadata());
- }
-
- private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
- return String.format("%d:%d:%d", ledgerId, entryId, partitionIndex);
- }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
deleted file mode 100644
index d895a5f..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
-import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
-
-/**
- * Unit test of {@link PulsarJsonTableSink}.
- */
-public class PulsarJsonTableSinkTest {
-
- private static final String SERVICE_URL = "pulsar://localhost:6650";
- private static final String TOPIC_NAME = "test_topic";
- private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
- private static final String ROUTING_KEY = "key";
- private final String[] fieldNames = {"key", "value"};
- private final TypeInformation[] typeInformations = {
- TypeInformation.of(String.class),
- TypeInformation.of(String.class)
- };
-
- /**
- * Test configure PulsarTableSink.
- *
- * @throws Exception
- */
- @Test
- public void testConfigure() throws Exception {
- PulsarJsonTableSink sink = spySink();
-
- TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
-
- assertArrayEquals(fieldNames, configuredSink.getFieldNames());
- assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
- Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).keyExtractor);
- Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).serializationSchema);
- }
-
- /**
- * Test emit data stream.
- *
- * @throws Exception
- */
- @Test
- public void testEmitDataStream() throws Exception {
- DataStream mockedDataStream = Mockito.mock(DataStream.class);
-
- PulsarJsonTableSink sink = spySink();
-
- sink.emitDataStream(mockedDataStream);
-
- Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
- }
-
- private PulsarJsonTableSink spySink() throws Exception {
- ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
- clientConfigurationData.setServiceUrl(SERVICE_URL);
-
- ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
- producerConfigurationData.setTopicName(TOPIC_NAME);
-
- PulsarJsonTableSink sink = new PulsarJsonTableSink(
- clientConfigurationData, producerConfigurationData,
- ROUTING_KEY);
-
- FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
- PowerMockito.whenNew(
- FlinkPulsarProducer.class
- ).withArguments(
- Mockito.anyString(),
- Mockito.anyString(),
- Mockito.any(Authentication.class),
- Mockito.any(SerializationSchema.class),
- Mockito.any(PulsarKeyExtractor.class),
- Mockito.any(PulsarPropertiesExtractor.class)
- ).thenReturn(producer);
-
- FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
- FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
- FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
- FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
- FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
- return sink;
- }
-}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
deleted file mode 100644
index c89ad7e..0000000
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.regex.Pattern;
-
-/**
- * Tests for PulsarSourceBuilder
- */
-public class PulsarSourceBuilderTest {
-
- private PulsarSourceBuilder pulsarSourceBuilder;
-
- @BeforeMethod
- public void before() {
- pulsarSourceBuilder = PulsarSourceBuilder.builder(new TestDeserializationSchema());
- }
-
- @Test
- public void testBuild() throws PulsarClientException {
- SourceFunction sourceFunction = pulsarSourceBuilder
- .serviceUrl("testServiceUrl")
- .topic("testTopic")
- .subscriptionName("testSubscriptionName")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .build();
- Assert.assertNotNull(sourceFunction);
- }
-
-
- @Test
- public void testBuildWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
- consumerConf.setTopicNames(new HashSet<>(Arrays.asList("testTopic")));
- consumerConf.setSubscriptionName("testSubscriptionName");
- consumerConf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
-
- SourceFunction sourceFunction = pulsarSourceBuilder
- .pulsarAllClientConf(clientConf)
- .pulsarAllConsumerConf(consumerConf)
- .build();
- Assert.assertNotNull(sourceFunction);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testBuildWithoutSettingRequiredProperties() throws PulsarClientException {
- pulsarSourceBuilder.build();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testServiceUrlWithNull() {
- pulsarSourceBuilder.serviceUrl(null);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testServiceUrlWithBlank() {
- pulsarSourceBuilder.serviceUrl(" ");
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicWithNull() {
- pulsarSourceBuilder.topic(null);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicWithBlank() {
- pulsarSourceBuilder.topic(" ");
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicsWithNull() {
- pulsarSourceBuilder.topics(null);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicsWithBlank() {
- pulsarSourceBuilder.topics(Arrays.asList(" ", " "));
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicPatternWithNull() {
- pulsarSourceBuilder.topicsPattern(null);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicPatternAlreadySet() {
- pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-*"));
- pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-my-*"));
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicPattenStringWithNull() {
- pulsarSourceBuilder.topicsPatternString(null);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testSubscriptionNameWithNull() {
- pulsarSourceBuilder.subscriptionName(null);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testSubscriptionNameWithBlank() {
- pulsarSourceBuilder.subscriptionName(" ");
- }
-
- @Test(expectedExceptions = NullPointerException.class)
- public void testSubscriptionInitialPosition() {
- pulsarSourceBuilder.subscriptionInitialPosition(null);
- }
-
- private class TestDeserializationSchema<T> implements DeserializationSchema<T> {
-
- @Override
- public T deserialize(byte[] bytes) throws IOException {
- return null;
- }
-
- @Override
- public boolean isEndOfStream(T t) {
- return false;
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return null;
- }
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testServiceUrlNullWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(null);
-
- ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
- consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testServiceUrl")));
- consumerConf.setSubscriptionName("testSubscriptionName");
-
- pulsarSourceBuilder
- .pulsarAllClientConf(clientConf)
- .pulsarAllConsumerConf(consumerConf)
- .build();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testServiceUrlWithBlankWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(StringUtils.EMPTY);
-
- ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
- consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
- consumerConf.setSubscriptionName("testSubscriptionName");
-
- pulsarSourceBuilder
- .pulsarAllClientConf(clientConf)
- .pulsarAllConsumerConf(consumerConf)
- .build();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testTopicPatternWithNullWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
- ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
- consumerConf.setTopicsPattern(null);
- consumerConf.setSubscriptionName("testSubscriptionName");
-
- pulsarSourceBuilder
- .pulsarAllClientConf(clientConf)
- .pulsarAllConsumerConf(consumerConf)
- .build();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testSubscriptionNameWithNullWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
- consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
- consumerConf.setSubscriptionName(null);
-
- pulsarSourceBuilder
- .pulsarAllClientConf(clientConf)
- .pulsarAllConsumerConf(consumerConf)
- .build();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testSubscriptionNameWithBlankWithConfPojo() throws PulsarClientException {
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl("testServiceUrl");
-
- ConsumerConfigurationData consumerConf = new ConsumerConfigurationData();
- consumerConf.setTopicNames(new HashSet<String>(Arrays.asList("testTopic")));
- consumerConf.setSubscriptionName(StringUtils.EMPTY);
-
- pulsarSourceBuilder
- .pulsarAllClientConf(clientConf)
- .pulsarAllConsumerConf(consumerConf)
- .build();
- }
-
-}
diff --git a/pulsar-flink/src/test/resources/avro/NasaMission.avsc b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
deleted file mode 100644
index 521f475..0000000
--- a/pulsar-flink/src/test/resources/avro/NasaMission.avsc
+++ /dev/null
@@ -1,10 +0,0 @@
-{"namespace": "org.apache.flink.avro.generated",
- "type": "record",
- "name": "NasaMission",
- "fields": [
- {"name": "id", "type": "int"},
- {"name": "name", "type": "string"},
- {"name": "start_year", "type": ["int", "null"]},
- {"name": "end_year", "type": ["int", "null"]}
- ]
-}