You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/25 11:12:39 UTC
(pulsar) branch master updated: [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 997c8b95e17 [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
997c8b95e17 is described below
commit 997c8b95e1798cee08c56d92b77eb70056dfca8f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Apr 25 14:12:33 2024 +0300
[fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
---
.../prometheus/PrometheusMetricsGenerator.java | 74 +++++++++++++------
.../prometheus/PrometheusMetricsGeneratorTest.java | 85 ++++++++++++++++++++++
2 files changed, 138 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 8cd68caf1ee..6b4d08c359d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
import io.prometheus.client.Collector;
import java.io.BufferedOutputStream;
import java.io.IOException;
@@ -191,8 +192,8 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
crc = new CRC32();
this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
this.bufAllocator = bufAllocator;
- this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
- allocateBuffer();
+ this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2);
+ allocateCompressBuffer();
}
/**
@@ -217,37 +218,66 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
// write gzip header
compressBuffer.put(GZIP_HEADER);
}
+ // update the CRC32 checksum calculation
nioBuffer.mark();
crc.update(nioBuffer);
nioBuffer.reset();
+ // pass the input buffer to the deflater
deflater.setInput(nioBuffer);
+ // when the input buffer is the last one, set the flag to finish the deflater
if (isLast) {
deflater.finish();
}
- while (!deflater.needsInput() && !deflater.finished()) {
- int written = deflater.deflate(compressBuffer);
- if (written == 0 && !compressBuffer.hasRemaining()) {
- backingCompressBuffer.setIndex(0, compressBuffer.position());
- resultBuffer.addComponent(true, backingCompressBuffer);
- allocateBuffer();
+ int written = -1;
+ // the deflater may need multiple calls to deflate the input buffer
+ // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer
+ // for the last buffer, the completion is checked by the deflater.finished() method
+ while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) {
+ // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false),
+ // it means that the output buffer is full.
+ // append the compressed buffer to the result buffer and allocate a new buffer.
+ if (written == 0) {
+ if (compressBuffer.position() > 0) {
+ appendCompressBufferToResultBuffer();
+ allocateCompressBuffer();
+ } else {
+ // this is an unexpected case, throw an exception to prevent an infinite loop
+ throw new IllegalStateException(
+ "Deflater didn't write any bytes while the compress buffer is empty.");
+ }
}
+ written = deflater.deflate(compressBuffer);
}
if (isLast) {
- // write gzip footer, integer values are in little endian byte order
- compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
- // write CRC32 checksum
- compressBuffer.putInt((int) crc.getValue());
- // write uncompressed size
- compressBuffer.putInt(deflater.getTotalIn());
- // append the last compressed buffer
- backingCompressBuffer.setIndex(0, compressBuffer.position());
- resultBuffer.addComponent(true, backingCompressBuffer);
+ // append the last compressed buffer when it is not empty
+ if (compressBuffer.position() > 0) {
+ appendCompressBufferToResultBuffer();
+ } else {
+ // release an unused empty buffer
+ backingCompressBuffer.release();
+ }
backingCompressBuffer = null;
compressBuffer = null;
+
+ // write gzip trailer, 2 integers (CRC32 checksum and uncompressed size)
+ ByteBuffer trailerBuf = ByteBuffer.allocate(2 * Integer.BYTES);
+ // integer values are in little endian byte order
+ trailerBuf.order(ByteOrder.LITTLE_ENDIAN);
+ // write CRC32 checksum
+ trailerBuf.putInt((int) crc.getValue());
+ // write uncompressed size
+ trailerBuf.putInt(deflater.getTotalIn());
+ trailerBuf.flip();
+ resultBuffer.addComponent(true, Unpooled.wrappedBuffer(trailerBuf));
}
}
- private void allocateBuffer() {
+ private void appendCompressBufferToResultBuffer() {
+ backingCompressBuffer.setIndex(0, compressBuffer.position());
+ resultBuffer.addComponent(true, backingCompressBuffer);
+ }
+
+ private void allocateCompressBuffer() {
backingCompressBuffer = bufAllocator.directBuffer(bufferSize);
compressBuffer = backingCompressBuffer.nioBuffer(0, bufferSize);
}
@@ -282,7 +312,7 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
this.clock = clock;
}
- private ByteBuf generate0(List<PrometheusRawMetricsProvider> metricsProviders) {
+ protected ByteBuf generateMetrics(List<PrometheusRawMetricsProvider> metricsProviders) {
ByteBuf buf = allocateMultipartCompositeDirectBuffer();
boolean exceptionHappens = false;
//Used in namespace/topic and transaction aggregators as share metric names
@@ -342,7 +372,9 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
int totalLen = 0;
while (totalLen < initialBufferSize) {
totalLen += chunkSize;
- buf.addComponent(false, byteBufAllocator.directBuffer(chunkSize));
+ // increase the capacity in increments of chunkSize to preallocate the buffers
+ // in the composite buffer
+ buf.capacity(totalLen);
}
return buf;
}
@@ -492,7 +524,7 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
CompletableFuture<ResponseBuffer> bufferFuture = newMetricsBuffer.getBufferFuture();
executor.execute(() -> {
try {
- bufferFuture.complete(new ResponseBuffer(generate0(metricsProviders)));
+ bufferFuture.complete(new ResponseBuffer(generateMetrics(metricsProviders)));
} catch (Exception e) {
bufferFuture.completeExceptionally(e);
} finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java
new file mode 100644
index 00000000000..ed5c5a6335c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.GZIPInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+public class PrometheusMetricsGeneratorTest {
+
+ // reproduce issue #22575
+ @Test
+ public void testReproducingBufferOverflowExceptionAndEOFExceptionBugsInGzipCompression()
+ throws ExecutionException, InterruptedException, IOException {
+ PulsarService pulsar = mock(PulsarService.class);
+ ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+ when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
+
+ // generate a random byte buffer which is 8 bytes less than the minimum compress buffer size limit
+ // this will trigger the BufferOverflowException bug in writing the gzip trailer
+ // it will also trigger another bug in finishing the gzip compression stream when the compress buffer is full
+ // which results in EOFException
+ Random random = new Random();
+ byte[] inputBytes = new byte[8192 - 8];
+ random.nextBytes(inputBytes);
+ ByteBuf byteBuf = Unpooled.wrappedBuffer(inputBytes);
+
+ PrometheusMetricsGenerator generator =
+ new PrometheusMetricsGenerator(pulsar, false, false, false, false, Clock.systemUTC()) {
+ // override the generateMetrics method to return the random byte buffer for gzip compression
+ // instead of the actual metrics
+ @Override
+ protected ByteBuf generateMetrics(List<PrometheusRawMetricsProvider> metricsProviders) {
+ return byteBuf;
+ }
+ };
+
+ PrometheusMetricsGenerator.MetricsBuffer metricsBuffer =
+ generator.renderToBuffer(MoreExecutors.directExecutor(), Collections.emptyList());
+ try {
+ PrometheusMetricsGenerator.ResponseBuffer responseBuffer = metricsBuffer.getBufferFuture().get();
+
+ ByteBuf compressed = responseBuffer.getCompressedBuffer(MoreExecutors.directExecutor()).get();
+ byte[] compressedBytes = new byte[compressed.readableBytes()];
+ compressed.readBytes(compressedBytes);
+ try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(compressedBytes))) {
+ byte[] uncompressedBytes = IOUtils.toByteArray(gzipInputStream);
+ assertEquals(uncompressedBytes, inputBytes);
+ }
+ } finally {
+ metricsBuffer.release();
+ }
+ }
+}
\ No newline at end of file