You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/18 02:43:12 UTC

[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8554: Ensure all records are pushed in Pulsar Consumer Test

xiangfu0 commented on code in PR #8554:
URL: https://github.com/apache/pinot/pull/8554#discussion_r851847264


##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java:
##########
@@ -57,45 +68,90 @@ public class PulsarConsumerTest {
   public static final int NUM_PARTITION = 1;
   public static final int NUM_RECORDS_PER_PARTITION = 1000;
   public static final int BATCH_SIZE = 10;
+  public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) Duration.ofMinutes(5).toMillis();
 
   private PulsarClient _pulsarClient;
-  private PulsarStandaloneCluster _pulsarStandaloneCluster;
+  private PulsarContainer _pulsar = null;
   private HashMap<Integer, MessageId> _partitionToFirstMessageIdMap = new HashMap<>();
   private HashMap<Integer, MessageId> _partitionToFirstMessageIdMapBatch = new HashMap<>();
 
   @BeforeClass
   public void setUp()
       throws Exception {
     try {
-      _pulsarStandaloneCluster = new PulsarStandaloneCluster();
+      _pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ofMinutes(5));
+      _pulsar.start();
 
-      _pulsarStandaloneCluster.start();
+      // Waiting for namespace to be created.
+      // There should be a better approach.
+      Thread.sleep(20 * 1000L);
 
-      PulsarAdmin admin =
-          PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + _pulsarStandaloneCluster.getAdminPort()).build();
+      PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build();
 
-      String bootstrapServer = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort();
+      String bootstrapServer = _pulsar.getPulsarBrokerUrl();
 
       _pulsarClient = PulsarClient.builder().serviceUrl(bootstrapServer).build();
 
-      admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION);
-      admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION);
+      createTopics(admin);
 
       publishRecords();
       publishRecordsBatch();
+
+      waitForMessagesToPublish(admin, TEST_TOPIC);
+      waitForMessagesToPublish(admin, TEST_TOPIC_BATCH);
+
+      admin.close();
     } catch (Exception e) {
-      if (_pulsarStandaloneCluster != null) {
-        _pulsarStandaloneCluster.stop();
+      if (_pulsar != null) {
+        _pulsar.stop();
+        _pulsar = null;
       }
       throw new RuntimeException("Failed to setUp test environment", e);
     }
   }
 
+  private void createTopics(PulsarAdmin admin)
+      throws PulsarAdminException {
+    InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
+    inactiveTopicPolicies.setDeleteWhileInactive(false);
+    admin.namespaces().setInactiveTopicPolicies("public/default", inactiveTopicPolicies);
+
+    admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION);
+    admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION);
+  }
+
+  private void waitForMessagesToPublish(PulsarAdmin admin, String topicName) {
+    waitForCondition(new Function<Void, Boolean>() {

Review Comment:
   We already have `TestUtils.waitForCondition`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org