You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/13 07:15:23 UTC

[pulsar] 01/02: [improve][test] force initialize field to avoid polluted by mocks (#17022)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 77877373a7a5801a48a1bcc169759430ad91f50c
Author: tison <wa...@gmail.com>
AuthorDate: Mon Aug 22 18:32:34 2022 +0800

    [improve][test] force initialize field to avoid polluted by mocks (#17022)
    
    Master Issue: #16912
    
    - [x] `doc-not-needed`
    
    cc @Shoothzj @eolivelli @nicoloboschi @Technoboy-
    
    Signed-off-by: tison <wa...@gmail.com>
---
 .../client/impl/BatchMessageContainerImpl.java     | 18 ++++-
 .../client/impl/BatchMessageContainerImplTest.java | 84 +++++++++-------------
 .../PulsarByteBufAllocatorDefaultTest.java         | 37 ++++------
 ...ulsarByteBufAllocatorOomThrowExceptionTest.java | 35 ++++-----
 4 files changed, 75 insertions(+), 99 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 7d95f0963ba..44fad489dac 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -61,7 +63,19 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
     // keep track of callbacks for individual messages being published in a batch
     protected SendCallback firstCallback;
 
+    private final ByteBufAllocator allocator;
+
     public BatchMessageContainerImpl() {
+        this(PulsarByteBufAllocator.DEFAULT);
+    }
+
+    /**
+     * This constructor is for testing only. The global allocator is always
+     * {@link PulsarByteBufAllocator#DEFAULT}.
+     */
+    @VisibleForTesting
+    BatchMessageContainerImpl(ByteBufAllocator allocator) {
+        this.allocator = allocator;
     }
 
     public BatchMessageContainerImpl(ProducerImpl<?> producer) {
@@ -84,8 +98,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
                 messageMetadata.setSequenceId(msg.getSequenceId());
                 lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
                 this.firstCallback = callback;
-                batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
-                        .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
+                batchedMessageMetadataAndPayload = allocator.buffer(
+                        Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
                 if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
                     currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
                 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 8fc018b3199..a4f0205c2cf 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -19,69 +19,51 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBufAllocator;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.mockito.MockedConstruction;
-import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.Test;
 
 public class BatchMessageContainerImplTest {
 
     @Test
-    public void recoveryAfterOom() throws Exception {
-        AtomicBoolean called = new AtomicBoolean();
-        try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
-                (mockAllocator, context) -> {
-                    called.set(true);
-                    doThrow(new OutOfMemoryError("test")).when(mockAllocator).buffer(anyInt(), anyInt());
-                })) {
-            if (PulsarByteBufAllocator.DEFAULT != null && !called.get()) {
-                replaceByteBufAllocator();
-            }
-            final ProducerImpl producer = Mockito.mock(ProducerImpl.class);
-            final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
-            producerConfigurationData.setCompressionType(CompressionType.NONE);
-            when(producer.getConfiguration()).thenReturn(producerConfigurationData);
-            final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl();
-            batchMessageContainer.setProducer(producer);
-            MessageMetadata messageMetadata1 = new MessageMetadata();
-            messageMetadata1.setSequenceId(1L);
-            messageMetadata1.setProducerName("producer1");
-            messageMetadata1.setPublishTime(System.currentTimeMillis());
-            ByteBuffer payload1 = ByteBuffer.wrap("payload1".getBytes(StandardCharsets.UTF_8));
-            final MessageImpl<byte[]> message1 = MessageImpl.create(messageMetadata1, payload1, Schema.BYTES, null);
-            batchMessageContainer.add(message1, null);
-            MessageMetadata messageMetadata2 = new MessageMetadata();
-            messageMetadata2.setSequenceId(1L);
-            messageMetadata2.setProducerName("producer1");
-            messageMetadata2.setPublishTime(System.currentTimeMillis());
-            ByteBuffer payload2 = ByteBuffer.wrap("payload2".getBytes(StandardCharsets.UTF_8));
-            final MessageImpl<byte[]> message2 = MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null);
-            // after oom, our add can self-healing, won't throw exception
-            batchMessageContainer.add(message2, null);
-        } finally {
-            replaceByteBufAllocator();
-        }
-
-    }
-
-    private void replaceByteBufAllocator() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
-        Method createByteBufAllocatorMethod = PulsarByteBufAllocator.class.getDeclaredMethod("createByteBufAllocator");
-        createByteBufAllocatorMethod.setAccessible(true);
-        Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT",
-                createByteBufAllocatorMethod.invoke(null));
+    public void recoveryAfterOom() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final ProducerImpl<?> producer = mock(ProducerImpl.class);
+        final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+        producerConfigurationData.setCompressionType(CompressionType.NONE);
+        when(producer.getConfiguration()).thenReturn(producerConfigurationData);
+        final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
+        doAnswer((ignore) -> {
+            called.set(true);
+            throw new OutOfMemoryError("test");
+        }).when(mockAllocator).buffer(anyInt());
+        final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+        batchMessageContainer.setProducer(producer);
+        MessageMetadata messageMetadata1 = new MessageMetadata();
+        messageMetadata1.setSequenceId(1L);
+        messageMetadata1.setProducerName("producer1");
+        messageMetadata1.setPublishTime(System.currentTimeMillis());
+        ByteBuffer payload1 = ByteBuffer.wrap("payload1".getBytes(StandardCharsets.UTF_8));
+        final MessageImpl<byte[]> message1 = MessageImpl.create(messageMetadata1, payload1, Schema.BYTES, null);
+        batchMessageContainer.add(message1, null);
+        assertTrue(called.get());
+        MessageMetadata messageMetadata2 = new MessageMetadata();
+        messageMetadata2.setSequenceId(1L);
+        messageMetadata2.setProducerName("producer1");
+        messageMetadata2.setPublishTime(System.currentTimeMillis());
+        ByteBuffer payload2 = ByteBuffer.wrap("payload2".getBytes(StandardCharsets.UTF_8));
+        final MessageImpl<byte[]> message2 = MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null);
+        // after oom, our add can self-healing, won't throw exception
+        batchMessageContainer.add(message2, null);
     }
