Posted to commits@pinot.apache.org by xi...@apache.org on 2022/04/18 04:41:22 UTC

[pinot] branch master updated: Ensure all records are pushed in Pulsar Consumer Test (#8554)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 82f7aefe3d Ensure all records are pushed in Pulsar Consumer Test (#8554)
82f7aefe3d is described below

commit 82f7aefe3d0e9e9cee5ba519279a1425259210ce
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Mon Apr 18 10:11:16 2022 +0530

    Ensure all records are pushed in Pulsar Consumer Test (#8554)
    
    * Use testcontainers instead of standalone CLI; Enable checks for all records to get pushed
    
    * remove pulsar-broker dependency
    
    * Add debug logging to pulsar consumer
    
    * Adding re-runs just to verify flakiness, will remove later
    
    * Increase timeout
    
    * lint fix
    
    * remove re-runs and increase timeout
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../pinot-stream-ingestion/pinot-pulsar/pom.xml    | 154 +---
 .../plugin/stream/pulsar/PulsarConsumerTest.java   | 128 ++-
 .../stream/pulsar/PulsarStandaloneCluster.java     | 132 ---
 .../src/test/resources/standalone.properties       | 904 ---------------------
 4 files changed, 111 insertions(+), 1207 deletions(-)
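
The reworked test replaces fixed timeouts with polling. The new `waitForCondition` helper in `PulsarConsumerTest` re-evaluates a condition every `checkIntervalMs` until it returns `TRUE` or `timeoutMs` elapses. `waitForMessagesToPublish` uses it to wait until the topic's `msgInCounter` equals `NUM_RECORDS_PER_PARTITION * NUM_PARTITION`. The sketch below shows the same idea in plain JDK Java and is not the committed helper. `PollUntil` and `pollUntil` are hypothetical names. It uses `BooleanSupplier` instead of Guava's `Function<Void, Boolean>`, and it uses the monotonic `System.nanoTime()` instead of `System.currentTimeMillis()`. It also returns `false` on timeout rather than calling `Assert.fail`.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; a sketch of the pattern, not the helper committed in PulsarConsumerTest. */
public final class PollUntil {
  private PollUntil() {
  }

  /**
   * Re-evaluates {@code condition} every {@code interval} until it returns true or {@code timeout} elapses.
   *
   * @return true if the condition held before the deadline, false on timeout or interruption
   */
  public static boolean pollUntil(BooleanSupplier condition, Duration interval, Duration timeout) {
    // System.nanoTime() is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      // Sleep for one interval, capped at the time left; at least 1 ms to avoid busy-spinning.
      long sleepMillis = Math.min(interval.toMillis(), Duration.ofNanos(remainingNanos).toMillis());
      try {
        Thread.sleep(Math.max(1L, sleepMillis));
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up instead of swallowing the interruption.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    long start = System.nanoTime();
    // Stand-in for "the topic has received every message": becomes true about 50 ms after start.
    boolean met = pollUntil(() -> System.nanoTime() - start >= Duration.ofMillis(50).toNanos(),
        Duration.ofMillis(10), Duration.ofSeconds(5));
    // A condition that never holds returns false once the 100 ms timeout has passed.
    boolean neverMet = pollUntil(() -> false, Duration.ofMillis(10), Duration.ofMillis(100));
    System.out.println("met=" + met + ", neverMet=" + neverMet);
  }
}
```

To use this in the test, a caller would pass a lambda that compares `getNumberOfEntries(admin, topicName)` with the expected count. Because `getNumberOfEntries` already returns `-1` on failure, a transient admin error simply counts as "not yet". In principle, the same loop could also replace the fixed `Thread.sleep(20 * 1000L)` used to wait for the namespace, provided some admin call reliably signals that the namespace is ready.
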

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
index d9446d1e53..3956d64284 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
@@ -68,6 +68,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>pulsar</artifactId>
+      <version>1.17.1</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client-original</artifactId>
@@ -108,154 +114,6 @@
       <artifactId>pulsar-client-admin-original</artifactId>
       <version>${pulsar.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${pulsar.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-client-original</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-client-admin-original</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-resolver</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.ws.rs</groupId>
-          <artifactId>javax.ws.rs-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.prometheus</groupId>
-          <artifactId>simpleclient_common</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.api.grpc</groupId>
-          <artifactId>proto-google-common-protos</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.grpc</groupId>
-          <artifactId>grpc-context</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-tcnative-boringssl-static</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.errorprone</groupId>
-          <artifactId>error_prone_annotations</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.glassfish.jersey.containers</groupId>
-          <artifactId>jersey-container-servlet-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.prometheus</groupId>
-          <artifactId>simpleclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-servlet</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-io</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-http</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.squareup.okio</groupId>
-          <artifactId>okio</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.prometheus</groupId>
-          <artifactId>simpleclient_hotspot</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.swagger</groupId>
-          <artifactId>swagger-annotations</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.grpc</groupId>
-          <artifactId>grpc-protobuf-lite</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.opencensus</groupId>
-          <artifactId>opencensus-contrib-grpc-metrics</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.opencensus</groupId>
-          <artifactId>opencensus-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-collections4</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.annotation</groupId>
-          <artifactId>javax.annotation-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.mojo</groupId>
-          <artifactId>animal-sniffer-annotations</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.github.ben-manes.caffeine</groupId>
-          <artifactId>caffeine</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.beust</groupId>
-          <artifactId>jcommander</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>javax.servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-codec-socks</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-transport-native-unix-common</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>jakarta.ws.rs</groupId>
-          <artifactId>jakarta.ws.rs-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.bouncycastle</groupId>
-          <artifactId>bcpkix-jdk15on</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.bouncycastle</groupId>
-          <artifactId>bcprov-ext-jdk15on</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>javax.servlet</groupId>
       <artifactId>javax.servlet-api</artifactId>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 53ad46a728..527b76bc27 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.plugin.stream.pulsar;
 
+import com.google.common.base.Function;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
@@ -30,6 +33,7 @@ import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -40,6 +44,11 @@ import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -47,7 +56,9 @@ import org.testng.annotations.Test;
 
 
 public class PulsarConsumerTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerTest.class);
 
+  private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.7.2");
   public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
   public static final String TEST_TOPIC = "test-topic";
   public static final String TEST_TOPIC_BATCH = "test-topic-batch";
@@ -57,9 +68,10 @@ public class PulsarConsumerTest {
   public static final int NUM_PARTITION = 1;
   public static final int NUM_RECORDS_PER_PARTITION = 1000;
   public static final int BATCH_SIZE = 10;
+  public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) Duration.ofMinutes(5).toMillis();
 
   private PulsarClient _pulsarClient;
-  private PulsarStandaloneCluster _pulsarStandaloneCluster;
+  private PulsarContainer _pulsar = null;
   private HashMap<Integer, MessageId> _partitionToFirstMessageIdMap = new HashMap<>();
   private HashMap<Integer, MessageId> _partitionToFirstMessageIdMapBatch = new HashMap<>();
 
@@ -67,35 +79,79 @@ public class PulsarConsumerTest {
   public void setUp()
       throws Exception {
     try {
-      _pulsarStandaloneCluster = new PulsarStandaloneCluster();
+      _pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ofMinutes(5));
+      _pulsar.start();
 
-      _pulsarStandaloneCluster.start();
+      // Waiting for namespace to be created.
+      // There should be a better approach.
+      Thread.sleep(20 * 1000L);
 
-      PulsarAdmin admin =
-          PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + _pulsarStandaloneCluster.getAdminPort()).build();
+      PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build();
 
-      String bootstrapServer = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort();
+      String bootstrapServer = _pulsar.getPulsarBrokerUrl();
 
       _pulsarClient = PulsarClient.builder().serviceUrl(bootstrapServer).build();
 
-      admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION);
-      admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION);
+      createTopics(admin);
 
       publishRecords();
       publishRecordsBatch();
+
+      waitForMessagesToPublish(admin, TEST_TOPIC);
+      waitForMessagesToPublish(admin, TEST_TOPIC_BATCH);
+
+      admin.close();
     } catch (Exception e) {
-      if (_pulsarStandaloneCluster != null) {
-        _pulsarStandaloneCluster.stop();
+      if (_pulsar != null) {
+        _pulsar.stop();
+        _pulsar = null;
       }
       throw new RuntimeException("Failed to setUp test environment", e);
     }
   }
 
+  private void createTopics(PulsarAdmin admin)
+      throws PulsarAdminException {
+    InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
+    inactiveTopicPolicies.setDeleteWhileInactive(false);
+    admin.namespaces().setInactiveTopicPolicies("public/default", inactiveTopicPolicies);
+
+    admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION);
+    admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION);
+  }
+
+  private void waitForMessagesToPublish(PulsarAdmin admin, String topicName) {
+    waitForCondition(new Function<Void, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          return getNumberOfEntries(admin, topicName) == NUM_RECORDS_PER_PARTITION * NUM_PARTITION;
+        } catch (Exception e) {
+          LOGGER.warn("Could not fetch number of messages in pulsar topic " + topicName, e);
+          return null;
+        }
+      }
+    }, 2000L, 60 * 1000L, "Failed to produce " + NUM_RECORDS_PER_PARTITION * NUM_PARTITION + " messages", true);
+  }
+
+  private long getNumberOfEntries(PulsarAdmin admin, String topicName) {
+    try {
+      return admin.topics().getPartitionedStats(topicName, false).msgInCounter;
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOGGER.warn("Could not fetch number of rows in pulsar topic " + topicName, e);
+    }
+    return -1;
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
-    if (_pulsarStandaloneCluster != null) {
-      _pulsarStandaloneCluster.stop();
+    if (_pulsar != null) {
+      _pulsar.stop();
+      _pulsarClient.close();
+      _pulsar = null;
     }
   }
 
@@ -149,7 +205,7 @@ public class PulsarConsumerTest {
 
   public StreamConfig getStreamConfig(String topicName) {
     String streamType = "pulsar";
-    String streamPulsarBrokerList = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort();
+    String streamPulsarBrokerList = _pulsar.getPulsarBrokerUrl();
     String streamPulsarConsumerType = "simple";
     String tableNameWithType = TABLE_NAME_WITH_TYPE;
 
@@ -180,7 +236,8 @@ public class PulsarConsumerTest {
     final StreamConsumerFactory streamConsumerFactory =
         StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC));
     int numPartitions =
-        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC)).fetchPartitionCount(10000);
+        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC)).fetchPartitionCount(
+            CONSUMER_FETCH_TIMEOUT_MILLIS);
 
     for (int partition = 0; partition < numPartitions; partition++) {
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
@@ -192,7 +249,8 @@ public class PulsarConsumerTest {
       final PartitionGroupConsumer consumer =
           streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
       final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
-          new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), 10000);
+          new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)),
+          CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch1.getMessageCount(), 500);
       for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
@@ -202,7 +260,7 @@ public class PulsarConsumerTest {
 
       final MessageBatch messageBatch2 =
           consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null,
-              10000);
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch2.getMessageCount(), 500);
       for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
@@ -212,7 +270,8 @@ public class PulsarConsumerTest {
 
       final MessageBatch messageBatch3 =
           consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)),
-              new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)), 10000);
+              new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)),
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch3.getMessageCount(), 25);
       for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
@@ -230,7 +289,8 @@ public class PulsarConsumerTest {
     final StreamConsumerFactory streamConsumerFactory =
         StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC_BATCH));
     int numPartitions =
-        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC_BATCH)).fetchPartitionCount(10000);
+        new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC_BATCH))
+            .fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS);
 
     for (int partition = 0; partition < numPartitions; partition++) {
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
@@ -241,8 +301,10 @@ public class PulsarConsumerTest {
 
       final PartitionGroupConsumer consumer =
           streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
+      //TODO: This test failed, check it out.
       final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
-          new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), 10000);
+          new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)),
+          CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch1.getMessageCount(), 500);
       for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
@@ -252,7 +314,7 @@ public class PulsarConsumerTest {
 
       final MessageBatch messageBatch2 =
           consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), null,
-              10000);
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch2.getMessageCount(), 500);
       for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
@@ -262,7 +324,8 @@ public class PulsarConsumerTest {
 
       final MessageBatch messageBatch3 =
           consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 10)),
-              new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)), 10000);
+              new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)),
+              CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch3.getMessageCount(), 25);
       for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
         final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
@@ -273,16 +336,35 @@ public class PulsarConsumerTest {
     }
   }
 
-  public MessageId getMessageIdForPartitionAndIndex(int partitionNum, int index) {
+  private MessageId getMessageIdForPartitionAndIndex(int partitionNum, int index) {
     MessageId startMessageIdRaw = _partitionToFirstMessageIdMap.get(partitionNum);
     MessageIdImpl startMessageId = MessageIdImpl.convertToMessageIdImpl(startMessageIdRaw);
     return DefaultImplementation.newMessageId(startMessageId.getLedgerId(), index, partitionNum);
   }
 
-  public MessageId getBatchMessageIdForPartitionAndIndex(int partitionNum, int index) {
+  private MessageId getBatchMessageIdForPartitionAndIndex(int partitionNum, int index) {
     MessageId startMessageIdRaw = _partitionToFirstMessageIdMapBatch.get(partitionNum);
     BatchMessageIdImpl startMessageId = (BatchMessageIdImpl) MessageIdImpl.convertToMessageIdImpl(startMessageIdRaw);
     return new BatchMessageIdImpl(startMessageId.getLedgerId(), index / BATCH_SIZE, partitionNum, index % BATCH_SIZE,
         startMessageId.getBatchSize(), startMessageId.getAcker());
   }
+
+  private void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
+      @Nullable String errorMessage, boolean raiseError) {
+    long endTime = System.currentTimeMillis() + timeoutMs;
+    String errorMessageSuffix = errorMessage != null ? ", error message: " + errorMessage : "";
+    while (System.currentTimeMillis() < endTime) {
+      try {
+        if (Boolean.TRUE.equals(condition.apply(null))) {
+          return;
+        }
+        Thread.sleep(checkIntervalMs);
+      } catch (Exception e) {
+        Assert.fail("Caught exception while checking the condition" + errorMessageSuffix, e);
+      }
+    }
+    if (raiseError) {
+      Assert.fail("Failed to meet condition in " + timeoutMs + "ms" + errorMessageSuffix);
+    }
+  }
 }
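
The test computes message IDs arithmetically instead of recording each ID the producer returns. For the non-batched topic, `getMessageIdForPartitionAndIndex` uses the record index directly as the entry ID. For the batched topic, `getBatchMessageIdForPartitionAndIndex` maps record index `i` to entry `i / BATCH_SIZE` and batch slot `i % BATCH_SIZE`. That mapping appears to assume two things: every batch holds exactly `BATCH_SIZE` messages (presumably arranged by `publishRecordsBatch`, which is not shown in this diff), and the first message sits at entry 0 of its ledger. The assertions also imply that fetch ranges include the start and exclude the end. For example, fetching from index 10 to 35 expects 25 messages. The plain-Java sketch below reproduces only that arithmetic. `Position` is a hypothetical stand-in for Pulsar's `BatchMessageIdImpl`, with the ledger and partition omitted.

```java
/** Sketch of the index arithmetic in getBatchMessageIdForPartitionAndIndex; names are hypothetical. */
public final class BatchIndexSketch {
  /** Hypothetical (entryId, batchIndex) pair within a single ledger. */
  static final class Position {
    final long entryId;
    final int batchIndex;

    Position(long entryId, int batchIndex) {
      this.entryId = entryId;
      this.batchIndex = batchIndex;
    }

    @Override
    public String toString() {
      return "entry=" + entryId + ", batchIndex=" + batchIndex;
    }
  }

  /** Assumes every batch holds exactly batchSize messages and the first batch is entry 0. */
  static Position positionOf(int recordIndex, int batchSize) {
    if (recordIndex < 0 || batchSize <= 0) {
      throw new IllegalArgumentException("recordIndex must be >= 0 and batchSize must be > 0");
    }
    return new Position(recordIndex / batchSize, recordIndex % batchSize);
  }

  /** The test's fetch ranges behave as [start, end): start inclusive, end exclusive. */
  static int expectedCount(int startIndex, int endIndex) {
    return endIndex - startIndex;
  }

  public static void main(String[] args) {
    final int batchSize = 10; // BATCH_SIZE in the test
    for (int index : new int[] {0, 10, 35, 500}) {
      System.out.println(index + " -> " + positionOf(index, batchSize));
    }
    System.out.println("messages in [10, 35): " + expectedCount(10, 35));
  }
}
```

If any batch is flushed with fewer than `BATCH_SIZE` messages, for example because a batching delay expires, the division no longer matches the real IDs. Recording the `MessageId` returned by each send would avoid depending on that assumption.
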
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java
deleted file mode 100644
index d933f7f268..0000000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarStandaloneCluster.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.pulsar;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.Optional;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.PulsarStandalone;
-import org.apache.pulsar.PulsarStandaloneBuilder;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PulsarStandaloneCluster {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStandaloneCluster.class);
-
-  public static final String DEFAULT_STANDALONE_CONF = "standalone.properties";
-  public static final String DEFAULT_ZK_DIR = "pulsar-zk";
-  public static final String DEFAULT_BK_DIR = "pulsar-bookeeper";
-
-  private Integer _brokerPort;
-  private Integer _adminPort;
-  private String _zkDir;
-  private String _bkDir;
-
-  private PulsarStandalone _pulsarStandalone;
-  private File _tempDir;
-
-  public void setBrokerPort(Integer brokerPort) {
-    _brokerPort = brokerPort;
-  }
-
-  public void setAdminPort(Integer adminPort) {
-    _adminPort = adminPort;
-  }
-
-  public void setZkDir(String zkDir) {
-    _zkDir = zkDir;
-  }
-
-  public void setBkDir(String bkDir) {
-    _bkDir = bkDir;
-  }
-
-  public Integer getBrokerPort() {
-    return _brokerPort;
-  }
-
-  public Integer getAdminPort() {
-    return _adminPort;
-  }
-
-  public void start()
-      throws Exception {
-    final File clusterConfigFile = new File(getClass().getClassLoader().getResource(DEFAULT_STANDALONE_CONF).toURI());
-
-    String zkDir = StringUtils.isBlank(_zkDir) ? DEFAULT_ZK_DIR : _zkDir;
-    String bkDir = StringUtils.isBlank(_bkDir) ? DEFAULT_BK_DIR : _bkDir;
-    _tempDir = FileUtils.getTempDirectory();
-    File zkDirFile = new File(_tempDir, zkDir);
-    File bkDirFile = new File(_tempDir, bkDir);
-    zkDirFile.mkdirs();
-    bkDirFile.mkdirs();
-
-    ServiceConfiguration config =
-        PulsarConfigurationLoader.create((new FileInputStream(clusterConfigFile)), ServiceConfiguration.class);
-    config.setManagedLedgerDefaultEnsembleSize(1);
-    config.setManagedLedgerDefaultWriteQuorum(1);
-    config.setManagedLedgerDefaultAckQuorum(1);
-    String zkServers = "127.0.0.1";
-    config.setAdvertisedAddress("localhost");
-
-    _pulsarStandalone = PulsarStandaloneBuilder.instance().withConfig(config).withNoStreamStorage(true).build();
-    _pulsarStandalone.setZkDir(zkDirFile.getAbsolutePath());
-    _pulsarStandalone.setBkDir(bkDirFile.getAbsolutePath());
-
-    if (config.getZookeeperServers() != null) {
-      _pulsarStandalone.setZkPort(Integer.parseInt(config.getZookeeperServers().split(":")[1]));
-    }
-
-    config.setZookeeperServers(zkServers + ":" + _pulsarStandalone.getZkPort());
-    config.setConfigurationStoreServers(zkServers + ":" + _pulsarStandalone.getZkPort());
-
-    config.setRunningStandalone(true);
-
-    if (_brokerPort != null) {
-      config.setBrokerServicePort(Optional.of(_brokerPort));
-    } else {
-      _brokerPort = config.getBrokerServicePort().get();
-    }
-
-    if (_adminPort != null) {
-      config.setWebServicePort(Optional.of(_adminPort));
-    } else {
-      _adminPort = config.getWebServicePort().get();
-    }
-
-    _pulsarStandalone.setConfigFile(clusterConfigFile.getAbsolutePath());
-    _pulsarStandalone.setConfig(config);
-
-    _pulsarStandalone.start();
-  }
-
-  public void stop() {
-    try {
-      _pulsarStandalone.close();
-      _tempDir.delete();
-    } catch (Exception e) {
-      LOGGER.warn("Failed to stop embedded pulsar and zookeeper", e);
-    }
-  }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/resources/standalone.properties b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/resources/standalone.properties
deleted file mode 100644
index 452161b570..0000000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/resources/standalone.properties
+++ /dev/null
@@ -1,904 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-### --- General broker settings --- ###
-
-# Zookeeper quorum connection string
-zookeeperServers=
-
-# Configuration Store connection string
-configurationStoreServers=
-
-brokerServicePort=6650
-
-# Port to use to server HTTP request
-webServicePort=8080
-
-# Hostname or IP address the service binds on, default is 0.0.0.0.
-bindAddress=0.0.0.0
-
-# Hostname or IP address the service advertises to the outside world. If not set, the value of
-# InetAddress.getLocalHost().getHostName() is used.
-advertisedAddress=
-
-# Enable or disable the HAProxy protocol.
-haProxyProtocolEnabled=false
-
-# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
-numIOThreads=
-
-# Number of threads to use for ordered executor. The ordered executor is used to operate with zookeeper,
-# such as init zookeeper client, get namespace policies from zookeeper etc. It also used to split bundle. Default is 8
-numOrderedExecutorThreads=8
-
-# Number of threads to use for HTTP requests processing.
-# Default is set to 2 * Runtime.getRuntime().availableProcessors()
-numHttpServerThreads=
-
-# Number of thread pool size to use for pulsar broker service.
-# The executor in thread pool will do basic broker operation like load/unload bundle, update managedLedgerConfig,
-# update topic/subscription/replicator message dispatch rate, do leader election etc.
-# Default is Runtime.getRuntime().availableProcessors()
-numExecutorThreadPoolSize=
-
-# Number of thread pool size to use for pulsar zookeeper callback service
-# The cache executor thread pool is used for restarting global zookeeper session.
-# Default is 10
-numCacheExecutorThreadPoolSize=10
-
-# Max concurrent web requests
-maxConcurrentHttpRequests=1024
-
-# Name of the cluster to which this broker belongs to
-clusterName=standalone
-
-# Enable cluster's failure-domain which can distribute brokers into logical region
-failureDomainsEnabled=false
-
-# Zookeeper session timeout in milliseconds
-zooKeeperSessionTimeoutMillis=30000
-
-# ZooKeeper operation timeout in seconds
-zooKeeperOperationTimeoutSeconds=30
-
-# ZooKeeper cache expiry time in seconds
-zooKeeperCacheExpirySeconds=300
-
-# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
-brokerShutdownTimeoutMs=60000
-
-# Flag to skip broker shutdown when broker handles Out of memory error
-skipBrokerShutdownOnOOM=false
-
-# Enable backlog quota check. Enforces action on topic when the quota is reached
-backlogQuotaCheckEnabled=true
-
-# How often to check for topics that have reached the quota
-backlogQuotaCheckIntervalInSeconds=60
-
-# Default per-topic backlog quota limit
-backlogQuotaDefaultLimitGB=10
-
-# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
-ttlDurationDefaultInSeconds=0
-
-# Enable the deletion of inactive topics
-brokerDeleteInactiveTopicsEnabled=false
-
-# How often to check for inactive topics
-brokerDeleteInactiveTopicsFrequencySeconds=60
-
-# Max pending publish requests per connection to avoid keeping large number of pending
-# requests in memory. Default: 1000
-maxPendingPublishdRequestsPerConnection=1000
-
-# How frequently to proactively check and purge expired messages
-messageExpiryCheckIntervalInMinutes=1000
-
-# How long to delay rewinding cursor and dispatching messages when active consumer is changed
-activeConsumerFailoverDelayTimeMillis=1000
-
-# How long to delete inactive subscriptions from last consuming
-# When it is 0, inactive subscriptions are not deleted automatically
-subscriptionExpirationTimeMinutes=0
-
-# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled)
-subscriptionRedeliveryTrackerEnabled=true
-
-# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
-# consistent hashing to reassign keys to new consumers
-subscriptionKeySharedUseConsistentHashing=false
-
-# On KeyShared subscriptions, number of points in the consistent-hashing ring.
-# The higher the number, the more equal the assignment of keys to consumers
-subscriptionKeySharedConsistentHashingReplicaPoints=100
-
-# How frequently to proactively check and purge expired subscription
-subscriptionExpiryCheckIntervalInMinutes=5
-
-# Set the default behavior for message deduplication in the broker
-# This can be overridden per-namespace. If enabled, broker will reject
-# messages that were already stored in the topic
-brokerDeduplicationEnabled=false
-
-# Maximum number of producer information that it's going to be
-# persisted for deduplication purposes
-brokerDeduplicationMaxNumberOfProducers=10000
-
-# Number of entries after which a dedup info snapshot is taken.
-# A bigger interval will lead to less snapshots being taken though it would
-# increase the topic recovery time, when the entries published after the
-# snapshot need to be replayed
-brokerDeduplicationEntriesInterval=1000
-
-# Time of inactivity after which the broker will discard the deduplication information
-# relative to a disconnected producer. Default is 6 hours.
-brokerDeduplicationProducerInactivityTimeoutMinutes=360
-
-# When a namespace is created without specifying the number of bundle, this
-# value will be used as the default
-defaultNumberOfNamespaceBundles=4
-
-# Enable check for minimum allowed client library version
-clientLibraryVersionCheckEnabled=false
-
-# Path for the file used to determine the rotation status for the broker when responding
-# to service discovery health checks
-statusFilePath=/usr/local/apache/htdocs
-
-# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will
-# stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back
-# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
-maxUnackedMessagesPerConsumer=50000
-
-# Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to
-# all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and
-# unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit
-# check and dispatcher can dispatch messages without any restriction
-maxUnackedMessagesPerSubscription=200000
-
-# Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching
-# messages to all shared subscription which has higher number of unack messages until subscriptions start
-# acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling
-# unackedMessage-limit check and broker doesn't block dispatchers
-maxUnackedMessagesPerBroker=0
-
-# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
-# than this percentage limit and subscription will not receive any new messages until that subscription acks back
-# limit/2 messages
-maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
-
-# Tick time to schedule task that checks topic publish rate limiting across all topics
-# Reducing to lower value can give more accuracy while throttling publish but
-# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
-topicPublisherThrottlingTickTimeMillis=2
-
-# Tick time to schedule task that checks broker publish rate limiting across all topics
-# Reducing to lower value can give more accuracy while throttling publish but
-# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
-brokerPublisherThrottlingTickTimeMillis=50
-
-# Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled
-# (Disable message rate limit with value 0)
-brokerPublisherThrottlingMaxMessageRate=0
-
-# Max Rate(in 1 seconds) of Byte allowed to publish for a broker if broker publish rate limiting enabled
-# (Disable byte rate limit with value 0)
-brokerPublisherThrottlingMaxByteRate=0
-
-# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
-# message dispatch-throttling
-dispatchThrottlingRatePerTopicInMsg=0
-
-# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling
-# default message-byte dispatch-throttling
-dispatchThrottlingRatePerTopicInByte=0
-
-# Dispatch rate-limiting relative to publish rate.
-# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
-# throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
-dispatchThrottlingRateRelativeToPublishRate=false
-
-# By default we enable dispatch-throttling for both caught up consumers as well as consumers who have
-# backlog.
-dispatchThrottlingOnNonBacklogConsumerEnabled=true
-
-# Precise dispathcer flow control according to history message number of each entry
-preciseDispatcherFlowControl=false
-
-# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
-maxConcurrentLookupRequest=50000
-
-# Max number of concurrent topic loading request broker allows to control number of zk-operations
-maxConcurrentTopicLoadRequest=5000
-
-# Max concurrent non-persistent message can be processed per connection
-maxConcurrentNonPersistentMessagePerConnection=1000
-
-# Number of worker threads to serve non-persistent topic
-numWorkerThreadsForNonPersistentTopic=8
-
-# Enable broker to load persistent topics
-enablePersistentTopics=true
-
-# Enable broker to load non-persistent topics
-enableNonPersistentTopics=true
-
-# Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
-# until the number of connected producers decrease.
-# Using a value of 0, is disabling maxProducersPerTopic-limit check.
-maxProducersPerTopic=0
-
-# Enforce producer to publish encrypted messages.(default disable).
-encryptionRequireOnProducer=false
-
-# Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
-# until the number of connected consumers decrease.
-# Using a value of 0, is disabling maxConsumersPerTopic-limit check.
-maxConsumersPerTopic=0
-
-# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject
-# new subscription until the number of subscribed subscriptions decrease.
-# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check.
-maxSubscriptionsPerTopic=0
-
-# Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
-# until the number of connected consumers decrease.
-# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
-maxConsumersPerSubscription=0
-
-# Max number of partitions per partitioned topic
-# Use 0 or negative number to disable the check
-maxNumPartitionsPerPartitionedTopic=0
-
-### --- TLS --- ###
-# Deprecated - Use webServicePortTls and brokerServicePortTls instead
-tlsEnabled=false
-
-# Tls cert refresh duration in seconds (set 0 to check on every new connection)
-tlsCertRefreshCheckDurationSec=300
-
-# Path for the TLS certificate file
-tlsCertificateFilePath=
-
-# Path for the TLS private key file
-tlsKeyFilePath=
-
-# Path for the trusted TLS certificate file.
-# This cert is used to verify that any certs presented by connecting clients
-# are signed by a certificate authority. If this verification
-# fails, then the certs are untrusted and the connections are dropped.
-tlsTrustCertsFilePath=
-
-# Accept untrusted TLS certificate from client.
-# If true, a client with a cert which cannot be verified with the
-# 'tlsTrustCertsFilePath' cert will allowed to connect to the server,
-# though the cert will not be used for client authentication.
-tlsAllowInsecureConnection=false
-
-# Specify the tls protocols the broker will use to negotiate during TLS handshake
-# (a comma-separated list of protocol names).
-# Examples:- [TLSv1.2, TLSv1.1, TLSv1]
-tlsProtocols=
-
-# Specify the tls cipher the broker will use to negotiate during TLS Handshake
-# (a comma-separated list of ciphers).
-# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
-tlsCiphers=
-
-# Trusted client certificates are required for to connect TLS
-# Reject the Connection if the Client Certificate is not trusted.
-# In effect, this requires that all connecting clients perform TLS client
-# authentication.
-tlsRequireTrustedClientCertOnConnect=false
-
-### --- KeyStore TLS config variables --- ###
-# Enable TLS with KeyStore type configuration in broker.
-tlsEnabledWithKeyStore=false
-
-# TLS Provider for KeyStore type
-tlsProvider=
-
-# TLS KeyStore type configuration in broker: JKS, PKCS12
-tlsKeyStoreType=JKS
-
-# TLS KeyStore path in broker
-tlsKeyStore=
-
-# TLS KeyStore password for broker
-tlsKeyStorePassword=
-
-# TLS TrustStore type configuration in broker: JKS, PKCS12
-tlsTrustStoreType=JKS
-
-# TLS TrustStore path in broker
-tlsTrustStore=
-
-# TLS TrustStore password for broker
-tlsTrustStorePassword=
-
-# Whether internal client use KeyStore type to authenticate with Pulsar brokers
-brokerClientTlsEnabledWithKeyStore=false
-
-# The TLS Provider used by internal client to authenticate with other Pulsar brokers
-brokerClientSslProvider=
-
-# TLS TrustStore type configuration for internal client: JKS, PKCS12
-# used by the internal client to authenticate with Pulsar brokers
-brokerClientTlsTrustStoreType=JKS
-
-# TLS TrustStore path for internal client
-# used by the internal client to authenticate with Pulsar brokers
-brokerClientTlsTrustStore=
-
-# TLS TrustStore password for internal client,
-# used by the internal client to authenticate with Pulsar brokers
-brokerClientTlsTrustStorePassword=
-
-# Specify the tls cipher the internal client will use to negotiate during TLS Handshake
-# (a comma-separated list of ciphers)
-# e.g.  [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
-# used by the internal client to authenticate with Pulsar brokers
-brokerClientTlsCiphers=
-
-# Specify the tls protocols the broker will use to negotiate during TLS handshake
-# (a comma-separated list of protocol names).
-# e.g.  [TLSv1.2, TLSv1.1, TLSv1]
-# used by the internal client to authenticate with Pulsar brokers
-brokerClientTlsProtocols=
-
-# Enable or disable system topic
-systemTopicEnabled=false
-
-# Enable or disable topic level policies, topic level policies depends on the system topic
-# Please enable the system topic first.
-topicLevelPoliciesEnabled=false
-
-# If a topic remains fenced for this number of seconds, it will be closed forcefully.
-# If it is set to 0 or a negative number, the fenced topic will not be closed.
-topicFencingTimeoutSeconds=0
-
-### --- Authentication --- ###
-# Role names that are treated as "proxy roles". If the broker sees a request with
-#role as proxyRoles - it will demand to see a valid original principal.
-proxyRoles=
-
-# If this flag is set then the broker authenticates the original Auth data
-# else it just accepts the originalPrincipal and authorizes it (if required).
-authenticateOriginalAuthData=false
-
-# Enable authentication
-authenticationEnabled=false
-
-# Autentication provider name list, which is comma separated list of class names
-authenticationProviders=
-
-# Enforce authorization
-authorizationEnabled=false
-
-# Authorization provider fully qualified class-name
-authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
-
-# Allow wildcard matching in authorization
-# (wildcard matching only applicable if wildcard-char:
-# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
-authorizationAllowWildcardsMatching=false
-
-# Role names that are treated as "super-user", meaning they will be able to do all admin
-# operations and publish/consume from all topics
-superUserRoles=
-
-# Authentication settings of the broker itself. Used when the broker connects to other brokers,
-# either in same or other clusters
-brokerClientAuthenticationPlugin=
-brokerClientAuthenticationParameters=
-
-# Supported Athenz provider domain names(comma separated) for authentication
-athenzDomainNames=
-
-# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
-anonymousUserRole=
-
-# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken
-# (defaults to "sub" if blank)
-tokenAuthClaim=
-
-# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
-# If not set, audience will not be verified.
-tokenAudienceClaim=
-
-# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
-tokenAudience=
-
-### --- BookKeeper Client --- ###
-
-# Authentication plugin to use when connecting to bookies
-bookkeeperClientAuthenticationPlugin=
-
-# BookKeeper auth plugin implementatation specifics parameters name and values
-bookkeeperClientAuthenticationParametersName=
-bookkeeperClientAuthenticationParameters=
-
-# Timeout for BK add / read operations
-bookkeeperClientTimeoutInSeconds=30
-
-# Speculative reads are initiated if a read request doesn't complete within a certain time
-# Using a value of 0, is disabling the speculative reads
-bookkeeperClientSpeculativeReadTimeoutInMillis=0
-
-# Number of channels per bookie
-bookkeeperNumberOfChannelsPerBookie=16
-
-# Enable bookies health check. Bookies that have more than the configured number of failure within
-# the interval will be quarantined for some time. During this period, new ledgers won't be created
-# on these bookies
-bookkeeperClientHealthCheckEnabled=true
-bookkeeperClientHealthCheckIntervalSeconds=60
-bookkeeperClientHealthCheckErrorThresholdPerInterval=5
-bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800
-
-#bookie quarantine ratio to avoid all clients quarantine the high pressure bookie servers at the same time
-bookkeeperClientQuarantineRatio=1.0
-
-# Enable rack-aware bookie selection policy. BK will chose bookies from different racks when
-# forming a new bookie ensemble
-# This parameter related to ensemblePlacementPolicy in conf/bookkeeper.conf, if enabled, ensemblePlacementPolicy
-# should be set to org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
-bookkeeperClientRackawarePolicyEnabled=true
-
-# Enable region-aware bookie selection policy. BK will chose bookies from
-# different regions and racks when forming a new bookie ensemble.
-# If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored
-# This parameter related to ensemblePlacementPolicy in conf/bookkeeper.conf, if enabled, ensemblePlacementPolicy
-# should be set to org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy
-bookkeeperClientRegionawarePolicyEnabled=false
-
-# Minimum number of racks per write quorum. BK rack-aware bookie selection policy will try to
-# get bookies from at least 'bookkeeperClientMinNumRacksPerWriteQuorum' racks for a write quorum.
-bookkeeperClientMinNumRacksPerWriteQuorum=1
-
-# Enforces rack-aware bookie selection policy to pick bookies from 'bookkeeperClientMinNumRacksPerWriteQuorum'
-# racks for a writeQuorum.
-# If BK can't find bookie then it would throw BKNotEnoughBookiesException instead of picking random one.
-bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
-
-# Enable/disable reordering read sequence on reading entries.
-bookkeeperClientReorderReadSequenceEnabled=false
-
-# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
-# outside the specified groups will not be used by the broker
-bookkeeperClientIsolationGroups=
-
-# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't
-# have enough bookie available.
-bookkeeperClientSecondaryIsolationGroups=
-
-# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups
-# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.
-bookkeeperClientMinAvailableBookiesInIsolationGroups=
-
-# Set the client security provider factory class name.
-# Default: org.apache.bookkeeper.tls.TLSContextFactory
-bookkeeperTLSProviderFactoryClass=org.apache.bookkeeper.tls.TLSContextFactory
-
-# Enable tls authentication with bookie
-bookkeeperTLSClientAuthentication=false
-
-# Supported type: PEM, JKS, PKCS12. Default value: PEM
-bookkeeperTLSKeyFileType=PEM
-
-#Supported type: PEM, JKS, PKCS12. Default value: PEM
-bookkeeperTLSTrustCertTypes=PEM
-
-# Path to file containing keystore password, if the client keystore is password protected.
-bookkeeperTLSKeyStorePasswordPath=
-
-# Path to file containing truststore password, if the client truststore is password protected.
-bookkeeperTLSTrustStorePasswordPath=
-
-# Path for the TLS private key file
-bookkeeperTLSKeyFilePath=
-
-# Path for the TLS certificate file
-bookkeeperTLSCertificateFilePath=
-
-# Path for the trusted TLS certificate file
-bookkeeperTLSTrustCertsFilePath=
-
-# Enable/disable disk weight based placement. Default is false
-bookkeeperDiskWeightBasedPlacementEnabled=false
-
-# Set the interval to check the need for sending an explicit LAC
-# A value of '0' disables sending any explicit LACs. Default is 0.
-bookkeeperExplicitLacIntervalInMills=0
-
-# Use older Bookkeeper wire protocol with bookie
-bookkeeperUseV2WireProtocol=true
-
-# Expose bookkeeper client managed ledger stats to prometheus. default is false
-# bookkeeperClientExposeStatsToPrometheus=false
-
-### --- Managed Ledger --- ###
-
-# Number of bookies to use when creating a ledger
-managedLedgerDefaultEnsembleSize=1
-
-# Number of copies to store for each message
-managedLedgerDefaultWriteQuorum=1
-
-# Number of guaranteed copies (acks to wait before write is complete)
-managedLedgerDefaultAckQuorum=1
-
-# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
-# Default is 60 seconds
-managedLedgerCursorPositionFlushSeconds=60
-
-# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
-# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
-managedLedgerDigestType=CRC32C
-
-# Number of threads to be used for managed ledger tasks dispatching
-managedLedgerNumWorkerThreads=4
-
-# Number of threads to be used for managed ledger scheduled tasks
-managedLedgerNumSchedulerThreads=4
-
-# Amount of memory to use for caching data payload in managed ledger. This memory
-# is allocated from JVM direct memory and it's shared across all the topics
-# running  in the same broker. By default, uses 1/5th of available direct memory
-managedLedgerCacheSizeMB=
-
-# Whether we should make a copy of the entry payloads when inserting in cache
-managedLedgerCacheCopyEntries=false
-
-# Threshold to which bring down the cache level when eviction is triggered
-managedLedgerCacheEvictionWatermark=0.9
-
-# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
-managedLedgerCacheEvictionFrequency=100.0
-
-# All entries that have stayed in cache for more than the configured time, will be evicted
-managedLedgerCacheEvictionTimeThresholdMillis=1000
-
-# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'
-# and thus should be set as inactive.
-managedLedgerCursorBackloggedThreshold=1000
-
-# Rate limit the amount of writes generated by consumer acking the messages
-managedLedgerDefaultMarkDeleteRateLimit=0.1
-
-# Max number of entries to append to a ledger before triggering a rollover
-# A ledger rollover is triggered on these conditions
-#  * Either the max rollover time has been reached
-#  * or max entries have been written to the ledged and at least min-time
-#    has passed
-managedLedgerMaxEntriesPerLedger=50000
-
-# Minimum time between ledger rollover for a topic
-managedLedgerMinLedgerRolloverTimeMinutes=10
-
-# Maximum time before forcing a ledger rollover for a topic
-managedLedgerMaxLedgerRolloverTimeMinutes=240
-
-# Max number of entries to append to a cursor ledger
-managedLedgerCursorMaxEntriesPerLedger=50000
-
-# Max time before triggering a rollover on a cursor ledger
-managedLedgerCursorRolloverTimeInSeconds=14400
-
-# Maximum ledger size before triggering a rollover for a topic (MB)
-managedLedgerMaxSizePerLedgerMbytes=2048
-
-# Max number of "acknowledgment holes" that are going to be persistently stored.
-# When acknowledging out of order, a consumer will leave holes that are supposed
-# to be quickly filled by acking all the messages. The information of which
-# messages are acknowledged is persisted by compressing in "ranges" of messages
-# that were acknowledged. After the max number of ranges is reached, the information
-# will only be tracked in memory and messages will be redelivered in case of
-# crashes.
-managedLedgerMaxUnackedRangesToPersist=10000
-
-# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
-# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
-# zookeeper.
-managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
-
-# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
-# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
-autoSkipNonRecoverableData=false
-
-# operation timeout while updating managed-ledger metadata.
-managedLedgerMetadataOperationsTimeoutSeconds=60
-
-# Read entries timeout when broker tries to read messages from bookkeeper.
-managedLedgerReadEntryTimeoutSeconds=0
-
-# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
-managedLedgerAddEntryTimeoutSeconds=0
-
-# New entries check delay for the cursor under the managed ledger.
-# If no new messages in the topic, the cursor will try to check again after the delay time.
-# For consumption latency sensitive scenario, can set to a smaller value or set to 0.
-# Of course, use a smaller value may degrade consumption throughput. Default is 10ms.
-managedLedgerNewEntriesCheckDelayInMillis=10
-
-# Use Open Range-Set to cache unacked messages
-managedLedgerUnackedRangesOpenCacheSetEnabled=true
-
-# Managed ledger prometheus stats latency rollover seconds (default: 60s)
-managedLedgerPrometheusStatsLatencyRolloverSeconds=60
-
-# Whether trace managed ledger task execution time
-managedLedgerTraceTaskExecution=true
-
-### --- Load balancer --- ###
-
-loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
-
-# Enable load balancer
-loadBalancerEnabled=false
-
-# Percentage of change to trigger load report update
-loadBalancerReportUpdateThresholdPercentage=10
-
-# maximum interval to update load report
-loadBalancerReportUpdateMaxIntervalMinutes=15
-
-# Frequency of report to collect
-loadBalancerHostUsageCheckIntervalMinutes=1
-
-# Load shedding interval. Broker periodically checks whether some traffic should be offload from
-# some over-loaded broker to other under-loaded brokers
-loadBalancerSheddingIntervalMinutes=1
-
-# Prevent the same topics to be shed and moved to other broker more that once within this timeframe
-loadBalancerSheddingGracePeriodMinutes=30
-
-# Usage threshold to allocate max number of topics to broker
-loadBalancerBrokerMaxTopics=50000
-
-# Interval to flush dynamic resource quota to ZooKeeper
-loadBalancerResourceQuotaUpdateIntervalMinutes=15
-
-# enable/disable namespace bundle auto split
-loadBalancerAutoBundleSplitEnabled=true
-
-# enable/disable automatic unloading of split bundles
-loadBalancerAutoUnloadSplitBundlesEnabled=true
-
-# maximum topics in a bundle, otherwise bundle split will be triggered
-loadBalancerNamespaceBundleMaxTopics=1000
-
-# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
-loadBalancerNamespaceBundleMaxSessions=1000
-
-# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
-loadBalancerNamespaceBundleMaxMsgRate=30000
-
-# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
-loadBalancerNamespaceBundleMaxBandwidthMbytes=100
-
-# maximum number of bundles in a namespace
-loadBalancerNamespaceMaximumBundles=128
-
-# The broker resource usage threshold.
-# When the broker resource usage is gratter than the pulsar cluster average resource usge,
-# the threshold shedder will be triggered to offload bundles from the broker.
-# It only take effect in ThresholdSheddler strategy.
-loadBalancerBrokerThresholdShedderPercentage=10
-
-# When calculating new resource usage, the history usage accounts for.
-# It only take effect in ThresholdSheddler strategy.
-loadBalancerHistoryResourcePercentage=0.9
-
-# The BandWithIn usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
-loadBalancerBandwithInResourceWeight=1.0
-
-# The BandWithOut usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
-loadBalancerBandwithOutResourceWeight=1.0
-
-# The CPU usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
-loadBalancerCPUResourceWeight=1.0
-
-# The heap memory usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
-loadBalancerMemoryResourceWeight=1.0
-
-# The direct memory usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
-loadBalancerDirectMemoryResourceWeight=1.0
-
-# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently.
-# It only take effect in ThresholdShedder strategy.
-loadBalancerBundleUnloadMinThroughputThreshold=10
-
-### --- Replication --- ###
-
-# Enable replication metrics
-replicationMetricsEnabled=true
-
-# Max number of connections to open for each broker in a remote cluster
-# More connections host-to-host lead to better throughput over high-latency
-# links.
-replicationConnectionsPerBroker=16
-
-# Replicator producer queue size
-replicationProducerQueueSize=1000
-
-# Duration to check replication policy to avoid replicator inconsistency
-# due to missing ZooKeeper watch (disable with value 0)
-replicationPolicyCheckDurationSeconds=600
-
-# Default message retention time
-defaultRetentionTimeInMinutes=0
-
-# Default retention size
-defaultRetentionSizeInMB=0
-
-# How often to check whether the connections are still alive
-keepAliveIntervalSeconds=30
-
-### --- WebSocket --- ###
-
-# Enable the WebSocket API service in broker
-webSocketServiceEnabled=true
-
-# Number of IO threads in Pulsar Client used in WebSocket proxy
-webSocketNumIoThreads=8
-
-# Number of connections per Broker in Pulsar Client used in WebSocket proxy
-webSocketConnectionsPerBroker=8
-
-# Time in milliseconds that idle WebSocket session times out
-webSocketSessionIdleTimeoutMillis=300000
-
-# The maximum size of a text message during parsing in WebSocket proxy
-webSocketMaxTextFrameSize=1048576
-
-### --- Metrics --- ###
-
-# Enable topic level metrics
-exposeTopicLevelMetricsInPrometheus=true
-
-# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
-# jvmGCMetricsLoggerClassName=
-
-### --- Broker Web Stats --- ###
-
-# Enable topic level metrics
-exposePublisherStats=true
-
-# Enable expose the precise backlog stats.
-# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be
-# inaccurate. Default is false.
-exposePreciseBacklogInPrometheus=false
-
-### --- Deprecated config variables --- ###
-
-# Deprecated. Use configurationStoreServers
-globalZookeeperServers=
-
-# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds
-brokerServicePurgeInactiveFrequencyInSeconds=60
-
-### --- BookKeeper Configuration --- #####
-
-ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage
-
-# The maximum netty frame size in bytes. Any message received larger than this will be rejected. The default value is
-# 5MB.
-nettyMaxFrameSizeBytes=5253120
-
-# Size of Write Cache. Memory is allocated from JVM direct memory.
-# Write cache is used to buffer entries before flushing into the entry log
-# For good performance, it should be big enough to hold a substantial amount
-# of entries in the flush interval
-# By default it will be allocated to 1/4th of the available direct memory
-dbStorage_writeCacheMaxSizeMb=
-
-# Size of Read cache. Memory is allocated from JVM direct memory.
-# This read cache is pre-filled doing read-ahead whenever a cache miss happens
-# By default it will be allocated to 1/4th of the available direct memory
-dbStorage_readAheadCacheMaxSizeMb=
-
-# How many entries to pre-fill in cache after a read cache miss
-dbStorage_readAheadCacheBatchSize=1000
-
-flushInterval=60000
-
-## RocksDB specific configurations
-## DbLedgerStorage uses RocksDB to store the indexes from
-## (ledgerId, entryId) -> (entryLog, offset)
-
-# Size of RocksDB block-cache. For best performance, this cache
-# should be big enough to hold a significant portion of the index
-# database which can reach ~2GB in some cases
-# Default is to use 10% of the direct memory size
-dbStorage_rocksDB_blockCacheSize=
-
-# Other RocksDB specific tunables
-dbStorage_rocksDB_writeBufferSizeMB=4
-dbStorage_rocksDB_sstSizeInMB=4
-dbStorage_rocksDB_blockSize=4096
-dbStorage_rocksDB_bloomFilterBitsPerKey=10
-dbStorage_rocksDB_numLevels=-1
-dbStorage_rocksDB_numFilesInLevel0=4
-dbStorage_rocksDB_maxSizeInLevel1MB=256
-
-# Maximum latency to impose on a journal write to achieve grouping
-journalMaxGroupWaitMSec=1
-
-# Should the data be fsynced on journal before acknowledgment.
-journalSyncData=false
-
-
-# For each ledger dir, maximum disk space which can be used.
-# Default is 0.95f. i.e. 95% of disk can be used at most after which nothing will
-# be written to that partition. If all ledger dir partions are full, then bookie
-# will turn to readonly mode if 'readOnlyModeEnabled=true' is set, else it will
-# shutdown.
-# Valid values should be in between 0 and 1 (exclusive).
-diskUsageThreshold=0.99
-
-# The disk free space low water mark threshold.
-# Disk is considered full when usage threshold is exceeded.
-# Disk returns back to non-full state when usage is below low water mark threshold.
-# This prevents it from going back and forth between these states frequently
-# when concurrent writes and compaction are happening. This also prevent bookie from
-# switching frequently between read-only and read-writes states in the same cases.
-diskUsageWarnThreshold=0.99
-
-# Whether the bookie allowed to use a loopback interface as its primary
-# interface(i.e. the interface it uses to establish its identity)?
-# By default, loopback interfaces are not allowed as the primary
-# interface.
-# Using a loopback interface as the primary interface usually indicates
-# a configuration error. For example, its fairly common in some VPS setups
-# to not configure a hostname, or to have the hostname resolve to
-# 127.0.0.1. If this is the case, then all bookies in the cluster will
-# establish their identities as 127.0.0.1:3181, and only one will be able
-# to join the cluster. For VPSs configured like this, you should explicitly
-# set the listening interface.
-allowLoopback=true
-
-# How long the interval to trigger next garbage collection, in milliseconds
-# Since garbage collection is running in background, too frequent gc
-# will heart performance. It is better to give a higher number of gc
-# interval if there is enough disk capacity.
-gcWaitTime=300000
-
-# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
-allowAutoTopicCreation=true
-
-# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
-allowAutoTopicCreationType=non-partitioned
-
-# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
-allowAutoSubscriptionCreation=true
-
-# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is
-# partitioned.
-defaultNumPartitions=1
-
-### --- Transaction config variables --- ###
-transactionMetadataStoreProviderClassName=\
-  org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider

