You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/30 04:29:18 UTC

[flink] 01/06: [FLINK-26182][Connector/Pulsar] Drop the embedded Pulsar runtime for in favor of mock Pulsar runtime with local metastore support.

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd33ad5f7901f4cb04c101df2ca74ce84cadb817
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Sep 27 18:04:23 2022 +0800

    [FLINK-26182][Connector/Pulsar] Drop the embedded Pulsar runtime for in favor of mock Pulsar runtime with local metastore support.
---
 .../pulsar/testutils/runtime/PulsarRuntime.java    |  17 +-
 .../runtime/embedded/PulsarEmbeddedRuntime.java    | 184 ---------------------
 2 files changed, 2 insertions(+), 199 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
index 92dd94c6657..6beb94a68c4 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.testutils.runtime;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
-import org.apache.flink.connector.pulsar.testutils.runtime.embedded.PulsarEmbeddedRuntime;
 import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;
 
 import org.testcontainers.containers.GenericContainer;
@@ -50,22 +49,10 @@ public interface PulsarRuntime {
         return new PulsarMockRuntime();
     }
 
-    /**
-     * Create a standalone Pulsar instance in test thread. We would start an embedded zookeeper and
-     * bookkeeper. The stream storage for bookkeeper is disabled. The function worker is disabled on
-     * Pulsar broker.
-     *
-     * <p>This runtime would be faster than {@link #container()} and behaves the same as the {@link
-     * #container()}.
-     */
-    static PulsarRuntime embedded() {
-        return new PulsarEmbeddedRuntime();
-    }
-
     /**
      * Create a Pulsar instance in docker. We would start a standalone Pulsar in TestContainers.
      * This runtime is often used in end-to-end tests. The performance may be a bit of slower than
-     * {@link #embedded()}. The stream storage for bookkeeper is disabled. The function worker is
+     * {@link #mock()}. The stream storage for bookkeeper is disabled. The function worker is
      * disabled on Pulsar broker.
      */
     static PulsarRuntime container() {
@@ -75,7 +62,7 @@ public interface PulsarRuntime {
     /**
      * Create a Pulsar instance in docker. We would start a standalone Pulsar in TestContainers.
      * This runtime is often used in end-to-end tests. The performance may be a bit of slower than
-     * {@link #embedded()}. The stream storage for bookkeeper is disabled. The function worker is
+     * {@link #mock()}. The stream storage for bookkeeper is disabled. The function worker is
      * disabled on Pulsar broker.
      *
      * <p>We would link the created Pulsar docker instance with the given flink instance. This would
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
deleted file mode 100644
index 2ca9a51f3c5..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime.embedded;
-
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.util.FileUtils;
-
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Optional;
-
-import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeUtils.initializePulsarEnvironment;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.pulsar.broker.ServiceConfigurationUtils.brokerUrl;
-import static org.apache.pulsar.broker.ServiceConfigurationUtils.webServiceUrl;
-
-/** Providing a embedded pulsar server. We use this runtime for transaction related tests. */
-public class PulsarEmbeddedRuntime implements PulsarRuntime {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarEmbeddedRuntime.class);
-
-    private static final String CONFIG_FILE_PATH;
-
-    static {
-        // Find the absolute path for containers/txnStandalone.conf
-        ClassLoader classLoader = PulsarEmbeddedRuntime.class.getClassLoader();
-        URL resource = classLoader.getResource("containers/txnStandalone.conf");
-        File file = new File(checkNotNull(resource).getFile());
-        CONFIG_FILE_PATH = file.getAbsolutePath();
-    }
-
-    private final Path tempDir;
-
-    private LocalBookkeeperEnsemble bookkeeper;
-    private PulsarService pulsarService;
-    private PulsarRuntimeOperator operator;
-
-    public PulsarEmbeddedRuntime() {
-        this.tempDir = createTempDir();
-    }
-
-    @Override
-    public void startUp() {
-        try {
-            startBookkeeper();
-            startPulsarService();
-
-            // Create the operator.
-            this.operator = new PulsarRuntimeOperator(serviceUrl(), adminUrl());
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public void tearDown() {
-        try {
-            if (operator != null) {
-                operator.close();
-            }
-            if (pulsarService != null) {
-                pulsarService.close();
-            }
-            if (bookkeeper != null) {
-                bookkeeper.stop();
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        } finally {
-            removeTempDir(tempDir);
-        }
-    }
-
-    @Override
-    public PulsarRuntimeOperator operator() {
-        return checkNotNull(operator, "You should start this embedded Pulsar first.");
-    }
-
-    private Path createTempDir() {
-        try {
-            return Files.createTempDirectory("pulsar");
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    private void removeTempDir(Path tempDir) {
-        try {
-            FileUtils.deleteDirectory(tempDir.normalize().toFile());
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    public void startBookkeeper() throws Exception {
-        Path zkPath = Paths.get("data", "standalone", "zookeeper");
-        Path bkPath = Paths.get("data", "standalone", "bookkeeper");
-
-        String zkDir = tempDir.resolve(zkPath).normalize().toString();
-        String bkDir = tempDir.resolve(bkPath).normalize().toString();
-
-        ServerConfiguration bkServerConf = new ServerConfiguration();
-        bkServerConf.loadConf(new File(CONFIG_FILE_PATH).toURI().toURL());
-        this.bookkeeper = new LocalBookkeeperEnsemble(1, 0, 0, zkDir, bkDir, true, "127.0.0.1");
-
-        // Start Bookkeeper & zookeeper.
-        bookkeeper.startStandalone(bkServerConf, false);
-    }
-
-    private void startPulsarService() throws Exception {
-        ServiceConfiguration config;
-        try (FileInputStream inputStream = new FileInputStream(CONFIG_FILE_PATH)) {
-            config = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-
-        // Use runtime dynamic ports for broker.
-        config.setAdvertisedAddress("127.0.0.1");
-        config.setClusterName("standalone");
-
-        // Use random port.
-        config.setBrokerServicePort(Optional.of(0));
-        config.setWebServicePort(Optional.of(0));
-
-        // Select available port for bookkeeper and zookeeper.
-        int zkPort = getZkPort();
-        String zkConnect = "127.0.0.1" + ":" + zkPort;
-        config.setZookeeperServers(zkConnect);
-        config.setConfigurationStoreServers(zkConnect);
-        config.setRunningStandalone(true);
-
-        this.pulsarService = new PulsarService(config);
-
-        // Start Pulsar Broker.
-        pulsarService.start();
-
-        // Create sample data environment.
-        initializePulsarEnvironment(config, serviceUrl(), adminUrl());
-    }
-
-    private int getZkPort() {
-        return checkNotNull(bookkeeper).getZookeeperPort();
-    }
-
-    private String serviceUrl() {
-        Integer port = pulsarService.getBrokerListenPort().orElseThrow(IllegalStateException::new);
-        return brokerUrl("127.0.0.1", port);
-    }
-
-    private String adminUrl() {
-        Integer port = pulsarService.getListenPortHTTP().orElseThrow(IllegalArgumentException::new);
-        return webServiceUrl("127.0.0.1", port);
-    }
-}