You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/16 09:41:03 UTC

[flink] 01/09: [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1602e4b7d26cf52cea993c410769b7b15a672aff
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 13:00:53 2022 +0800

    [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1
    
    1. Bump the pulsar-client-all version in pom file.
    2. Exclude useless dependencies for pulsar-client-all.
    3. Bump the Pulsar docker version.
    4. Change the dependencies to pass the tests.
    5. Drop PulsarTransactionUtils and fix compile issues in tests.
    6. Add bouncycastle to Pulsar e2e tests.
---
 flink-connectors/flink-connector-pulsar/pom.xml    |  74 +++++++++++--
 .../common/utils/PulsarTransactionUtils.java       | 118 ---------------------
 .../split/PulsarUnorderedPartitionSplitReader.java |   3 +-
 .../PulsarDeserializationSchemaTest.java           |   2 +-
 .../src/main/resources/META-INF/NOTICE             |  16 +--
 .../flink-end-to-end-tests-pulsar/pom.xml          |  43 +++++++-
 .../FlinkContainerWithPulsarEnvironment.java       |   5 +
 .../org/apache/flink/util/DockerImageVersions.java |   2 +-
 8 files changed, 124 insertions(+), 139 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
index 87b6ba0..45047eb 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -36,12 +36,14 @@ under the License.
 	<packaging>jar</packaging>
 
 	<properties>
-		<pulsar.version>2.8.0</pulsar.version>
+		<pulsar.version>2.9.1</pulsar.version>
 
 		<!-- Test Libraries -->
 		<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
-		<commons-lang3.version>3.11</commons-lang3.version>
-		<grpc.version>1.33.0</grpc.version>
+		<pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version>
+		<pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version>
+		<pulsar-netty.version>4.1.72.Final</pulsar-netty.version>
+		<pulsar-grpc.version>1.33.0</pulsar-grpc.version>
 	</properties>
 
 	<dependencies>
@@ -138,12 +140,22 @@ under the License.
 			<version>${pulsar.version}</version>
 			<scope>test</scope>
 		</dependency>
+
 		<!-- Pulsar use a newer commons-lang3 in broker. -->
 		<!-- Bump the version only for testing. -->
 		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
-			<version>${commons-lang3.version}</version>
+			<version>${pulsar-commons-lang3.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Pulsar use a newer zookeeper in broker. -->
+		<!-- Bump the version only for testing. -->
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+			<version>${pulsar-zookeeper.version}</version>
 			<scope>test</scope>
 		</dependency>
 
@@ -156,9 +168,41 @@ under the License.
 			<version>${pulsar.version}</version>
 			<exclusions>
 				<exclusion>
+					<groupId>com.sun.activation</groupId>
+					<artifactId>javax.activation</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>jakarta.activation</groupId>
+					<artifactId>jakarta.activation-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>jakarta.ws.rs</groupId>
+					<artifactId>jakarta.ws.rs-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>jakarta.xml.bind</groupId>
+					<artifactId>jakarta.xml.bind-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.validation</groupId>
+					<artifactId>validation-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.xml.bind</groupId>
+					<artifactId>jaxb-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.jcip</groupId>
+					<artifactId>jcip-annotations</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>org.apache.pulsar</groupId>
 					<artifactId>pulsar-package-core</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>com.beust</groupId>
+					<artifactId>jcommander</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
@@ -171,13 +215,23 @@ under the License.
 		</dependency>
 	</dependencies>
 
-	<!-- gRPC use version range which don't support by flink ci. -->
+
 	<dependencyManagement>
 		<dependencies>
+			<!-- Pulsar use higher gRPC version. -->
 			<dependency>
 				<groupId>io.grpc</groupId>
 				<artifactId>grpc-bom</artifactId>
-				<version>${grpc.version}</version>
+				<version>${pulsar-grpc.version}</version>
+				<type>pom</type>
+				<scope>import</scope>
+			</dependency>
+
+			<!-- Pulsar use higher netty version. -->
+			<dependency>
+				<groupId>io.netty</groupId>
+				<artifactId>netty-bom</artifactId>
+				<version>${pulsar-netty.version}</version>
 				<type>pom</type>
 				<scope>import</scope>
 			</dependency>
@@ -200,7 +254,9 @@ under the License.
 				<configuration>
 					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
 					<forkCount>1</forkCount>
-					<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en</argLine>
+					<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
+						-XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en
+					</argLine>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -222,7 +278,9 @@ under the License.
 					<outputDirectory>
 						${project.build.directory}/generated-test-sources/protobuf/java
 					</outputDirectory>
-					<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+					<protocArtifact>
+						com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+					</protocArtifact>
 				</configuration>
 				<executions>
 					<execution>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
deleted file mode 100644
index ef54779..0000000
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.utils;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Transaction was introduced into pulsar since 2.7.0, but the interface {@link Transaction} didn't
- * provide a id method until 2.8.1. We have to add this util for acquiring the {@link TxnID} for
- * compatible consideration.
- *
- * <p>TODO Remove this hack after pulsar 2.8.1 release.
- */
-@Internal
-@SuppressWarnings("java:S3011")
-public final class PulsarTransactionUtils {
-
-    private static volatile Field mostBitsField;
-    private static volatile Field leastBitsField;
-
-    private PulsarTransactionUtils() {
-        // No public constructor
-    }
-
-    public static TxnID getId(Transaction transaction) {
-        // 2.8.1 and after.
-        try {
-            Method getId = Transaction.class.getDeclaredMethod("getTxnID");
-            return (TxnID) getId.invoke(transaction);
-        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
-            // 2.8.0 and before.
-            TransactionImpl impl = (TransactionImpl) transaction;
-            Long txnIdMostBits = getTxnIdMostBits(impl);
-            Long txnIdLeastBits = getTxnIdLeastBits(impl);
-
-            checkNotNull(txnIdMostBits, "Failed to get txnIdMostBits");
-            checkNotNull(txnIdLeastBits, "Failed to get txnIdLeastBits");
-
-            return new TxnID(txnIdMostBits, txnIdLeastBits);
-        }
-    }
-
-    private static Long getTxnIdMostBits(TransactionImpl transaction) {
-        if (mostBitsField == null) {
-            synchronized (PulsarTransactionUtils.class) {
-                if (mostBitsField == null) {
-                    try {
-                        mostBitsField = TransactionImpl.class.getDeclaredField("txnIdMostBits");
-                        mostBitsField.setAccessible(true);
-                    } catch (NoSuchFieldException e) {
-                        // Nothing to do for this exception.
-                    }
-                }
-            }
-        }
-
-        if (mostBitsField != null) {
-            try {
-                return (Long) mostBitsField.get(transaction);
-            } catch (IllegalAccessException e) {
-                // Nothing to do for this exception.
-            }
-        }
-
-        return null;
-    }
-
-    private static Long getTxnIdLeastBits(TransactionImpl transaction) {
-        if (leastBitsField == null) {
-            synchronized (PulsarTransactionUtils.class) {
-                if (leastBitsField == null) {
-                    try {
-                        leastBitsField = TransactionImpl.class.getDeclaredField("txnIdLeastBits");
-                        leastBitsField.setAccessible(true);
-                    } catch (NoSuchFieldException e) {
-                        // Nothing to do for this exception.
-                    }
-                }
-            }
-        }
-
-        if (leastBitsField != null) {
-            try {
-                return (Long) leastBitsField.get(transaction);
-            } catch (IllegalAccessException e) {
-                // Nothing to do for this exception.
-            }
-        }
-
-        return null;
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 846101d..7262863 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.source.reader.split;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
@@ -155,7 +154,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl
 
         // Avoiding NP problem when Pulsar don't get the message before Flink checkpoint.
         if (uncommittedTransaction != null) {
-            TxnID txnID = PulsarTransactionUtils.getId(uncommittedTransaction);
+            TxnID txnID = uncommittedTransaction.getTxnID();
             this.uncommittedTransaction = newTransaction();
             state.setUncommittedTransactionId(txnID);
         }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index aa4bcee..48e6e7a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -113,7 +113,7 @@ class PulsarDeserializationSchemaTest {
         MessageMetadata metadata = new MessageMetadata();
         ByteBuffer payload = ByteBuffer.wrap(bytes);
 
-        return MessageImpl.create(metadata, payload, Schema.BYTES);
+        return MessageImpl.create(metadata, payload, Schema.BYTES, "");
     }
 
     /** This collector is used for collecting only one message. Used for test purpose. */
diff --git a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index 79ebbfc..56ad187 100644
--- a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -6,12 +6,12 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.pulsar:bouncy-castle-bc:pkg:2.8.0
-- org.apache.pulsar:pulsar-client-admin-api:2.8.0
-- org.apache.pulsar:pulsar-client-all:2.8.0
-- org.apache.pulsar:pulsar-client-api:2.8.0
-- org.bouncycastle:bcpkix-jdk15on:1.68
-- org.bouncycastle:bcprov-ext-jdk15on:1.68
-- org.bouncycastle:bcprov-jdk15on:1.68
-- org.bouncycastle:bcutil-jdk15on:1.68
+- org.apache.pulsar:bouncy-castle-bc:pkg:2.9.1
+- org.apache.pulsar:pulsar-client-admin-api:2.9.1
+- org.apache.pulsar:pulsar-client-all:2.9.1
+- org.apache.pulsar:pulsar-client-api:2.9.1
+- org.bouncycastle:bcpkix-jdk15on:1.69
+- org.bouncycastle:bcprov-ext-jdk15on:1.69
+- org.bouncycastle:bcprov-jdk15on:1.69
+- org.bouncycastle:bcutil-jdk15on:1.69
 - org.slf4j:jul-to-slf4j:1.7.25
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
index e7caf8b..7c87ec7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
@@ -31,7 +31,8 @@ under the License.
 	<name>Flink : E2E Tests : Pulsar</name>
 
 	<properties>
-		<pulsar.version>2.8.0</pulsar.version>
+		<pulsar.version>2.9.1</pulsar.version>
+		<bouncycastle.version>1.69</bouncycastle.version>
 	</properties>
 
 	<dependencies>
@@ -105,6 +106,46 @@ under the License.
 							<type>jar</type>
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 						</artifactItem>
+						<dependency>
+							<groupId>org.apache.pulsar</groupId>
+							<artifactId>bouncy-castle-bc</artifactId>
+							<version>${pulsar.version}</version>
+							<destFileName>bouncy-castle-bc.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</dependency>
+						<dependency>
+							<groupId>org.bouncycastle</groupId>
+							<artifactId>bcpkix-jdk15on</artifactId>
+							<version>${bouncycastle.version}</version>
+							<destFileName>bcpkix-jdk15on.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</dependency>
+						<dependency>
+							<groupId>org.bouncycastle</groupId>
+							<artifactId>bcprov-jdk15on</artifactId>
+							<version>${bouncycastle.version}</version>
+							<destFileName>bcprov-jdk15on.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</dependency>
+						<dependency>
+							<groupId>org.bouncycastle</groupId>
+							<artifactId>bcutil-jdk15on</artifactId>
+							<version>${bouncycastle.version}</version>
+							<destFileName>bcutil-jdk15on.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</dependency>
+						<dependency>
+							<groupId>org.bouncycastle</groupId>
+							<artifactId>bcprov-ext-jdk15on</artifactId>
+							<version>${bouncycastle.version}</version>
+							<destFileName>bcprov-ext-jdk15on.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</dependency>
 						<artifactItem>
 							<groupId>org.slf4j</groupId>
 							<artifactId>jul-to-slf4j</artifactId>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 52957fc..ccfe277 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -37,6 +37,11 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
                 resourcePath("pulsar-client-all.jar"),
                 resourcePath("pulsar-client-api.jar"),
                 resourcePath("pulsar-admin-api.jar"),
+                resourcePath("bouncy-castle-bc.jar"),
+                resourcePath("bcpkix-jdk15on.jar"),
+                resourcePath("bcprov-jdk15on.jar"),
+                resourcePath("bcutil-jdk15on.jar"),
+                resourcePath("bcprov-ext-jdk15on.jar"),
                 resourcePath("jul-to-slf4j.jar"));
     }
 
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index 04298b4..273cee8 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -42,7 +42,7 @@ public class DockerImageVersions {
 
     public static final String LOCALSTACK = "localstack/localstack:0.13.3";
 
-    public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
+    public static final String PULSAR = "apachepulsar/pulsar:2.9.1";
 
     public static final String CASSANDRA_3 = "cassandra:3.0";