You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 02:33:54 UTC

[pulsar] branch branch-2.6 updated (1d82d22 -> fbbc251)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 1d82d22  add if for SubscriptionBusyException (#9017)
     new 5febead  remove duplicated broker prometheus metrics type (#8995)
     new fbbc251  Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../prometheus/PrometheusMetricsGenerator.java     | 12 +++--
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 53 ++++++++++++++++++++--
 ...CompressionCodecNone.java => AirliftUtils.java} | 26 +++++------
 .../common/compression/CompressionCodecLZ4.java    |  3 +-
 .../common/compression/CompressionCodecSnappy.java |  1 +
 .../common/compression/CompressionCodecZstd.java   |  2 +-
 .../common/compression/CompressorCodecTest.java    | 31 +++++++++++++
 7 files changed, 105 insertions(+), 23 deletions(-)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/compression/{CompressionCodecNone.java => AirliftUtils.java} (63%)


[pulsar] 01/02: remove duplicated broker prometheus metrics type (#8995)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5febeada1e46d66ef506bf741e603e2a973ee440
Author: Ming <it...@gmail.com>
AuthorDate: Tue Dec 22 12:53:03 2020 -0500

    remove duplicated broker prometheus metrics type (#8995)
    
    ### Motivation
    
    If there are multiple topics from different namespaces, the broker prometheus metrics will print out duplicated `# TYPE` definition for pulsar_ml_AddEntryBytesRate and other managed ledger metrics.
    
    In fact, this problem can be verified by `promtool` https://github.com/prometheus/prometheus#building-from-source
    
    On the broker, run this command to check validity of Pulsar broker metric format.
    `curl localhost:8080/metrics/ | ~/go/bin/promtool check metrics`
    
    ### Modifications
    
    To prevent duplicated metrics type definition, the definition is now tracked and only printed out once. It leverages the existing metrics name Set already defined under parseMetricsToPrometheusMetrics() in PrometheusMetricsGenerator.java
    
    ### Verifying this change
    
    - [ x] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
    
    Added two topics under new namespaces to trigger conditions that duplicated prometheus type could happen previously under testManagedLedgerStats() of PrometheusMetricsTest.java. Updated test cases checks this duplicated type problem.
    
    (cherry picked from commit 73198195efc6f25e162840451e054473daf25f17)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 12 +++--
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 53 ++++++++++++++++++++--
 2 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 745eef8..710cee5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -140,9 +140,15 @@ public class PrometheusMetricsGenerator {
                         continue;
                     }
                 } else {
-                    stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
-                            .write(getTypeStr(metricType)).write('\n');
-                    stream.write(entry.getKey().replace("brk_", "pulsar_"))
+
+
+                    String name = entry.getKey();
+                    if (!names.contains(name)) {
+                        stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
+                                .write(getTypeStr(metricType)).write('\n');
+                        names.add(name);
+                    }
+                    stream.write(name.replace("brk_", "pulsar_"))
                             .write("{cluster=\"").write(cluster).write('"');
                 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 05b68f4..c6200009 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -471,10 +471,14 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     public void testManagedLedgerStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        Producer<byte[]> p3 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic1").create();
+        Producer<byte[]> p4 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic2").create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
             p2.send(message.getBytes());
+            p3.send(message.getBytes());
+            p4.send(message.getBytes());
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
@@ -487,18 +491,59 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                 System.out.println(e.getKey() + ": " + e.getValue())
         );
 
+        Map<String, String> typeDefs = new HashMap<String, String>();
+        Map<String, String> metricNames = new HashMap<String, String>();
+
+        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
+
+        Splitter.on("\n").split(metricsStr).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+            if (line.startsWith("#")) {
+                // Check for duplicate type definitions
+                Matcher typeMatcher = typePattern.matcher(line);
+                checkArgument(typeMatcher.matches());
+                String metricName = typeMatcher.group(1);
+                String type = typeMatcher.group(2);
+
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "Only one TYPE line may exist for a given metric name."
+                if (!typeDefs.containsKey(metricName)) {
+                    typeDefs.put(metricName, type);
+                } else {
+                    fail("Duplicate type definition found for TYPE definition " + metricName);
+                }
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
+                if (metricNames.containsKey(metricName)) {
+                    fail("TYPE definition for " + metricName + " appears after first sample");
+                }
+            } else {
+                Matcher metricMatcher = metricNamePattern.matcher(line);
+                checkArgument(metricMatcher.matches());
+                String metricName = metricMatcher.group(1);
+                metricNames.put(metricName, metricName);
+            }
+        });
+
         List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryBytesRate");
