You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/02 04:42:05 UTC

[pulsar] branch branch-2.10 updated (18cd7f4e04e -> 3c25a1f19e1)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 18cd7f4e04e [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
     new 6db6679b419 [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)
     new 3c25a1f19e1 [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/elasticsearch/ElasticSearchClientTests.java | 91 ++++++++++++----------
 .../testclient/PerformanceTransactionTest.java     | 12 ++-
 2 files changed, 59 insertions(+), 44 deletions(-)


[pulsar] 02/02: [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c25a1f19e1b4267c920cf593e4571a454171a86
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Tue Aug 9 14:07:31 2022 +0800

    [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
    
    (cherry picked from commit f02679d83402ac07e06c48570505f343e350a491)
---
 .../io/elasticsearch/ElasticSearchClientTests.java | 91 ++++++++++++----------
 1 file changed, 49 insertions(+), 42 deletions(-)

diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 599a68dcfaf..3403baf4d82 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -18,7 +18,20 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 import eu.rekawek.toxiproxy.model.ToxicDirection;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -34,20 +47,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Optional;
-import java.util.UUID;
-
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
 @Slf4j
 public class ElasticSearchClientTests extends ElasticSearchTestBase {
 
@@ -67,8 +66,16 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
     }
 
     static class MockRecord<T> implements Record<T> {
-        int acked = 0;
-        int failed = 0;
+        LongAdder acked = new LongAdder();
+        LongAdder failed = new LongAdder();
+
+        public int getAcked() {
+            return acked.intValue();
+        }
+
+        public int getFailed() {
+            return failed.intValue();
+        }
 
         @Override
         public T getValue() {
@@ -77,12 +84,12 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
 
         @Override
         public void ack() {
-            acked++;
+            acked.increment();
         }
 
         @Override
         public void fail() {
-            failed++;
+            failed.increment();
         }
     }
 
@@ -166,13 +173,13 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
             try {
                 MockRecord<GenericObject> mockRecord = new MockRecord<>();
                 client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
-                assertEquals(mockRecord.acked, 1);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 1);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.totalHits(index), 1);
 
                 client.deleteDocument(mockRecord, "1");
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 2);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.totalHits(index), 0);
             } finally {
                 client.delete(index);
@@ -229,11 +236,11 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
             client.flush();
             assertNotNull(client.irrecoverableError.get());
             assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
             assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 2);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 2);
         }
     }
 
@@ -251,8 +258,8 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
             client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
             client.flush();
             assertNull(client.irrecoverableError.get());
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
         }
     }
 
@@ -278,8 +285,8 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
                     MockRecord<GenericObject> mockRecord = new MockRecord<>();
                     client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
                     client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
-                    assertEquals(mockRecord.acked, 2);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 2);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.totalHits(index), 2);
 
                     log.info("starting the toxic");
@@ -288,13 +295,13 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
                     toxiproxy.removeToxicAfterDelay("elasticpause", 15000);
 
                     client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
-                    assertEquals(mockRecord.acked, 2);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 2);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.totalHits(index), 2);
 
                     client.flush();
-                    assertEquals(mockRecord.acked, 3);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 3);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.totalHits(index), 3);
                 } finally {
                     client.delete(index);
@@ -328,14 +335,14 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
                     }
 
                     Awaitility.await().untilAsserted(() -> {
-                        assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
-                        assertEquals(mockRecord.failed, 0);
+                        assertThat("acked record", mockRecord.getAcked(), greaterThanOrEqualTo(4));
+                        assertEquals(mockRecord.getFailed(), 0);
                         assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
                     });
                     client.flush();
                     Awaitility.await().untilAsserted(() -> {
-                        assertEquals(mockRecord.failed, 0);
-                        assertEquals(mockRecord.acked, 5);
+                        assertEquals(mockRecord.getFailed(), 0);
+                        assertEquals(mockRecord.getAcked(), 5);
                         assertEquals(client.totalHits(index), 5);
                     });
 
@@ -355,10 +362,10 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase {
                     log.info("elapsed = {}", elapsed);
                     assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy
 
-                    Thread.sleep(3000L);
-                    assertEquals(mockRecord.acked, 15);
-                    assertEquals(mockRecord.failed, 0);
-                    assertEquals(client.records.size(), 0);
+                    Awaitility.await().untilAsserted(() -> {
+                        assertEquals(mockRecord.getAcked(), 15);
+                        assertEquals(mockRecord.getFailed(), 0);
+                    });
 
                 } finally {
                     client.delete(index);


[pulsar] 01/02: [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6db6679b419fc0951f42f43331e13cabe90c6130
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Aug 9 10:45:48 2022 +0800

    [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)
    
    * [Fix][Flaky-test] Fix testConsumeTxnMessage
    Master issue: https://github.com/apache/pulsar/issues/14109
    ## Motivation
    The transaction commit is asynchronous, so the consumer can still receive messages when it is rebuilt.
    ## Modification
    Add Awaitility.await() to wait until ongoingTxns is 0 for every transaction coordinator.
    
    (cherry picked from commit c29503e7c8704132f13ae8021a76735b065940b9)
---
 .../apache/pulsar/testclient/PerformanceTransactionTest.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index 936f70bedf3..1fcb925a5e2 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -194,7 +195,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException {
+    public void testConsumeTxnMessage() throws Exception {
         String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
         String subName = "sub";
         String topic = testTopic + UUID.randomUUID();
@@ -222,11 +223,18 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
         });
         thread.start();
         thread.join();
+
+        Awaitility.await().untilAsserted(() -> {
+            admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> {
+                Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
+            });
+        });
+
         Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .enableBatchIndexAcknowledgment(false)
-               .subscribe();
+                .subscribe();
         for (int i = 0; i < 5; i++) {
             Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
             Assert.assertNotNull(message);