You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/12/12 18:33:18 UTC

[flink-connector-pulsar] 13/31: [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test tools based on PulsarStandalone.

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 9c98b83104a15a58d16cb0c753e201f33044a903
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 15:09:50 2022 +0800

    [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test tools based on PulsarStandalone.
    
    1. Drop some unused fields in test classes.
    2. Fix the checkstyle issues for source test.
    3. Fix violations for Pulsar connector according to the flink-architecture-tests.
    4. Create a standalone Pulsar for test.
    5. Add new methods to PulsarRuntimeOperator.
    6. Fix the bug in PulsarContainerRuntime, support running tests in E2E environment.
    7. Create PulsarContainerTestEnvironment for supporting E2E tests.
    8. Add a lot of comments for Pulsar testing tools.
    9. Drop mocked Pulsar service, use standalone Pulsar instead.
---
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    |  7 ++---
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  7 ++---
 .../pulsar/cases/ExclusiveSubscriptionContext.java | 14 ----------
 .../pulsar/cases/FailoverSubscriptionContext.java  | 14 ----------
 .../pulsar/cases/KeySharedSubscriptionContext.java |  7 ++---
 .../pulsar/cases/SharedSubscriptionContext.java    |  7 ++---
 .../common/PulsarContainerTestEnvironment.java     | 31 ++++++++++++++++++++++
 7 files changed, 39 insertions(+), 48 deletions(-)

diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 7d22e80..502b41d 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -29,8 +28,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
-
-import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 
 /**
  * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive
@@ -48,8 +46,7 @@ public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
-    PulsarTestEnvironment pulsar =
-            new PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
+    PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
     @TestContext
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index d14d8f9..5039048 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -28,10 +27,9 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
 
-import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
-
 /**
  * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
  * subscription.
@@ -48,8 +46,7 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
-    PulsarTestEnvironment pulsar =
-            new PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
+    PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
     @SuppressWarnings("unused")
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
index 1245e14..6fea0c9 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
@@ -27,12 +27,8 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
-
 /** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. */
 public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
-    private static final long serialVersionUID = 1L;
 
     public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
         this(environment, Collections.emptyList());
@@ -57,14 +53,4 @@ public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
     protected SubscriptionType subscriptionType() {
         return SubscriptionType.Exclusive;
     }
-
-    @Override
-    protected String serviceUrl() {
-        return PULSAR_SERVICE_URL;
-    }
-
-    @Override
-    protected String adminUrl() {
-        return PULSAR_ADMIN_URL;
-    }
 }
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
index 8ec1685..c473488 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
@@ -27,12 +27,8 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
-
 /** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
 public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
-    private static final long serialVersionUID = 1L;
 
     public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
         this(environment, Collections.emptyList());
@@ -57,14 +53,4 @@ public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
     protected SubscriptionType subscriptionType() {
         return SubscriptionType.Failover;
     }
-
-    @Override
-    protected String serviceUrl() {
-        return PULSAR_SERVICE_URL;
-    }
-
-    @Override
-    protected String adminUrl() {
-        return PULSAR_ADMIN_URL;
-    }
 }
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
index 303783a..5ad369b 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -46,13 +46,10 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
 import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
 
 /** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
 public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private int index = 0;
 
@@ -92,8 +89,8 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(PULSAR_SERVICE_URL)
-                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
                         .setTopicPattern(
                                 "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics)
                         .setSubscriptionType(SubscriptionType.Key_Shared)
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
index de53595..1a2db66 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -40,13 +40,10 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
 
 /** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */
 public class SharedSubscriptionContext extends PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private int index = 0;
 
@@ -71,8 +68,8 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(PULSAR_SERVICE_URL)
-                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
                         .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics)
                         .setSubscriptionType(SubscriptionType.Shared)
                         .setSubscriptionName("pulsar-shared");
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
new file mode 100644
index 0000000..654347b
--- /dev/null
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/** This test environment is used to create a Pulsar standalone instance for e2e tests. */
+public class PulsarContainerTestEnvironment extends PulsarTestEnvironment {
+
+    public PulsarContainerTestEnvironment(FlinkContainerWithPulsarEnvironment flinkEnvironment) {
+        super(container(flinkEnvironment.getFlinkContainers().getJobManager()));
+    }
+}