-        assertEquals(cm.size(), 1);
+        assertEquals(cm.size(), 2);
         assertEquals(cm.get(0).tags.get("cluster"), "test");
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        String ns = cm.get(0).tags.get("namespace");
+        assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true);
 
         cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryMessagesRate");
-        assertEquals(cm.size(), 1);
+        assertEquals(cm.size(), 2);
         assertEquals(cm.get(0).tags.get("cluster"), "test");
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        ns = cm.get(0).tags.get("namespace");
+        assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true);
 
         p1.close();
         p2.close();
+        p3.close();
+        p4.close();
     }
 
     @Test


[pulsar] 02/02: Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fbbc251de7e312b63382ad3940b62593a112a82e
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Dec 22 18:53:27 2020 +0100

    Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)
    
    Fixes #8974
    
    ### Motivation
    In certain cases peeking messages on compresses topics return an error, see #8974 because Airlift does not support readonly ByteBuffers, because they do not give access to the underlying array)
    
    ### Modifications
    
    Copy the ByteByffer in case of unsupported buffer type
    
    ### Verifying this change
    
    This change adds new tests that reproduce the error and demonstrate that the problem is fixed.
    
    (cherry picked from commit cbc606b0b0e836c1238ea1ba92400b3f14e5b349)
---
 .../pulsar/common/compression/AirliftUtils.java    | 38 ++++++++++++++++++++++
 .../common/compression/CompressionCodecLZ4.java    |  3 +-
 .../common/compression/CompressionCodecSnappy.java |  1 +
 .../common/compression/CompressionCodecZstd.java   |  2 +-
 .../common/compression/CompressorCodecTest.java    | 31 ++++++++++++++++++
 5 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
new file mode 100644
index 0000000..3bfc609
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compression;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities.
+ */
+public abstract class AirliftUtils {
+
+    static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int uncompressedLength) {
+        if (!encodedNio.isDirect() && !encodedNio.hasArray()) {
+            // airlift needs a raw ByteArray
+            ByteBuffer copy = ByteBuffer.allocate(uncompressedLength);
+            copy.put(encodedNio);
+            copy.flip();
+            encodedNio = copy;
+        }
+        return encodedNio;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
index 2493af4..12a03d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
@@ -96,11 +96,12 @@ public class CompressionCodecLZ4 implements CompressionCodec {
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
         uncompressed.writerIndex(uncompressedLength);
         return uncompressed;
     }
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
index 517f1ca..1e31edc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -103,6 +103,7 @@ public class CompressionCodecSnappy implements CompressionCodec {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
 
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
index 944e1e5..18caee6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
@@ -97,7 +97,7 @@ public class CompressionCodecZstd implements CompressionCodec {
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 46e5718..ec84741 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -84,6 +84,37 @@ public class CompressorCodecTest {
     }
 
     @Test(dataProvider = "codec")
+    void testDecompressReadonlyByteBuf(CompressionType type, String compressedText) throws IOException {
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
+        byte[] data = text.getBytes();
+        ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
+        raw.writeBytes(data);
+
+        ByteBuf compressed = codec.encode(raw);
+        assertEquals(raw.readableBytes(), data.length);
+
+        int compressedSize = compressed.readableBytes();
+        // Readonly ByteBuffers are not supported by AirLift
+        // https://github.com/apache/pulsar/issues/8974
+        ByteBuf compressedComplexByteBuf = compressed.asReadOnly();
+        ByteBuf uncompressed = codec.decode(compressedComplexByteBuf, data.length);
+
+        assertEquals(compressed.readableBytes(), compressedSize);
+
+        assertEquals(uncompressed.readableBytes(), data.length);
+        assertEquals(uncompressed, raw);
+
+        raw.release();
+        compressed.release();
+        uncompressed.release();
+
+        // Verify compression codecs have the same behavior with buffers ref counting
+        assertEquals(raw.refCnt(), 0);
+        assertEquals(compressed.refCnt(), 0);
+        assertEquals(compressed.refCnt(), 0);
+    }
+
+    @Test(dataProvider = "codec")
     void testEmptyInput(CompressionType type, String compressedText) throws IOException {
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);