You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:03 UTC
[flink] 01/09: [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1602e4b7d26cf52cea993c410769b7b15a672aff
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 13:00:53 2022 +0800
[FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1
1. Bump the pulsar-client-all version in pom file.
2. Exclude useless dependencies for pulsar-client-all.
3. Bump the Pulsar docker version.
4. Change the dependencies to pass the tests.
5. Drop PulsarTransactionUtils and fix compile issues in tests.
6. Add bouncycastle to Pulsar e2e tests.
---
flink-connectors/flink-connector-pulsar/pom.xml | 74 +++++++++++--
.../common/utils/PulsarTransactionUtils.java | 118 ---------------------
.../split/PulsarUnorderedPartitionSplitReader.java | 3 +-
.../PulsarDeserializationSchemaTest.java | 2 +-
.../src/main/resources/META-INF/NOTICE | 16 +--
.../flink-end-to-end-tests-pulsar/pom.xml | 43 +++++++-
.../FlinkContainerWithPulsarEnvironment.java | 5 +
.../org/apache/flink/util/DockerImageVersions.java | 2 +-
8 files changed, 124 insertions(+), 139 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
index 87b6ba0..45047eb 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -36,12 +36,14 @@ under the License.
<packaging>jar</packaging>
<properties>
- <pulsar.version>2.8.0</pulsar.version>
+ <pulsar.version>2.9.1</pulsar.version>
<!-- Test Libraries -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
- <commons-lang3.version>3.11</commons-lang3.version>
- <grpc.version>1.33.0</grpc.version>
+ <pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version>
+ <pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version>
+ <pulsar-netty.version>4.1.72.Final</pulsar-netty.version>
+ <pulsar-grpc.version>1.33.0</pulsar-grpc.version>
</properties>
<dependencies>
@@ -138,12 +140,22 @@ under the License.
<version>${pulsar.version}</version>
<scope>test</scope>
</dependency>
+
<!-- Pulsar use a newer commons-lang3 in broker. -->
<!-- Bump the version only for testing. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>${commons-lang3.version}</version>
+ <version>${pulsar-commons-lang3.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Pulsar use a newer zookeeper in broker. -->
+ <!-- Bump the version only for testing. -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${pulsar-zookeeper.version}</version>
<scope>test</scope>
</dependency>
@@ -156,9 +168,41 @@ under the License.
<version>${pulsar.version}</version>
<exclusions>
<exclusion>
+ <groupId>com.sun.activation</groupId>
+ <artifactId>javax.activation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jakarta.activation</groupId>
+ <artifactId>jakarta.activation-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jakarta.ws.rs</groupId>
+ <artifactId>jakarta.ws.rs-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.jcip</groupId>
+ <artifactId>jcip-annotations</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-package-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -171,13 +215,23 @@ under the License.
</dependency>
</dependencies>
- <!-- gRPC use version range which don't support by flink ci. -->
+
<dependencyManagement>
<dependencies>
+ <!-- Pulsar use higher gRPC version. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
- <version>${grpc.version}</version>
+ <version>${pulsar-grpc.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+
+ <!-- Pulsar use higher netty version. -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-bom</artifactId>
+ <version>${pulsar-netty.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -200,7 +254,9 @@ under the License.
<configuration>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
- <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en</argLine>
+ <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
+ -XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en
+ </argLine>
</configuration>
</plugin>
<plugin>
@@ -222,7 +278,9 @@ under the License.
<outputDirectory>
${project.build.directory}/generated-test-sources/protobuf/java
</outputDirectory>
- <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+ <protocArtifact>
+ com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+ </protocArtifact>
</configuration>
<executions>
<execution>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
deleted file mode 100644
index ef54779..0000000
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.utils;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Transaction was introduced into pulsar since 2.7.0, but the interface {@link Transaction} didn't
- * provide a id method until 2.8.1. We have to add this util for acquiring the {@link TxnID} for
- * compatible consideration.
- *
- * <p>TODO Remove this hack after pulsar 2.8.1 release.
- */
-@Internal
-@SuppressWarnings("java:S3011")
-public final class PulsarTransactionUtils {
-
- private static volatile Field mostBitsField;
- private static volatile Field leastBitsField;
-
- private PulsarTransactionUtils() {
- // No public constructor
- }
-
- public static TxnID getId(Transaction transaction) {
- // 2.8.1 and after.
- try {
- Method getId = Transaction.class.getDeclaredMethod("getTxnID");
- return (TxnID) getId.invoke(transaction);
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
- // 2.8.0 and before.
- TransactionImpl impl = (TransactionImpl) transaction;
- Long txnIdMostBits = getTxnIdMostBits(impl);
- Long txnIdLeastBits = getTxnIdLeastBits(impl);
-
- checkNotNull(txnIdMostBits, "Failed to get txnIdMostBits");
- checkNotNull(txnIdLeastBits, "Failed to get txnIdLeastBits");
-
- return new TxnID(txnIdMostBits, txnIdLeastBits);
- }
- }
-
- private static Long getTxnIdMostBits(TransactionImpl transaction) {
- if (mostBitsField == null) {
- synchronized (PulsarTransactionUtils.class) {
- if (mostBitsField == null) {
- try {
- mostBitsField = TransactionImpl.class.getDeclaredField("txnIdMostBits");
- mostBitsField.setAccessible(true);
- } catch (NoSuchFieldException e) {
- // Nothing to do for this exception.
- }
- }
- }
- }
-
- if (mostBitsField != null) {
- try {
- return (Long) mostBitsField.get(transaction);
- } catch (IllegalAccessException e) {
- // Nothing to do for this exception.
- }
- }
-
- return null;
- }
-
- private static Long getTxnIdLeastBits(TransactionImpl transaction) {
- if (leastBitsField == null) {
- synchronized (PulsarTransactionUtils.class) {
- if (leastBitsField == null) {
- try {
- leastBitsField = TransactionImpl.class.getDeclaredField("txnIdLeastBits");
- leastBitsField.setAccessible(true);
- } catch (NoSuchFieldException e) {
- // Nothing to do for this exception.
- }
- }
- }
- }
-
- if (leastBitsField != null) {
- try {
- return (Long) leastBitsField.get(transaction);
- } catch (IllegalAccessException e) {
- // Nothing to do for this exception.
- }
- }
-
- return null;
- }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 846101d..7262863 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.source.reader.split;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
@@ -155,7 +154,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl
// Avoiding NP problem when Pulsar don't get the message before Flink checkpoint.
if (uncommittedTransaction != null) {
- TxnID txnID = PulsarTransactionUtils.getId(uncommittedTransaction);
+ TxnID txnID = uncommittedTransaction.getTxnID();
this.uncommittedTransaction = newTransaction();
state.setUncommittedTransactionId(txnID);
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index aa4bcee..48e6e7a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -113,7 +113,7 @@ class PulsarDeserializationSchemaTest {
MessageMetadata metadata = new MessageMetadata();
ByteBuffer payload = ByteBuffer.wrap(bytes);
- return MessageImpl.create(metadata, payload, Schema.BYTES);
+ return MessageImpl.create(metadata, payload, Schema.BYTES, "");
}
/** This collector is used for collecting only one message. Used for test purpose. */
diff --git a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index 79ebbfc..56ad187 100644
--- a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -6,12 +6,12 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.pulsar:bouncy-castle-bc:pkg:2.8.0
-- org.apache.pulsar:pulsar-client-admin-api:2.8.0
-- org.apache.pulsar:pulsar-client-all:2.8.0
-- org.apache.pulsar:pulsar-client-api:2.8.0
-- org.bouncycastle:bcpkix-jdk15on:1.68
-- org.bouncycastle:bcprov-ext-jdk15on:1.68
-- org.bouncycastle:bcprov-jdk15on:1.68
-- org.bouncycastle:bcutil-jdk15on:1.68
+- org.apache.pulsar:bouncy-castle-bc:pkg:2.9.1
+- org.apache.pulsar:pulsar-client-admin-api:2.9.1
+- org.apache.pulsar:pulsar-client-all:2.9.1
+- org.apache.pulsar:pulsar-client-api:2.9.1
+- org.bouncycastle:bcpkix-jdk15on:1.69
+- org.bouncycastle:bcprov-ext-jdk15on:1.69
+- org.bouncycastle:bcprov-jdk15on:1.69
+- org.bouncycastle:bcutil-jdk15on:1.69
- org.slf4j:jul-to-slf4j:1.7.25
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
index e7caf8b..7c87ec7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
@@ -31,7 +31,8 @@ under the License.
<name>Flink : E2E Tests : Pulsar</name>
<properties>
- <pulsar.version>2.8.0</pulsar.version>
+ <pulsar.version>2.9.1</pulsar.version>
+ <bouncycastle.version>1.69</bouncycastle.version>
</properties>
<dependencies>
@@ -105,6 +106,46 @@ under the License.
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>bouncy-castle-bc</artifactId>
+ <version>${pulsar.version}</version>
+ <destFileName>bouncy-castle-bc.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <destFileName>bcpkix-jdk15on.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <destFileName>bcprov-jdk15on.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcutil-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <destFileName>bcutil-jdk15on.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-ext-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <destFileName>bcprov-ext-jdk15on.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </dependency>
<artifactItem>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 52957fc..ccfe277 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -37,6 +37,11 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
resourcePath("pulsar-client-all.jar"),
resourcePath("pulsar-client-api.jar"),
resourcePath("pulsar-admin-api.jar"),
+ resourcePath("bouncy-castle-bc.jar"),
+ resourcePath("bcpkix-jdk15on.jar"),
+ resourcePath("bcprov-jdk15on.jar"),
+ resourcePath("bcutil-jdk15on.jar"),
+ resourcePath("bcprov-ext-jdk15on.jar"),
resourcePath("jul-to-slf4j.jar"));
}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index 04298b4..273cee8 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -42,7 +42,7 @@ public class DockerImageVersions {
public static final String LOCALSTACK = "localstack/localstack:0.13.3";
- public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
+ public static final String PULSAR = "apachepulsar/pulsar:2.9.1";
public static final String CASSANDRA_3 = "cassandra:3.0";