Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/04 02:44:38 UTC

[pulsar] branch branch-2.7 updated (b14b301 -> 13c4f9b)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from b14b301  Configurable data source for offloaded messages (#8717)
     new 657f117  [#9177] add metrics server to go function (#9318)
     new 5fc5f65  Fix setting backlogQuota will always succeed (#9382)
     new d48a575  [pulsar-client-cpp] Fix broken replication msg to specific cluster (#9372)
     new 6b9df95  [go-functions] fix metrics server handler error (#9394)
     new 732c839  [tiered-storage] Allow AWS credentials to be refreshed (#9387)
     new 5b7f99f  [pulsar-broker]Add alerts for expired/expiring soon tokens (#9321)
     new dfef57d  Skip clear delayed messages while dispatch does not init (#9378)
     new 297a1b9  fix narExtractionDirectory not set (#9319)
     new 13c4f9b  Compression must be applied during deferred schema preparation and enableBatching is enabled (#9396)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../AuthenticationProviderToken.java               |  30 +++++-
 .../service/persistent/PersistentSubscription.java |   4 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |   9 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  23 +++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 110 +++++++++++++++++++++
 .../client/api/SimpleProducerConsumerTest.java     |  48 +++++++++
 pulsar-client-cpp/lib/Commands.cc                  |   5 +
 .../apache/pulsar/client/impl/ProducerImpl.java    |  28 ++++--
 .../apache/pulsar/common/nar/NarClassLoader.java   |   2 +-
 pulsar-function-go/conf/conf.go                    |   2 +
 pulsar-function-go/conf/conf.yaml                  |   2 +
 pulsar-function-go/go.mod                          |   3 +-
 pulsar-function-go/go.sum                          |   6 ++
 pulsar-function-go/pf/context.go                   |   4 +
 pulsar-function-go/pf/instance.go                  |   3 +
 pulsar-function-go/pf/instanceConf.go              |   2 +
 pulsar-function-go/pf/instanceConf_test.go         |   1 +
 pulsar-function-go/pf/stats.go                     |  49 ++++++++-
 pulsar-function-go/pf/stats_test.go                |  30 ++++++
 .../functions/instance/go/GoInstanceConfig.java    |   2 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |   9 +-
 .../pulsar/functions/runtime/RuntimeUtilsTest.java |   3 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |   1 +
 .../jcloud/provider/JCloudBlobStoreProvider.java   |  50 ++++++----
 .../provider/TieredStorageConfigurationTests.java  |  54 +++++-----
 25 files changed, 419 insertions(+), 61 deletions(-)


[pulsar] 03/09: [pulsar-client-cpp] Fix broken replication msg to specific cluster (#9372)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d48a575dfc7e2f54700543c55faf95a11b9fdfd4
Author: k2la <mz...@gmail.com>
AuthorDate: Mon Feb 1 11:31:39 2021 +0900

    [pulsar-client-cpp] Fix broken replication msg to specific cluster (#9372)
    
    Same as https://github.com/apache/pulsar/pull/4930 .
    
    (cherry picked from commit 6c914d4352a48ffa65f2f38ef59dea06e6393e50)
---
 pulsar-client-cpp/lib/Commands.cc | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 0f2b8cf..239b9f0 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -653,6 +653,11 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa
     if (metadata.has_replicated_from()) {
         batchMetadata.set_replicated_from(metadata.replicated_from());
     }
+    if (metadata.replicate_to_size() > 0) {
+        for (int i = 0; i < metadata.replicate_to_size(); i++) {
+            batchMetadata.add_replicate_to(metadata.replicate_to(i));
+        }
+    }
     // TODO: set other optional fields
 }
 
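For readers less familiar with the C++ protobuf API, the hunk above can be sketched in Go as follows. The `messageMetadata` and `batchMetadata` structs are hypothetical stand-ins for the generated protobuf types, not Pulsar's real ones. The sketch only illustrates the idea that the per-message `replicate_to` list is copied into the batch-level metadata, presumably so that the target clusters survive batching.

```go
package main

import "fmt"

// messageMetadata and batchMetadata are hypothetical stand-ins for the
// generated protobuf message types; only the relevant fields are shown.
type messageMetadata struct {
	ReplicatedFrom string
	ReplicateTo    []string
}

type batchMetadata struct {
	ReplicatedFrom string
	ReplicateTo    []string
}

// initBatchMetadata copies the replication-related fields of a single
// message into the metadata that describes the whole batch.
func initBatchMetadata(msg messageMetadata) batchMetadata {
	batch := batchMetadata{}
	if msg.ReplicatedFrom != "" {
		batch.ReplicatedFrom = msg.ReplicatedFrom
	}
	// Copy every target cluster. Appending to a nil slice allocates a new
	// backing array, so the batch does not alias the message's slice.
	batch.ReplicateTo = append(batch.ReplicateTo, msg.ReplicateTo...)
	return batch
}

func main() {
	msg := messageMetadata{ReplicateTo: []string{"cluster-a", "cluster-b"}}
	batch := initBatchMetadata(msg)
	fmt.Println(batch.ReplicateTo)
}
```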


[pulsar] 09/09: Compression must be applied during deferred schema preparation and enableBatching is enabled (#9396)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 13c4f9b741a538c9c92c3298ec4954b732b2814f
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Feb 4 00:44:07 2021 +0100

    Compression must be applied during deferred schema preparation and enableBatching is enabled (#9396)
    
    * Compression must be applied during deferred schema preparation and enableBatching is enabled
    If you do not set an initial schema on the Producer, the schema must be prepared when the first message with a Schema is sent.
    Due to a bug, compression is not applied in this case, so the consumer receives an uncompressed message and fails.
    
    * address Matteo's comments
    
    Co-authored-by: Enrico Olivelli <eo...@apache.org>
    (cherry picked from commit 014385061ec788c981c9dc78a207ed7797103221)
---
 .../client/api/SimpleProducerConsumerTest.java     | 48 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 28 ++++++++++---
 2 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 8eda0d0..214ec31 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -70,12 +70,15 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import lombok.Cleanup;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -3746,4 +3749,49 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             Assert.assertEquals(size, 0);
         });
     }
+
+
+    @Data
+    @EqualsAndHashCode
+    public static class MyBean {
+        private String field;
+    }
+
+    @DataProvider(name = "enableBatching")
+    public static Object[] isEnableBatching() {
+        return new Object[]{false, true};
+    }
+
+    @Test(dataProvider = "enableBatching")
+    public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
+        Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionName("testsub")
+                .subscribe();
+
+        // initially we are not setting a Schema in the producer
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .enableBatching(enableBatching)
+                .compressionType(CompressionType.LZ4)
+                .create();
+        MyBean payload = new MyBean();
+        payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");
+
+        // now we send with a schema, but we have enabled compression and batching
+        // the producer will have to setup the schema and resume the send
+        producer.newMessage(Schema.AVRO(MyBean.class)).value(payload).send();
+        producer.close();
+
+        GenericRecord res = consumer.receive().getValue();
+        consumer.close();
+        for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
+            log.info("field {} {}", f.getName(), res.getField(f));
+            assertEquals("field", f.getName());
+            assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
+        }
+        assertEquals(1, res.getFields().size());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cc76656..528efca 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -356,6 +356,17 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {
+        ByteBuf compressedPayload = compressor.encode(payload);
+        payload.release();
+        return compressedPayload;
+    }
+
     public void sendAsync(Message<?> message, SendCallback callback) {
         checkArgument(message instanceof MessageImpl);
 
@@ -374,12 +385,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         // If compression is enabled, we are compressing, otherwise it will simply use the same buffer
         int uncompressedSize = payload.readableBytes();
         ByteBuf compressedPayload = payload;
+        boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it individually
         if (!isBatchMessagingEnabled() || msgMetadataBuilder.hasDeliverAtTime()) {
-            compressedPayload = compressor.encode(payload);
-            payload.release();
-
+            compressedPayload = applyCompression(payload);
+            compressed = true;
             // validate msg-size (For batching this will be check at the batch completion size)
             int compressedSize = compressedPayload.readableBytes();
             if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
@@ -433,7 +444,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
                     serializeAndSendMessage(msg, msgMetadataBuilder, payload, sequenceId, uuid, chunkId, totalChunks,
-                            readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload,
+                            readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
                             compressedPayload.readableBytes(), uncompressedSize, callback);
                     readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
                 }
@@ -448,7 +459,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private void serializeAndSendMessage(MessageImpl<?> msg, Builder msgMetadataBuilder, ByteBuf payload,
             long sequenceId, String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
-            int compressedPayloadSize,
+            boolean compressed, int compressedPayloadSize,
             int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
         ByteBuf chunkPayload = compressedPayload;
         Builder chunkMsgMetadataBuilder = msgMetadataBuilder;
@@ -516,7 +527,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 doBatchSendAndAdd(msg, callback, payload);
             }
         } else {
-            ByteBuf encryptedPayload = encryptMessage(chunkMsgMetadataBuilder, chunkPayload);
+            // in this case compression has not been applied by the caller
+            // but we have to compress the payload if compression is configured
+            if (!compressed) {
+                chunkPayload = applyCompression(chunkPayload);
+            }
+            ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, chunkPayload);
 
             MessageMetadata msgMetadata = chunkMsgMetadataBuilder.build();
             // When publishing during replication, we need to set the correct number of message in batch
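The "compress exactly once" bookkeeping introduced by the `compressed` flag can be sketched in Go as below. This is an illustration under stated assumptions, not the producer's implementation: `prepare` and `sendIndividually` are hypothetical names for the two stages, and gzip from the standard library stands in for the configured codec (the test above uses LZ4).

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress is a stand-in for the producer's configured compressor.
func compress(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reverses compress; it plays the role of the consumer.
func decompress(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// prepare models the first stage of a send: it compresses right away only
// when batching is off, and reports whether it did so.
func prepare(payload []byte, batching bool) ([]byte, bool, error) {
	if batching {
		return payload, false, nil
	}
	out, err := compress(payload)
	return out, err == nil, err
}

// sendIndividually models the path where a message ends up being sent on
// its own: it compresses only if the earlier stage has not already done so.
func sendIndividually(payload []byte, compressed bool) ([]byte, error) {
	if compressed {
		return payload, nil
	}
	return compress(payload)
}

func main() {
	payload := []byte("aaaaaaaaaaaaaaaaaaaaaaaaa")
	for _, batching := range []bool{false, true} {
		staged, compressed, err := prepare(payload, batching)
		if err != nil {
			panic(err)
		}
		wire, err := sendIndividually(staged, compressed)
		if err != nil {
			panic(err)
		}
		got, err := decompress(wire)
		fmt.Println(batching, bytes.Equal(got, payload), err)
	}
}
```

Carrying the flag alongside the payload lets the later stage decide without inspecting the bytes, which avoids both the missing compression described in the commit message and an accidental double compression.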


[pulsar] 04/09: [go-functions] fix metrics server handler error (#9394)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6b9df95068bca5977c1c7626906d9f482a4781cf
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Feb 2 17:52:23 2021 +0800

    [go-functions] fix metrics server handler error (#9394)
    
    ### Motivation
    
    #9318 added a metrics server to the Go function but did not serve the `"/"` endpoint in the metrics server, which causes some metrics calls to fail.
    
    ### Modifications
    
    add handler for `"/"` in `MetricsServicer`
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    (cherry picked from commit c99e1a0ec93a660a12b68ad7a529e951604be58e)
---
 pulsar-function-go/pf/stats.go      |  6 ++++--
 pulsar-function-go/pf/stats_test.go | 14 +++++++++++++-
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
index 3aac194..8159cac 100644
--- a/pulsar-function-go/pf/stats.go
+++ b/pulsar-function-go/pf/stats.go
@@ -316,12 +316,14 @@ func (stat *StatWithLabelValues) reset() {
 
 func NewMetricsServicer(goInstance *goInstance) *MetricsServicer {
 	serveMux := http.NewServeMux()
-	serveMux.Handle("/metrics", promhttp.HandlerFor(
+	pHandler := promhttp.HandlerFor(
 		reg,
 		promhttp.HandlerOpts{
 			EnableOpenMetrics: true,
 		},
-	))
+	)
+	serveMux.Handle("/", pHandler)
+	serveMux.Handle("/metrics", pHandler)
 	server := &http.Server{
 		Addr:    fmt.Sprintf(":%d", goInstance.context.GetMetricsPort()),
 		Handler: serveMux,
diff --git a/pulsar-function-go/pf/stats_test.go b/pulsar-function-go/pf/stats_test.go
index 09b93b9..3e38d10 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -25,6 +25,7 @@ import (
 	"math"
 	"net/http"
 	"testing"
+	"time"
 
 	"github.com/golang/protobuf/proto"
 	"github.com/prometheus/client_golang/prometheus"
@@ -193,12 +194,23 @@ func TestMetricsServer(t *testing.T) {
 	metricsServicer := NewMetricsServicer(gi)
 	metricsServicer.serve()
 	gi.stats.incrTotalReceived()
+	time.Sleep(time.Second * 1)
 
-	resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
+	resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", gi.context.GetMetricsPort()))
 	assert.Equal(t, nil, err)
+	assert.NotEqual(t, nil, resp)
 	assert.Equal(t, 200, resp.StatusCode)
 	body, err := ioutil.ReadAll(resp.Body)
 	assert.Equal(t, nil, err)
 	assert.NotEmpty(t, body)
 	resp.Body.Close()
+
+	resp, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
+	assert.Equal(t, nil, err)
+	assert.NotEqual(t, nil, resp)
+	assert.Equal(t, 200, resp.StatusCode)
+	body, err = ioutil.ReadAll(resp.Body)
+	assert.Equal(t, nil, err)
+	assert.NotEmpty(t, body)
+	resp.Body.Close()
 }


[pulsar] 01/09: [#9177] add metrics server to go function (#9318)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 657f117ce86925fb43aa866e36ec2f96f3ad09ae
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Sat Jan 30 23:56:52 2021 +0800

    [#9177] add metrics server to go function (#9318)
    
    Fixes #9177
    
    The Go function gained a metrics collector in #6105, but `metricsPort` was never passed to the Go function, and the Prometheus HTTP server was never initialized or started. As a result, the function worker keeps trying to reach the metrics port to collect data, which floods the logs with errors.
    
    - expose `metricsPort` to go function
    - add prometheus http server to go function
    
    - [x] Make sure that the change passes the CI checks.
    
    (cherry picked from commit 211a125387797620b22ed973cdab0a61ffec7510)
---
 pulsar-function-go/conf/conf.go                    |  2 +
 pulsar-function-go/conf/conf.yaml                  |  2 +
 pulsar-function-go/go.mod                          |  3 +-
 pulsar-function-go/go.sum                          |  6 +++
 pulsar-function-go/pf/context.go                   |  4 ++
 pulsar-function-go/pf/instance.go                  |  3 ++
 pulsar-function-go/pf/instanceConf.go              |  2 +
 pulsar-function-go/pf/instanceConf_test.go         |  1 +
 pulsar-function-go/pf/stats.go                     | 47 +++++++++++++++++++++-
 pulsar-function-go/pf/stats_test.go                | 18 +++++++++
 .../functions/instance/go/GoInstanceConfig.java    |  2 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |  9 ++++-
 .../pulsar/functions/runtime/RuntimeUtilsTest.java |  3 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  1 +
 14 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 3beeaf3..c0d1262 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -73,6 +73,8 @@ type Conf struct {
 	DeadLetterTopic             string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
 	ExpectedHealthCheckInterval int32  `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"`
 	UserConfig                  string `json:"userConfig" yaml:"userConfig"`
+	//metrics config
+	MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
 }
 
 var (
diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index 8aecd57..7809ab5 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -57,3 +57,5 @@ disk: 0
 maxMessageRetries: 0
 deadLetterTopic: ""
 expectedHealthCheckInterval: 3
+# metrics config
+metricsPort: 50001
\ No newline at end of file
diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod
index 0171f64..5608345 100644
--- a/pulsar-function-go/go.mod
+++ b/pulsar-function-go/go.mod
@@ -9,7 +9,8 @@ require (
 	github.com/prometheus/client_model v0.2.0
 	github.com/sirupsen/logrus v1.4.2
 	github.com/stretchr/testify v1.4.0
-	google.golang.org/grpc v1.26.0
+	google.golang.org/grpc v1.27.0
+	google.golang.org/protobuf v1.25.0
 	gopkg.in/yaml.v2 v2.3.0
 )
 
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
index 5a6ecb2..501e4b2 100644
--- a/pulsar-function-go/go.sum
+++ b/pulsar-function-go/go.sum
@@ -14,6 +14,7 @@ github.com/apache/pulsar-client-go v0.2.0 h1:7teu0FaXzzKPjDdUNjA7dVYKFjCy6OVX5as
 github.com/apache/pulsar-client-go v0.2.0/go.mod h1:POSPPmXv1RuoM7FzHaS3NurCSOopwin2ekGK2PcOgVM=
 github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb h1:E1P0FudxDdj2RhbveZC9i3PwukLCA/4XQSkBS/dw6/I=
 github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
+github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd h1:P5kM7jcXJ7TaftX0/EMKiSJgvQc/ct+Fw0KMvcH3WuY=
 github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
 github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
 github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
@@ -30,6 +31,7 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
 github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
+github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:QWqadCIHYA5zja4b6h9uGQn93u1vL+G/aewImumdg/M=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -69,6 +71,7 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
@@ -104,6 +107,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
 github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
@@ -243,10 +247,12 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
 google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
 google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 7b13c88..599b2eb 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -167,7 +167,11 @@ func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
 // GetCurrentRecord gets the current message from the function context
 func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
 	return c.record
+}
 
+//GetMetricsPort returns the port the pulsar function metrics listen on
+func (c *FunctionContext) GetMetricsPort() int {
+	return c.instanceConf.metricsPort
 }
 
 // An unexported type to be used as the key for types in this package. This
diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index 029272f..afbf299 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -144,6 +144,9 @@ func (gi *goInstance) startFunction(function function) error {
 	servicer := InstanceControlServicer{goInstance: gi}
 	servicer.serve(gi)
 
+	metricsServicer := NewMetricsServicer(gi)
+	metricsServicer.serve()
+	defer metricsServicer.close()
 CLOSE:
 	for {
 		idleTimer.Reset(idleDuration)
diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go
index 72cf778..1227024 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -40,6 +40,7 @@ type instanceConf struct {
 	pulsarServiceURL            string
 	killAfterIdle               time.Duration
 	expectedHealthCheckInterval int32
+	metricsPort                 int
 }
 
 func newInstanceConf() *instanceConf {
@@ -59,6 +60,7 @@ func newInstanceConf() *instanceConf {
 		pulsarServiceURL:            cfg.PulsarServiceURL,
 		killAfterIdle:               cfg.KillAfterIdleMs,
 		expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval,
+		metricsPort:                 cfg.MetricsPort,
 		funcDetails: pb.FunctionDetails{
 			Tenant:               cfg.Tenant,
 			Namespace:            cfg.NameSpace,
diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go
index 5b386f3..be93239 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -39,6 +39,7 @@ func Test_newInstanceConf(t *testing.T) {
 			pulsarServiceURL:            "pulsar://localhost:6650",
 			killAfterIdle:               50000,
 			expectedHealthCheckInterval: 3,
+			metricsPort:                 50001,
 			funcDetails: pb.FunctionDetails{Tenant: "",
 				Namespace:            "",
 				Name:                 "go-function",
diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
index f7952fb..3aac194 100644
--- a/pulsar-function-go/pf/stats.go
+++ b/pulsar-function-go/pf/stats.go
@@ -20,10 +20,14 @@
 package pf
 
 import (
+	"fmt"
+	"net/http"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 
+	log "github.com/apache/pulsar/pulsar-function-go/logutil"
+	"github.com/prometheus/client_golang/prometheus"
 	prometheus_client "github.com/prometheus/client_model/go"
 )
 
@@ -120,6 +124,11 @@ var (
 			Help: "Exception from system code."}, exceptionMetricsLabelNames)
 )
 
+type MetricsServicer struct {
+	goInstance *goInstance
+	server     *http.Server
+}
+
 var reg *prometheus.Registry
 
 func init() {
@@ -304,3 +313,39 @@ func (stat *StatWithLabelValues) reset() {
 	stat.statTotalSysExceptions1min.Set(0.0)
 	stat.statTotalReceived1min.Set(0.0)
 }
+
+func NewMetricsServicer(goInstance *goInstance) *MetricsServicer {
+	serveMux := http.NewServeMux()
+	serveMux.Handle("/metrics", promhttp.HandlerFor(
+		reg,
+		promhttp.HandlerOpts{
+			EnableOpenMetrics: true,
+		},
+	))
+	server := &http.Server{
+		Addr:    fmt.Sprintf(":%d", goInstance.context.GetMetricsPort()),
+		Handler: serveMux,
+	}
+	return &MetricsServicer{
+		goInstance,
+		server,
+	}
+}
+
+func (s *MetricsServicer) serve() {
+	go func() {
+		// create a listener on metrics port
+		log.Infof("Starting metrics server on port %d", s.goInstance.context.GetMetricsPort())
+		err := s.server.ListenAndServe()
+		if err != nil {
+			log.Fatalf("failed to start metrics server: %v", err)
+		}
+	}()
+}
+
+func (s *MetricsServicer) close() {
+	err := s.server.Close()
+	if err != nil {
+		log.Fatalf("failed to close metrics server: %v", err)
+	}
+}
diff --git a/pulsar-function-go/pf/stats_test.go b/pulsar-function-go/pf/stats_test.go
index f4e3bb2..09b93b9 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -20,7 +20,10 @@
 package pf
 
 import (
+	"fmt"
+	"io/ioutil"
 	"math"
+	"net/http"
 	"testing"
 
 	"github.com/golang/protobuf/proto"
@@ -184,3 +187,18 @@ func TestExampleSummaryVec_Pulsar(t *testing.T) {
 	assert.Equal(t, 61925, int(*sum))
 	assert.Equal(t, 2000, int(*count))
 }
+
+func TestMetricsServer(t *testing.T) {
+	gi := newGoInstance()
+	metricsServicer := NewMetricsServicer(gi)
+	metricsServicer.serve()
+	gi.stats.incrTotalReceived()
+
+	resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
+	assert.Equal(t, nil, err)
+	assert.Equal(t, 200, resp.StatusCode)
+	body, err := ioutil.ReadAll(resp.Body)
+	assert.Equal(t, nil, err)
+	assert.NotEmpty(t, body)
+	resp.Body.Close()
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 710b4fd..0cb1de2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -65,4 +65,6 @@ public class GoInstanceConfig {
 
     private int maxMessageRetries;
     private String deadLetterTopic = "";
+
+    private int metricsPort;
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 56ddaee..db41eb6 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -119,7 +119,8 @@ public class RuntimeUtils {
     public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
                                                 String originalCodeFileName,
                                                 String pulsarServiceUrl,
-                                                boolean k8sRuntime) throws IOException {
+                                                boolean k8sRuntime,
+                                                int metricsPort) throws IOException {
         final List<String> args = new LinkedList<>();
         GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
 
@@ -219,6 +220,10 @@ public class RuntimeUtils {
             goInstanceConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
         }
 
+        if (metricsPort > 0 && metricsPort < 65536) {
+            goInstanceConfig.setMetricsPort(metricsPort);
+        }
+
         goInstanceConfig.setKillAfterIdleMs(0);
         goInstanceConfig.setPort(instanceConfig.getPort());
 
@@ -261,7 +266,7 @@ public class RuntimeUtils {
         final List<String> args = new LinkedList<>();
 
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
-            return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime);
+            return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime, metricsPort);
         }
 
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index 2212db6..72dd61b 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -108,7 +108,7 @@ public class RuntimeUtilsTest {
 
         instanceConfig.setFunctionDetails(functionDetails);
 
-        List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime);
+        List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime, 60000);
         if (k8sRuntime) {
             goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
         } else {
@@ -151,6 +151,7 @@ public class RuntimeUtilsTest {
         Assert.assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
         Assert.assertEquals(goInstanceConfig.get("deadLetterTopic"), "go-func-deadletter");
         Assert.assertEquals(goInstanceConfig.get("userConfig"), userConfig.toString());
+        Assert.assertEquals(goInstanceConfig.get("metricsPort"), 60000);
     }
 
     @DataProvider(name = "k8sRuntime")
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index cf59b26..a360023 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -831,6 +831,7 @@ public class KubernetesRuntimeTest {
         assertEquals(goInstanceConfig.get("name"), TEST_NAME);
         assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
         assertEquals(goInstanceConfig.get("deadLetterTopic"), "");
+        assertEquals(goInstanceConfig.get("metricsPort"), 4331);
 
         // check padding and xmx
         V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
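
Note on the metrics-port check in getGoInstanceCmd above: the port is only copied into GoInstanceConfig when it lies strictly between 0 and 65536, so an out-of-range value leaves the int field at its Java default of 0. A minimal sketch of that range check in isolation follows; the class and method names are hypothetical and are not part of the Pulsar code base.

```java
import java.util.OptionalInt;

final class MetricsPortSketch {
    // Hypothetical helper: mirrors the range check in the diff (0 < port < 65536).
    static OptionalInt validMetricsPort(int port) {
        return (port > 0 && port < 65536)
                ? OptionalInt.of(port)
                : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // 60000 is the value used in RuntimeUtilsTest above.
        System.out.println(validMetricsPort(60000).isPresent());
        // 0 and 65536 fall outside the accepted range.
        System.out.println(validMetricsPort(0).isPresent());
        System.out.println(validMetricsPort(65536).isPresent());
    }
}
```

Returning an empty OptionalInt makes the "port not set" case explicit, whereas the committed code silently skips the setter.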


[pulsar] 02/09: Fix setting backlogQuota will always succeed (#9382)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5fc5f65b730557ee8ecaef5f1eb0e5ad4a4f28a9
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Mon Feb 1 18:00:52 2021 +0800

    Fix setting backlogQuota will always succeed (#9382)
    
    Because SetBacklogQuota does not return after a failed check, the quota can still be updated successfully even when the check reports an error.
    
    (cherry picked from commit ebd70de635f5a5fc10e71210df3e67333dece7f3)
---
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index cf7353d..63d382b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -199,6 +199,29 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testCheckBacklogQuotaFailed() throws Exception {
+        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
+        String namespace = TopicName.get(testTopic).getNamespace();
+        admin.namespaces().setRetention(namespace, retentionPolicies);
+
+        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies));
+
+        BacklogQuota backlogQuota =
+                new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
+        try {
+            admin.topics().setBacklogQuota(testTopic, backlogQuota);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+        //Ensure that the cache has not been updated after a long time
+        Awaitility.await().atLeast(1, TimeUnit.SECONDS);
+        assertNull(admin.topics().getBacklogQuotaMap(testTopic)
+                .get(BacklogQuota.BacklogQuotaType.destination_storage));
+    }
+
+    @Test
     public void testCheckRetention() throws Exception {
         BacklogQuota backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
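
Note on the failure mode described in the commit message: the diff above contains only the test, so the sketch below illustrates the general shape of a "check fails but the update still happens" bug. It is not the broker's code; the class, the method names and the boolean stand-in for the quota check are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MissingReturnSketch {
    private final Map<String, Long> quotas = new HashMap<>();
    private final List<String> errors = new ArrayList<>();

    // Buggy shape: the failed check is reported, but execution falls
    // through and the quota is stored anyway.
    void setQuotaBuggy(String topic, long limit, boolean checkPasses) {
        if (!checkPasses) {
            errors.add("412 for " + topic);
        }
        quotas.put(topic, limit);
    }

    // Fixed shape: return as soon as the check fails.
    void setQuotaFixed(String topic, long limit, boolean checkPasses) {
        if (!checkPasses) {
            errors.add("412 for " + topic);
            return;
        }
        quotas.put(topic, limit);
    }

    Long quotaFor(String topic) {
        return quotas.get(topic);
    }

    int errorCount() {
        return errors.size();
    }
}
```

With the buggy variant the caller sees the error and the stored quota changes; with the fixed variant the stored quota stays absent, which is the condition testCheckBacklogQuotaFailed asserts with assertNull.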


[pulsar] 07/09: Skip clear delayed messages while dispatch does not init (#9378)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dfef57de476e2891457b50c60ed6fc85e226d554
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Feb 3 11:48:21 2021 +0800

    Skip clear delayed messages while dispatch does not init (#9378)
    
    Fixes #9032
    
    Skip clearing delayed messages when the dispatcher has not been initialized.
    
    New test added
    
    (cherry picked from commit 2b8615c596cfc2c46da64681ab76d43cfa8b6b2a)
---
 .../pulsar/broker/service/persistent/PersistentSubscription.java | 4 +++-
 .../test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java  | 9 ++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2c87f02..da910a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -507,7 +507,9 @@ public class PersistentSubscription implements Subscription {
                     log.debug("[{}][{}] Backlog size after clearing: {}", topicName, subName,
                             cursor.getNumberOfEntriesInBacklog(false));
                 }
-                dispatcher.clearDelayedMessages();
+                if (dispatcher != null) {
+                    dispatcher.clearDelayedMessages();
+                }
                 future.complete(null);
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 70d2542..5c177da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1658,5 +1658,12 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         }
     }
 
-
+    @Test
+    public void testClearBacklogForTheSubscriptionThatNoConsumers() throws Exception {
+        final String topic = "persistent://prop-xyz/ns1/clear_backlog_no_consumers" + UUID.randomUUID().toString();
+        final String sub = "my-sub";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, sub, MessageId.earliest);
+        admin.topics().skipAllMessages(topic, sub);
+    }
 }
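
Note on the null guard in PersistentSubscription above: a minimal sketch of the same pattern, assuming (as the test name suggests) that a subscription with no consumers has no dispatcher yet. The types below are hypothetical stand-ins, not the broker's classes.

```java
final class NullDispatcherSketch {
    /** Hypothetical stand-in for the broker's dispatcher type. */
    interface Dispatcher {
        void clearDelayedMessages();
    }

    // Assumption: stays null until a consumer has attached.
    private Dispatcher dispatcher;
    private int backlog = 3;

    void attach(Dispatcher d) {
        this.dispatcher = d;
    }

    /** Clears the backlog and returns true once the operation completes. */
    boolean clearBacklog() {
        backlog = 0;
        // Without this guard, a subscription with no dispatcher would
        // throw a NullPointerException here.
        if (dispatcher != null) {
            dispatcher.clearDelayedMessages();
        }
        return true;
    }

    int backlog() {
        return backlog;
    }
}
```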


[pulsar] 05/09: [tiered-storage] Allow AWS credentials to be refreshed (#9387)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 732c8393ecc92a0ea5d8eb7d4870116132f29441
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Mon Feb 1 11:41:49 2021 -0700

    [tiered-storage] Allow AWS credentials to be refreshed (#9387)
    
    With the refactor to support Azure, a regression occurred where the AWS
    credentials were fetched once and then used for the life of the process.
    
    This is a problem in AWS, where it is commonplace to use credentials
    that expire.
    
    The AWS credential provider chain takes care of this
    problem, but when integrating with JClouds, that means we need the
    credential Supplier to return a new set of credentials each time.
    
    Luckily, AWS should intelligently cache this so we aren't thrashing the
    underlying credential mechanisms.
    
    This also adds a test to ensure this isn't broken in the future. It does a simple validation that the underlying credentials can change via the AWS SystemPropertiesCredentialsProvider.
    
    (cherry picked from commit 562b2e76df37113edb41bb4393afd4b6aaa2dc21)
---
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 50 ++++++++++++--------
 .../provider/TieredStorageConfigurationTests.java  | 54 +++++++++++++---------
 2 files changed, 62 insertions(+), 42 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9d0871e..5c5f621 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorag
 import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
 
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSSessionCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
@@ -279,37 +280,46 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
 
     static final CredentialBuilder AWS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
         if (config.getCredentials() == null) {
-            AWSCredentials awsCredentials = null;
+            final AWSCredentialsProvider authChain;
             try {
                 if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
-                    awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+                    authChain = DefaultAWSCredentialsProviderChain.getInstance();
                 } else {
-                    awsCredentials =
+                    authChain =
                             new STSAssumeRoleSessionCredentialsProvider.Builder(
                                     config.getConfigProperty(S3_ROLE_FIELD),
                                     config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
-                            ).build().getCredentials();
-                }
-
-                if (awsCredentials instanceof AWSSessionCredentials) {
-                    // if we have session credentials, we need to send the session token
-                    // this allows us to support EC2 metadata credentials
-                    SessionCredentials sessionCredentials =  SessionCredentials.builder()
-                            .accessKeyId(awsCredentials.getAWSAccessKeyId())
-                            .secretAccessKey(awsCredentials.getAWSSecretKey())
-                            .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken())
-                            .build();
-                    config.setProviderCredentials(() -> sessionCredentials);
-                } else {
-                    Credentials credentials = new Credentials(
-                            awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey());
-                    config.setProviderCredentials(() -> credentials);
+                            ).build();
                 }
 
+                // Important! Delay the building of actual credentials
+                // until later to support tokens that may be refreshed
+                // such as all session tokens
+                config.setProviderCredentials(() -> {
+                    AWSCredentials newCreds = authChain.getCredentials();
+                    Credentials jcloudCred = null;
+
+                    if (newCreds instanceof AWSSessionCredentials) {
+                        // if we have session credentials, we need to send the session token
+                        // this allows us to support EC2 metadata credentials
+                        jcloudCred = SessionCredentials.builder()
+                                .accessKeyId(newCreds.getAWSAccessKeyId())
+                                .secretAccessKey(newCreds.getAWSSecretKey())
+                                .sessionToken(((AWSSessionCredentials) newCreds).getSessionToken())
+                                .build();
+                    } else {
+                        // in the event we hit this branch, we likely don't have expiring
+                        // credentials, however, this still allows for the user to update
+                        // profiles creds or some other mechanism
+                        jcloudCred = new Credentials(
+                                newCreds.getAWSAccessKeyId(), newCreds.getAWSSecretKey());
+                    }
+                    return jcloudCred;
+                });
             } catch (Exception e) {
                 // allowed, some mock s3 service do not need credential
                 log.warn("Exception when get credentials for s3 ", e);
             }
         }
     };
-}
\ No newline at end of file
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index 41e6255..3a0c38c 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 
+import org.jclouds.domain.Credentials;
 import org.testng.annotations.Test;
 
 public class TieredStorageConfigurationTests {
@@ -113,7 +115,36 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
         assertEquals(config.getServiceEndpoint(), "http://some-url:9093");
     }
-    
+
+    /**
+     * Confirm that with AWS we create different instances of the credentials
+     * object each time we call the supplier, this ensure that we get fresh credentials
+     * if the aws credential provider changes
+     */
+    @Test
+    public final void awsS3CredsProviderTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
+        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+
+        // set the aws properties with fake creds so the defaultProviderChain works
+        System.setProperty("aws.accessKeyId", "fakeid1");
+        System.setProperty("aws.secretKey", "fakekey1");
+        Credentials creds1 = config.getProviderCredentials().get();
+        assertEquals(creds1.identity, "fakeid1");
+        assertEquals(creds1.credential, "fakekey1");
+
+        // reset the properties and ensure we get different values by re-evaluating the chain
+        System.setProperty("aws.accessKeyId", "fakeid2");
+        System.setProperty("aws.secretKey", "fakekey2");
+        Credentials creds2 = config.getProviderCredentials().get();
+        assertEquals(creds2.identity, "fakeid2");
+        assertEquals(creds2.credential, "fakekey2");
+
+        System.clearProperty("aws.accessKeyId");
+        System.clearProperty("aws.secretKey");
+    }
+
     /**
      * Confirm that both property options are available for GCS
      */
@@ -177,25 +208,4 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
     }
-    
-    /**
-     * Confirm that we can configure AWS using the old properties
-     */
-    @Test
-    public final void s3BackwardCompatiblePropertiesTest() {
-        Map<String, String> map = new HashMap<String,String>(); 
-        map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put(BC_S3_BUCKET, "test bucket");
-        map.put(BC_S3_ENDPOINT, "http://some-url:9093");
-        map.put(BC_S3_MAX_BLOCK_SIZE, "12");
-        map.put(BC_S3_READ_BUFFER_SIZE, "500");
-        map.put(BC_S3_REGION, "test region");
-        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
-        
-        assertEquals(config.getRegion(), "test region");
-        assertEquals(config.getBucket(), "test bucket");
-        assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
-        assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
-        assertEquals(config.getServiceEndpoint(), "http://some-url:9093");
-    }
 }
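
Note on the credential Supplier change above: the regression came from resolving the credentials once and capturing the result in the Supplier, while the fix resolves them inside the Supplier on every call. The sketch below shows that difference using only java.util.function.Supplier; the AtomicReference is a hypothetical stand-in for a rotating credential source and none of these names exist in Pulsar or the AWS SDK.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class RefreshableCredentialsSketch {
    // Stand-in for a credential source whose value may rotate over time.
    static final AtomicReference<String> source = new AtomicReference<>("key-1");

    // Eager: reads the source once; every later get() returns the
    // captured value, even after the source has rotated.
    static Supplier<String> eager() {
        String captured = source.get();
        return () -> captured;
    }

    // Lazy: re-reads the source on every get(), so rotation is observed.
    static Supplier<String> lazy() {
        return () -> source.get();
    }

    public static void main(String[] args) {
        Supplier<String> e = eager();
        Supplier<String> l = lazy();
        source.set("key-2");
        System.out.println(e.get() + " " + l.get());
    }
}
```

awsS3CredsProviderTest above checks the lazy behaviour: it changes the system properties between two calls to the supplier and expects the second call to see the new values.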


[pulsar] 06/09: [pulsar-broker]Add alerts for expired/expiring soon tokens (#9321)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b7f99fbd1849413e98429ca45f9d4ebf36e41aa
Author: Zike Yang <Ro...@outlook.com>
AuthorDate: Wed Feb 3 01:47:51 2021 +0800

    [pulsar-broker]Add alerts for expired/expiring soon tokens (#9321)
    
    * Add expired token alert.
    
    * Add expiring token alert. Fix expired token metrics.
    
    * Fix testDuplicateMetricTypeDefinitions failure.
    
    (cherry picked from commit ca64811f40b78920e05f6f9ec67f2405bd8a12a0)
---
 .../AuthenticationProviderToken.java               |  30 +++++-
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 110 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index f2e366f..d847548 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -24,10 +24,14 @@ import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.Key;
 
+import java.util.Date;
 import java.util.List;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
+import io.jsonwebtoken.ExpiredJwtException;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
@@ -69,6 +73,16 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
 
     final static String TOKEN = "token";
 
+    private static final Counter expiredTokenMetrics = Counter.build()
+            .name("pulsar_expired_token_count")
+            .help("Pulsar expired token")
+            .register();
+    private static final Histogram expiringTokenMinutesMetrics = Histogram.build()
+            .name("pulsar_expiring_token_minutes")
+            .help("The remaining time of expiring token in minutes")
+            .buckets(5, 10, 60, 240)
+            .register();
+
     private Key validationKey;
     private String roleClaim;
     private SignatureAlgorithm publicKeyAlg;
@@ -121,18 +135,18 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
 
     @Override
     public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
-        // Get Token
-        String token;
         try {
+            // Get Token
+            String token;
             token = getToken(authData);
+            // Parse Token by validating
+            String role = getPrincipal(authenticateToken(token));
             AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
+            return role;
         } catch (AuthenticationException exception) {
             AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage());
             throw exception;
         }
-
-        // Parse Token by validating
-        return getPrincipal(authenticateToken(token));
     }
 
     @Override
@@ -200,8 +214,14 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
                 }
             }
 
+            if (jwt.getBody().getExpiration() != null) {
+                expiringTokenMinutesMetrics.observe((double) (jwt.getBody().getExpiration().getTime() - new Date().getTime()) / (60 * 1000));
+            }
             return jwt;
         } catch (JwtException e) {
+            if (e instanceof ExpiredJwtException) {
+                expiredTokenMetrics.inc();
+            }
             throw new AuthenticationException("Failed to authentication token: " + e.getMessage());
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 58ed618..a266224 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -32,6 +32,7 @@ import java.io.InputStreamReader;
 import java.io.StringWriter;
 import java.lang.reflect.Field;
 import java.math.RoundingMode;
+import java.util.Date;
 import java.text.NumberFormat;
 import java.util.Arrays;
 import java.util.Collection;
@@ -550,6 +551,11 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                     if (!typeDefs.containsKey(summaryMetricName)) {
                         fail("Metric " + metricName + " does not have a corresponding summary type definition");
                     }
+                } else if (metricName.endsWith("_bucket")) {
+                    String summaryMetricName = metricName.substring(0, metricName.indexOf("_bucket"));
+                    if (!typeDefs.containsKey(summaryMetricName)) {
+                        fail("Metric " + metricName + " does not have a corresponding summary type definition");
+                    }
                 } else {
                     fail("Metric " + metricName + " does not have a type definition");
                 }
@@ -771,6 +777,110 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(metric.tags.get("provider_name"), provider.getClass().getSimpleName());
     }
 
+    @Test
+    public void testExpiredTokenMetrics() throws Exception {
+        SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        Date expiredDate = new Date(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
+        String expiredToken = AuthTokenUtils.createToken(secretKey, "subject", Optional.of(expiredDate));
+
+        try {
+            provider.authenticate(new AuthenticationDataSource() {
+                @Override
+                public boolean hasDataFromCommand() {
+                    return true;
+                }
+
+                @Override
+                public String getCommandData() {
+                    return expiredToken;
+                }
+            });
+            fail("Should have failed");
+        } catch (AuthenticationException e) {
+            // expected, token was expired
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_expired_token_count");
+        assertEquals(cm.size(), 1);
+
+        provider.close();
+    }
+
+    @Test
+    public void testExpiringTokenMetrics() throws Exception {
+        SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        int[] tokenRemainTime = new int[]{3, 7, 40, 100, 400};
+
+        for (int remainTime : tokenRemainTime) {
+            Date expiredDate = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(remainTime));
+            String expiringToken = AuthTokenUtils.createToken(secretKey, "subject", Optional.of(expiredDate));
+            provider.authenticate(new AuthenticationDataSource() {
+                @Override
+                public boolean hasDataFromCommand() {
+                    return true;
+                }
+
+                @Override
+                public String getCommandData() {
+                    return expiringToken;
+                }
+            });
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        Metric countMetric = ((List<Metric>) metrics.get("pulsar_expiring_token_minutes_count")).get(0);
+        assertEquals(countMetric.value, tokenRemainTime.length);
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_expiring_token_minutes_bucket");
+        assertEquals(cm.size(), 5);
+        cm.forEach((e) -> {
+            switch (e.tags.get("le")) {
+                case "5.0":
+                    assertEquals(e.value, 1);
+                    break;
+                case "10.0":
+                    assertEquals(e.value, 2);
+                    break;
+                case "60.0":
+                    assertEquals(e.value, 3);
+                    break;
+                case "240.0":
+                    assertEquals(e.value, 4);
+                    break;
+                default:
+                    assertEquals(e.value, 5);
+                    break;
+            }
+        });
+        provider.close();
+    }
+
     /**
      * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
      */
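
Note on the bucket assertions in testExpiringTokenMetrics above: Prometheus histogram buckets are cumulative, so each "le" bucket counts every observation less than or equal to its upper bound, and the final +Inf bucket counts all observations. The sketch below reproduces that counting in plain Java for the bounds and remaining times used in the test; it does not use the Prometheus client and the class name is hypothetical.

```java
final class CumulativeBucketsSketch {
    // Returns one cumulative count per upper bound, followed by a final
    // entry for the implicit +Inf bucket.
    static long[] cumulativeCounts(double[] upperBounds, double[] observations) {
        long[] counts = new long[upperBounds.length + 1];
        for (double v : observations) {
            for (int i = 0; i < upperBounds.length; i++) {
                if (v <= upperBounds[i]) {
                    counts[i]++;
                }
            }
            counts[upperBounds.length]++; // +Inf bucket sees every observation
        }
        return counts;
    }

    public static void main(String[] args) {
        double[] bounds = {5, 10, 60, 240};          // buckets from the diff
        double[] minutes = {3, 7, 40, 100, 400};     // tokenRemainTime from the test
        System.out.println(java.util.Arrays.toString(cumulativeCounts(bounds, minutes)));
    }
}
```

In the real test the observed values are slightly below the nominal minutes because some time passes before authentication, but each still lands in the same bucket.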


[pulsar] 08/09: fix narExtractionDirectory not set (#9319)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 297a1b9b9ce2c64586641f03a0a0c2a3a051de3c
Author: hangc0276 <ha...@163.com>
AuthorDate: Wed Feb 3 11:48:52 2021 +0800

    fix narExtractionDirectory not set (#9319)
    
    ### Motivation
    When extracting NAR archives, the `narExtractionDirectory` configured in broker.conf is not applied; the hard-coded default value is used instead.
    
    
    ### Modification
    1. Use the `narExtractionDirectory` passed as a parameter.
    
    (cherry picked from commit fa41d02bebfd841767846240f3ae574047f118f0)
---
 .../src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index bc6af79..0a890b0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -140,7 +140,7 @@ public class NarClassLoader extends URLClassLoader {
     public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars,
                                                 String narExtractionDirectory) throws IOException {
         return  NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.class.getClassLoader(),
-                                                NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
+                                                narExtractionDirectory);
     }
 
     public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
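
Note on the one-line NarClassLoader fix above: the three-argument overload accepted a directory but forwarded the default constant to the full overload, so the caller's value was silently dropped. A minimal sketch of that bug shape follows; every name and path in it is hypothetical and only the delegation pattern is taken from the diff.

```java
final class OverloadDelegationSketch {
    static final String DEFAULT_DIR = "/tmp/default-extract"; // hypothetical default

    // Full overload: the only place that actually uses the directory.
    static String extractTo(String archiveName, String parentLoader, String dir) {
        return dir + "/" + archiveName;
    }

    // Buggy convenience overload: accepts dir but forwards the default.
    static String extractToBuggy(String archiveName, String dir) {
        return extractTo(archiveName, "parent", DEFAULT_DIR);
    }

    // Fixed convenience overload: forwards the caller's directory.
    static String extractToFixed(String archiveName, String dir) {
        return extractTo(archiveName, "parent", dir);
    }
}
```

The compiler does not flag the buggy variant because an unused parameter is legal Java, which is why this kind of slip tends to be caught only by a test or by a user whose configuration has no effect.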