-
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
index fb81835c7c4..9c548ff25e4 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
@@ -18,30 +18,30 @@
  */
 package org.apache.pulsar.common.allocator;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBufAllocator;
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
 import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.Test;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-@Slf4j
 public class PulsarByteBufAllocatorDefaultTest {
 
     @Test
-    public void testDefaultConfig() throws Exception {
+    public void testDefaultConfig() {
+        // Force initialize PulsarByteBufAllocator.DEFAULT before mock the ctor so that it is not polluted.
+        assertNotNull(PulsarByteBufAllocator.DEFAULT);
+
         AtomicBoolean called = new AtomicBoolean();
-        try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
+        try (MockedConstruction<ByteBufAllocatorImpl> ignored = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
                 (mock, context) -> {
             called.set(true);
             final List<?> arguments = context.arguments();
@@ -49,21 +49,10 @@ public class PulsarByteBufAllocatorDefaultTest {
             assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
             assertEquals(arguments.get(4), OutOfMemoryPolicy.FallbackToHeap);
             assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced);
-
         })) {
-            final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT;
-            // use the variable, in case the compiler optimization
-            log.trace("{}", byteBufAllocator);
-
-            if (!called.get()) {
-                // maybe PulsarByteBufAllocator static initialization has already been called by a previous test
-                // let's rerun the same method
-                PulsarByteBufAllocator.createByteBufAllocator();
-            }
+            assertFalse(called.get());
+            PulsarByteBufAllocator.createByteBufAllocator();
             assertTrue(called.get());
-        } finally {
-            Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT",
-                    PulsarByteBufAllocator.createByteBufAllocator());
         }
     }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
index 20bbeb06590..e1a3176e5bd 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
@@ -18,31 +18,31 @@
  */
 package org.apache.pulsar.common.allocator;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBufAllocator;
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
 import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.Test;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-@Slf4j
 public class PulsarByteBufAllocatorOomThrowExceptionTest {
 
     @Test
-    public void testDefaultConfig() throws Exception {
+    public void testDefaultConfig() {
+        // Force initialize PulsarByteBufAllocator.DEFAULT before mock the ctor so that it is not polluted.
+        assertNotNull(PulsarByteBufAllocator.DEFAULT);
+
         AtomicBoolean called = new AtomicBoolean();
         System.setProperty("pulsar.allocator.out_of_memory_policy", "ThrowException");
-        try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
+        try (MockedConstruction<ByteBufAllocatorImpl> ignored = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
                 (mock, context) -> {
                     called.set(true);
                     final List<?> arguments = context.arguments();
@@ -50,21 +50,12 @@ public class PulsarByteBufAllocatorOomThrowExceptionTest {
                     assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
                     assertEquals(arguments.get(4), OutOfMemoryPolicy.ThrowException);
                     assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced);
-
                 })) {
-            final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT;
-            // use the variable, in case the compiler optimization
-            log.trace("{}", byteBufAllocator);
-            if (!called.get()) {
-                // maybe PulsarByteBufAllocator static initialization has already been called by a previous test
-                // let's rerun the same method
-                PulsarByteBufAllocator.createByteBufAllocator();
-            }
+            assertFalse(called.get());
+            PulsarByteBufAllocator.createByteBufAllocator();
             assertTrue(called.get());
         } finally {
             System.clearProperty("pulsar.allocator.out_of_memory_policy");
-            Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT",
-                    PulsarByteBufAllocator.createByteBufAllocator());
         }
     }