You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/13 07:15:23 UTC
[pulsar] 01/02: [improve][test] force initialize field to avoid polluted by mocks (#17022)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 77877373a7a5801a48a1bcc169759430ad91f50c
Author: tison <wa...@gmail.com>
AuthorDate: Mon Aug 22 18:32:34 2022 +0800
[improve][test] force initialize field to avoid polluted by mocks (#17022)
Master Issue: #16912
- [x] `doc-not-needed`
cc @Shoothzj @eolivelli @nicoloboschi @Technoboy-
Signed-off-by: tison <wa...@gmail.com>
---
.../client/impl/BatchMessageContainerImpl.java | 18 ++++-
.../client/impl/BatchMessageContainerImplTest.java | 84 +++++++++-------------
.../PulsarByteBufAllocatorDefaultTest.java | 37 ++++------
...ulsarByteBufAllocatorOomThrowExceptionTest.java | 35 ++++-----
4 files changed, 75 insertions(+), 99 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 7d95f0963ba..44fad489dac 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.impl;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.ArrayList;
@@ -61,7 +63,19 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
// keep track of callbacks for individual messages being published in a batch
protected SendCallback firstCallback;
+ private final ByteBufAllocator allocator;
+
public BatchMessageContainerImpl() {
+ this(PulsarByteBufAllocator.DEFAULT);
+ }
+
+ /**
+ * This constructor is for testing only. The global allocator is always
+ * {@link PulsarByteBufAllocator#DEFAULT}.
+ */
+ @VisibleForTesting
+ BatchMessageContainerImpl(ByteBufAllocator allocator) {
+ this.allocator = allocator;
}
public BatchMessageContainerImpl(ProducerImpl<?> producer) {
@@ -84,8 +98,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
messageMetadata.setSequenceId(msg.getSequenceId());
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
- batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
- .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
+ batchedMessageMetadataAndPayload = allocator.buffer(
+ Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 8fc018b3199..a4f0205c2cf 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -19,69 +19,51 @@
package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBufAllocator;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.mockito.MockedConstruction;
-import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.Test;
public class BatchMessageContainerImplTest {
@Test
- public void recoveryAfterOom() throws Exception {
- AtomicBoolean called = new AtomicBoolean();
- try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
- (mockAllocator, context) -> {
- called.set(true);
- doThrow(new OutOfMemoryError("test")).when(mockAllocator).buffer(anyInt(), anyInt());
- })) {
- if (PulsarByteBufAllocator.DEFAULT != null && !called.get()) {
- replaceByteBufAllocator();
- }
- final ProducerImpl producer = Mockito.mock(ProducerImpl.class);
- final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
- producerConfigurationData.setCompressionType(CompressionType.NONE);
- when(producer.getConfiguration()).thenReturn(producerConfigurationData);
- final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl();
- batchMessageContainer.setProducer(producer);
- MessageMetadata messageMetadata1 = new MessageMetadata();
- messageMetadata1.setSequenceId(1L);
- messageMetadata1.setProducerName("producer1");
- messageMetadata1.setPublishTime(System.currentTimeMillis());
- ByteBuffer payload1 = ByteBuffer.wrap("payload1".getBytes(StandardCharsets.UTF_8));
- final MessageImpl<byte[]> message1 = MessageImpl.create(messageMetadata1, payload1, Schema.BYTES, null);
- batchMessageContainer.add(message1, null);
- MessageMetadata messageMetadata2 = new MessageMetadata();
- messageMetadata2.setSequenceId(1L);
- messageMetadata2.setProducerName("producer1");
- messageMetadata2.setPublishTime(System.currentTimeMillis());
- ByteBuffer payload2 = ByteBuffer.wrap("payload2".getBytes(StandardCharsets.UTF_8));
- final MessageImpl<byte[]> message2 = MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null);
- // after oom, our add can self-healing, won't throw exception
- batchMessageContainer.add(message2, null);
- } finally {
- replaceByteBufAllocator();
- }
-
- }
-
- private void replaceByteBufAllocator() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
- Method createByteBufAllocatorMethod = PulsarByteBufAllocator.class.getDeclaredMethod("createByteBufAllocator");
- createByteBufAllocatorMethod.setAccessible(true);
- Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT",
- createByteBufAllocatorMethod.invoke(null));
+ public void recoveryAfterOom() {
+ final AtomicBoolean called = new AtomicBoolean();
+ final ProducerImpl<?> producer = mock(ProducerImpl.class);
+ final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+ producerConfigurationData.setCompressionType(CompressionType.NONE);
+ when(producer.getConfiguration()).thenReturn(producerConfigurationData);
+ final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
+ doAnswer((ignore) -> {
+ called.set(true);
+ throw new OutOfMemoryError("test");
+ }).when(mockAllocator).buffer(anyInt());
+ final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+ batchMessageContainer.setProducer(producer);
+ MessageMetadata messageMetadata1 = new MessageMetadata();
+ messageMetadata1.setSequenceId(1L);
+ messageMetadata1.setProducerName("producer1");
+ messageMetadata1.setPublishTime(System.currentTimeMillis());
+ ByteBuffer payload1 = ByteBuffer.wrap("payload1".getBytes(StandardCharsets.UTF_8));
+ final MessageImpl<byte[]> message1 = MessageImpl.create(messageMetadata1, payload1, Schema.BYTES, null);
+ batchMessageContainer.add(message1, null);
+ assertTrue(called.get());
+ MessageMetadata messageMetadata2 = new MessageMetadata();
+ messageMetadata2.setSequenceId(1L);
+ messageMetadata2.setProducerName("producer1");
+ messageMetadata2.setPublishTime(System.currentTimeMillis());
+ ByteBuffer payload2 = ByteBuffer.wrap("payload2".getBytes(StandardCharsets.UTF_8));
+ final MessageImpl<byte[]> message2 = MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null);
+ // after oom, our add can self-healing, won't throw exception
+ batchMessageContainer.add(message2, null);
}
-
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
index fb81835c7c4..9c548ff25e4 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
@@ -18,30 +18,30 @@
*/
package org.apache.pulsar.common.allocator;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-@Slf4j
public class PulsarByteBufAllocatorDefaultTest {
@Test
- public void testDefaultConfig() throws Exception {
+ public void testDefaultConfig() {
+ // Force initialize PulsarByteBufAllocator.DEFAULT before mock the ctor so that it is not polluted.
+ assertNotNull(PulsarByteBufAllocator.DEFAULT);
+
AtomicBoolean called = new AtomicBoolean();
- try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
+ try (MockedConstruction<ByteBufAllocatorImpl> ignored = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
(mock, context) -> {
called.set(true);
final List<?> arguments = context.arguments();
@@ -49,21 +49,10 @@ public class PulsarByteBufAllocatorDefaultTest {
assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
assertEquals(arguments.get(4), OutOfMemoryPolicy.FallbackToHeap);
assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced);
-
})) {
- final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT;
- // use the variable, in case the compiler optimization
- log.trace("{}", byteBufAllocator);
-
- if (!called.get()) {
- // maybe PulsarByteBufAllocator static initialization has already been called by a previous test
- // let's rerun the same method
- PulsarByteBufAllocator.createByteBufAllocator();
- }
+ assertFalse(called.get());
+ PulsarByteBufAllocator.createByteBufAllocator();
assertTrue(called.get());
- } finally {
- Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT",
- PulsarByteBufAllocator.createByteBufAllocator());
}
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
index 20bbeb06590..e1a3176e5bd 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
@@ -18,31 +18,31 @@
*/
package org.apache.pulsar.common.allocator;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-@Slf4j
public class PulsarByteBufAllocatorOomThrowExceptionTest {
@Test
- public void testDefaultConfig() throws Exception {
+ public void testDefaultConfig() {
+ // Force initialize PulsarByteBufAllocator.DEFAULT before mock the ctor so that it is not polluted.
+ assertNotNull(PulsarByteBufAllocator.DEFAULT);
+
AtomicBoolean called = new AtomicBoolean();
System.setProperty("pulsar.allocator.out_of_memory_policy", "ThrowException");
- try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
+ try (MockedConstruction<ByteBufAllocatorImpl> ignored = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
(mock, context) -> {
called.set(true);
final List<?> arguments = context.arguments();
@@ -50,21 +50,12 @@ public class PulsarByteBufAllocatorOomThrowExceptionTest {
assertEquals(arguments.get(2), PoolingPolicy.PooledDirect);
assertEquals(arguments.get(4), OutOfMemoryPolicy.ThrowException);
assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced);
-
})) {
- final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT;
- // use the variable, in case the compiler optimization
- log.trace("{}", byteBufAllocator);
- if (!called.get()) {
- // maybe PulsarByteBufAllocator static initialization has already been called by a previous test
- // let's rerun the same method
- PulsarByteBufAllocator.createByteBufAllocator();
- }
+ assertFalse(called.get());
+ PulsarByteBufAllocator.createByteBufAllocator();
assertTrue(called.get());
} finally {
System.clearProperty("pulsar.allocator.out_of_memory_policy");
- Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT",
- PulsarByteBufAllocator.createByteBufAllocator());
}
}