You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/25 12:02:26 UTC

(pulsar) branch branch-3.1 updated: [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new fe05e089c97 [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
fe05e089c97 is described below

commit fe05e089c97fb798b478b3ea0ad3877768c4fcee
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Apr 25 14:12:33 2024 +0300

    [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
    
    (cherry picked from commit 997c8b95e1798cee08c56d92b77eb70056dfca8f)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 74 +++++++++++++------
 .../prometheus/PrometheusMetricsGeneratorTest.java | 85 ++++++++++++++++++++++
 2 files changed, 138 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index bfcbb5ec89d..9dfa7673fe8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.prometheus.client.Collector;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -192,8 +193,8 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
             crc = new CRC32();
             this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
             this.bufAllocator = bufAllocator;
-            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
-            allocateBuffer();
+            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2);
+            allocateCompressBuffer();
         }
 
         /**
@@ -218,37 +219,66 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
                 // write gzip header
                 compressBuffer.put(GZIP_HEADER);
             }
+            // update the CRC32 checksum calculation
             nioBuffer.mark();
             crc.update(nioBuffer);
             nioBuffer.reset();
+            // pass the input buffer to the deflater
             deflater.setInput(nioBuffer);
+            // when the input buffer is the last one, set the flag to finish the deflater
             if (isLast) {
                 deflater.finish();
             }
-            while (!deflater.needsInput() && !deflater.finished()) {
-                int written = deflater.deflate(compressBuffer);
-                if (written == 0 && !compressBuffer.hasRemaining()) {
-                    backingCompressBuffer.setIndex(0, compressBuffer.position());
-                    resultBuffer.addComponent(true, backingCompressBuffer);
-                    allocateBuffer();
+            int written = -1;
+            // the deflater may need multiple calls to deflate the input buffer
+            // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer
+            // for the last buffer, the completion is checked by the deflater.finished() method
+            while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) {
+                // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false),
+                // it means that the output buffer is full.
+                // append the compressed buffer to the result buffer and allocate a new buffer.
+                if (written == 0) {
+                    if (compressBuffer.position() > 0) {
+                        appendCompressBufferToResultBuffer();
+                        allocateCompressBuffer();
+                    } else {
+                        // this is an unexpected case, throw an exception to prevent an infinite loop
+                        throw new IllegalStateException(
+                                "Deflater didn't write any bytes while the compress buffer is empty.");
+                    }
                 }
+                written = deflater.deflate(compressBuffer);
             }
             if (isLast) {
-                // write gzip footer, integer values are in little endian byte order
-                compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                // write CRC32 checksum
-                compressBuffer.putInt((int) crc.getValue());
-                // write uncompressed size
-                compressBuffer.putInt(deflater.getTotalIn());
-                // append the last compressed buffer
-                backingCompressBuffer.setIndex(0, compressBuffer.position());
-                resultBuffer.addComponent(true, backingCompressBuffer);
+                // append the last compressed buffer when it is not empty
+                if (compressBuffer.position() > 0) {
+                    appendCompressBufferToResultBuffer();
+                } else {
+                    // release an unused empty buffer
+                    backingCompressBuffer.release();
+                }
                 backingCompressBuffer = null;
                 compressBuffer = null;
+
+                // write gzip trailer, 2 integers (CRC32 checksum and uncompressed size)
+                ByteBuffer trailerBuf = ByteBuffer.allocate(2 * Integer.BYTES);
+                // integer values are in little endian byte order
+                trailerBuf.order(ByteOrder.LITTLE_ENDIAN);
+                // write CRC32 checksum
+                trailerBuf.putInt((int) crc.getValue());
+                // write uncompressed size
+                trailerBuf.putInt(deflater.getTotalIn());
+                trailerBuf.flip();
+                resultBuffer.addComponent(true, Unpooled.wrappedBuffer(trailerBuf));
             }
         }
 
-        private void allocateBuffer() {
+        private void appendCompressBufferToResultBuffer() {
+            backingCompressBuffer.setIndex(0, compressBuffer.position());
+            resultBuffer.addComponent(true, backingCompressBuffer);
+        }
+
+        private void allocateCompressBuffer() {
             backingCompressBuffer = bufAllocator.directBuffer(bufferSize);
             compressBuffer = backingCompressBuffer.nioBuffer(0, bufferSize);
         }
@@ -283,7 +313,7 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
         this.clock = clock;
     }
 
-    private ByteBuf generate0(List<PrometheusRawMetricsProvider> metricsProviders) {
+    protected ByteBuf generateMetrics(List<PrometheusRawMetricsProvider> metricsProviders) {
         ByteBuf buf = allocateMultipartCompositeDirectBuffer();
         boolean exceptionHappens = false;
         //Used in namespace/topic and transaction aggregators as share metric names
@@ -343,7 +373,9 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
         int totalLen = 0;
         while (totalLen < initialBufferSize) {
             totalLen += chunkSize;
-            buf.addComponent(false, byteBufAllocator.directBuffer(chunkSize));
+            // increase the capacity in increments of chunkSize to preallocate the buffers
+            // in the composite buffer
+            buf.capacity(totalLen);
         }
         return buf;
     }
@@ -493,7 +525,7 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
                     CompletableFuture<ResponseBuffer> bufferFuture = newMetricsBuffer.getBufferFuture();
                     executor.execute(() -> {
                         try {
-                            bufferFuture.complete(new ResponseBuffer(generate0(metricsProviders)));
+                            bufferFuture.complete(new ResponseBuffer(generateMetrics(metricsProviders)));
                         } catch (Exception e) {
                             bufferFuture.completeExceptionally(e);
                         } finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java
new file mode 100644
index 00000000000..ed5c5a6335c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.GZIPInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+public class PrometheusMetricsGeneratorTest {
+
+    // reproduce issue #22575
+    @Test
+    public void testReproducingBufferOverflowExceptionAndEOFExceptionBugsInGzipCompression()
+            throws ExecutionException, InterruptedException, IOException {
+        PulsarService pulsar = mock(PulsarService.class);
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
+
+        // generate a random byte buffer which is 8 bytes less than the minimum compress buffer size limit
+        // this will trigger the BufferOverflowException bug in writing the gzip trailer
+        // it will also trigger another bug in finishing the gzip compression stream when the compress buffer is full
+        // which results in EOFException
+        Random random = new Random();
+        byte[] inputBytes = new byte[8192 - 8];
+        random.nextBytes(inputBytes);
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(inputBytes);
+
+        PrometheusMetricsGenerator generator =
+                new PrometheusMetricsGenerator(pulsar, false, false, false, false, Clock.systemUTC()) {
+                    // override the generateMetrics method to return the random byte buffer for gzip compression
+                    // instead of the actual metrics
+                    @Override
+                    protected ByteBuf generateMetrics(List<PrometheusRawMetricsProvider> metricsProviders) {
+                        return byteBuf;
+                    }
+                };
+
+        PrometheusMetricsGenerator.MetricsBuffer metricsBuffer =
+                generator.renderToBuffer(MoreExecutors.directExecutor(), Collections.emptyList());
+        try {
+            PrometheusMetricsGenerator.ResponseBuffer responseBuffer = metricsBuffer.getBufferFuture().get();
+
+            ByteBuf compressed = responseBuffer.getCompressedBuffer(MoreExecutors.directExecutor()).get();
+            byte[] compressedBytes = new byte[compressed.readableBytes()];
+            compressed.readBytes(compressedBytes);
+            try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(compressedBytes))) {
+                byte[] uncompressedBytes = IOUtils.toByteArray(gzipInputStream);
+                assertEquals(uncompressedBytes, inputBytes);
+            }
+        } finally {
+            metricsBuffer.release();
+        }
+    }
+}
\ No newline at end of file