You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:52 UTC

[pulsar] branch branch-2.8 updated (7384b7a -> 91bdc48)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 7384b7a  [Functions] ConcurrentHashMap should be used for caching producers (#11820)
     new 720b1d5  [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)
     new 3129c96  [Python] Handle py::call_method error without mutating internal state (#11840)
     new bc76d78  [pulsar-functions-go] sync to the latest function proto (#11853)
     new 7c6828a  Remove the incorrect VisibleForTesting annotation in pulsar-client (#11784)
     new 0669247  Reduce redundant FLOW requests for non-durable multi-topics consumer (#11802)
     new 40ebefc   [pulsar-client-tools] fix packages tool parameter desc (#11809)
     new 6763969  Fix seek at batchIndex level receive duplicated messages (#11826)
     new c0bada6  [function] enable protobuf-native schema support for function (#11868)
     new 91bdc48  Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/service/SubscriptionSeekTest.java       |  83 ++++
 .../client/api/NonDurableSubscriptionTest.java     |  57 ++-
 pulsar-client-cpp/python/custom_logger_test.py     |  54 +++
 pulsar-client-cpp/python/pulsar_test.py            |   5 -
 pulsar-client-cpp/python/src/config.cc             |  72 ++--
 pulsar-client-cpp/run-unit-tests.sh                |   9 +
 .../org/apache/pulsar/admin/cli/CmdPackages.java   |   2 +-
 .../org/apache/pulsar/client/impl/Backoff.java     |   1 -
 .../pulsar/client/impl/BatchMessageAcker.java      |   2 -
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   2 -
 .../pulsar/client/impl/ConnectionHandler.java      |   5 -
 .../apache/pulsar/client/impl/ConnectionPool.java  |   1 -
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  17 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   1 -
 .../pulsar/client/impl/ProducerBuilderImpl.java    |   4 -
 .../pulsar/client/impl/PulsarClientImpl.java       |   1 -
 pulsar-function-go/pb/Function.pb.go               | 469 ++++++++++-----------
 pulsar-function-go/pb/InstanceCommunication.pb.go  |  68 +--
 pulsar-function-go/pb/Request.pb.go                |   2 +-
 pulsar-function-go/pb/doc.go                       |   4 +-
 .../pulsar/functions/instance/ContextImpl.java     |  21 +-
 .../functions/instance/JavaInstanceRunnable.java   |   1 +
 .../pulsar/functions/source/TopicSchema.java       |   4 +
 .../pulsar/functions/source/TopicSchemaTest.java   |  58 +++
 .../pulsar/functions/utils/SourceConfigUtils.java  |   7 +
 .../functions/utils/SourceConfigUtilsTest.java     |  31 ++
 .../coordination/impl/LockManagerImpl.java         |   3 +
 .../coordination/impl/ResourceLockImpl.java        |  23 +-
 .../org/apache/pulsar/metadata/ZKSessionTest.java  |   8 +-
 29 files changed, 667 insertions(+), 348 deletions(-)
 create mode 100644 pulsar-client-cpp/python/custom_logger_test.py
 create mode 100644 pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java

[pulsar] 04/09: Remove the incorrect VisibleForTesting annotation in pulsar-client (#11784)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c6828a577df2302d19c81d320c1d8b7afcc6d5d
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Thu Sep 2 21:15:57 2021 +0800

    Remove the incorrect VisibleForTesting annotation in pulsar-client (#11784)
    
    
    (cherry picked from commit 4147db88bc08eb3b2dab97a45af178fd9f4c7a6b)
---
 .../src/main/java/org/apache/pulsar/client/impl/Backoff.java         | 1 -
 .../main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java   | 2 --
 .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java       | 2 --
 .../main/java/org/apache/pulsar/client/impl/ConnectionHandler.java   | 5 -----
 .../src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java  | 1 -
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java    | 2 --
 .../java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java  | 1 -
 .../main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java | 4 ----
 .../main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java    | 1 -
 9 files changed, 19 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
index 2a6e525..8f33599 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
@@ -41,7 +41,6 @@ public class Backoff {
 
     private static final Random random = new Random();
 
-    @VisibleForTesting
     Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
             TimeUnit unitMandatoryStop, Clock clock) {
         this.initial = unitInitial.toMillis(initial);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index 8b178f2..5f9e617 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.BitSet;
 
 class BatchMessageAcker {
@@ -49,7 +48,6 @@ class BatchMessageAcker {
         this.batchSize = batchSize;
     }
 
-    @VisibleForTesting
     BitSet getBitSet() {
         return bitSet;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bed2b9c..f4d600b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -1120,7 +1119,6 @@ public class ClientCnx extends PulsarHandler {
         }
     }
 
-    @VisibleForTesting
     public void close() {
        if (ctx != null) {
            ctx.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 6b00a8f..8fb7ab4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -19,11 +19,9 @@
 package org.apache.pulsar.client.impl;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.HandlerState.State;
 import org.slf4j.Logger;
@@ -117,7 +115,6 @@ public class ConnectionHandler {
         return EPOCH_UPDATER.incrementAndGet(this);
     }
 
-    @VisibleForTesting
     public void connectionClosed(ClientCnx cnx) {
         lastConnectionClosedTimestamp = System.currentTimeMillis();
         state.client.getCnxPool().releaseConnection(cnx);
@@ -142,7 +139,6 @@ public class ConnectionHandler {
         backoff.reset();
     }
 
-    @VisibleForTesting
     public ClientCnx cnx() {
         return CLIENT_CNX_UPDATER.get(this);
     }
@@ -170,7 +166,6 @@ public class ConnectionHandler {
         return false;
     }
 
-    @VisibleForTesting
     public long getEpoch() {
         return epoch;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index c7e4f58..8e28c87 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -272,7 +272,6 @@ public class ConnectionPool implements Closeable {
         return future;
     }
 
-    @VisibleForTesting
     CompletableFuture<List<InetAddress>> resolveName(String hostname) {
         CompletableFuture<List<InetAddress>> future = new CompletableFuture<>();
         dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>> resolveFuture) -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index bc8f06b..37856aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
@@ -2197,7 +2196,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.connectionHandler.connectionClosed(cnx);
     }
 
-    @VisibleForTesting
     public ClientCnx getClientCnx() {
         return this.connectionHandler.cnx();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 7588c34..68bdf38 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -895,7 +895,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     // subscribe one more given topic, but already know the numberPartitions
-    @VisibleForTesting
     CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) {
         TopicName topicNameInstance = getTopicName(topicName);
         if (topicNameInstance == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 48f9f2e..d3a1bb1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -45,7 +43,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptorWrapper;
-import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -62,7 +59,6 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     private Schema<T> schema;
     private List<ProducerInterceptor> interceptorList;
 
-    @VisibleForTesting
     public ProducerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
         this(client, new ProducerConfigurationData(), schema);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f951a88..b47f795 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -219,7 +219,6 @@ public class PulsarClientImpl implements PulsarClient {
         return conf;
     }
 
-    @VisibleForTesting
     public Clock getClientClock() {
         return clientClock;
     }

[pulsar] 03/09: [pulsar-functions-go] sync to the latest function proto (#11853)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bc76d787720bd7cefbcc67596d20a9b33e13bc10
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Wed Sep 1 22:02:50 2021 +0800

    [pulsar-functions-go] sync to the latest function proto (#11853)
    
    * update to the latest proto
    
    * fix license
    
    (cherry picked from commit f784379323bdbd60db2825f3e6e63730d290a8f9)
---
 pulsar-function-go/pb/Function.pb.go              | 469 +++++++++++-----------
 pulsar-function-go/pb/InstanceCommunication.pb.go |  68 ++--
 pulsar-function-go/pb/Request.pb.go               |   2 +-
 pulsar-function-go/pb/doc.go                      |   4 +-
 4 files changed, 271 insertions(+), 272 deletions(-)

diff --git a/pulsar-function-go/pb/Function.pb.go b/pulsar-function-go/pb/Function.pb.go
index 0b894c4..d137995 100644
--- a/pulsar-function-go/pb/Function.pb.go
+++ b/pulsar-function-go/pb/Function.pb.go
@@ -20,7 +20,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // 	protoc-gen-go v1.25.0
-// 	protoc        v3.13.0
+// 	protoc        v3.17.3
 // source: Function.proto
 
 package api
@@ -535,7 +535,6 @@ type FunctionDetails struct {
 	RetainOrdering       bool                 `protobuf:"varint,21,opt,name=retainOrdering,proto3" json:"retainOrdering,omitempty"`
 	RetainKeyOrdering    bool                 `protobuf:"varint,22,opt,name=retainKeyOrdering,proto3" json:"retainKeyOrdering,omitempty"`
 	SubscriptionPosition SubscriptionPosition `protobuf:"varint,23,opt,name=subscriptionPosition,proto3,enum=proto.SubscriptionPosition" json:"subscriptionPosition,omitempty"`
-	ExternalPulsarsMap   string               `protobuf:"bytes,24,opt,name=externalPulsarsMap,proto3" json:"externalPulsarsMap,omitempty"`
 }
 
 func (x *FunctionDetails) Reset() {
@@ -731,13 +730,6 @@ func (x *FunctionDetails) GetSubscriptionPosition() SubscriptionPosition {
 	return SubscriptionPosition_LATEST
 }
 
-func (x *FunctionDetails) GetExternalPulsarsMap() string {
-	if x != nil {
-		return x.ExternalPulsarsMap
-	}
-	return ""
-}
-
 type ConsumerSpec struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
@@ -750,6 +742,7 @@ type ConsumerSpec struct {
 	SchemaProperties   map[string]string               `protobuf:"bytes,5,rep,name=schemaProperties,proto3" json:"schemaProperties,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
 	ConsumerProperties map[string]string               `protobuf:"bytes,6,rep,name=consumerProperties,proto3" json:"consumerProperties,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
 	CryptoSpec         *CryptoSpec                     `protobuf:"bytes,7,opt,name=cryptoSpec,proto3" json:"cryptoSpec,omitempty"`
+	PoolMessages       bool                            `protobuf:"varint,8,opt,name=poolMessages,proto3" json:"poolMessages,omitempty"`
 }
 
 func (x *ConsumerSpec) Reset() {
@@ -833,6 +826,13 @@ func (x *ConsumerSpec) GetCryptoSpec() *CryptoSpec {
 	return nil
 }
 
+func (x *ConsumerSpec) GetPoolMessages() bool {
+	if x != nil {
+		return x.PoolMessages
+	}
+	return false
+}
+
 type ProducerSpec struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
@@ -1660,7 +1660,7 @@ var file_Function_proto_rawDesc = []byte{
 	0x61, 0x67, 0x65, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x28, 0x0a, 0x0f, 0x64, 0x65,
 	0x61, 0x64, 0x4c, 0x65, 0x74, 0x74, 0x65, 0x72, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20,
 	0x01, 0x28, 0x09, 0x52, 0x0f, 0x64, 0x65, 0x61, 0x64, 0x4c, 0x65, 0x74, 0x74, 0x65, 0x72, 0x54,
-	0x6f, 0x70, 0x69, 0x63, 0x22, 0xf5, 0x08, 0x0a, 0x0f, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f,
+	0x6f, 0x70, 0x69, 0x63, 0x22, 0xc5, 0x08, 0x0a, 0x0f, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f,
 	0x6e, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61,
 	0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74,
 	0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20,
@@ -1722,16 +1722,13 @@ var file_Function_proto_rawDesc = []byte{
 	0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
 	0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x14, 0x73, 0x75, 0x62,
 	0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f,
-	0x6e, 0x12, 0x2e, 0x0a, 0x12, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x50, 0x75, 0x6c,
-	0x73, 0x61, 0x72, 0x73, 0x4d, 0x61, 0x70, 0x18, 0x18, 0x20, 0x01, 0x28, 0x09, 0x52, 0x12, 0x65,
-	0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x50, 0x75, 0x6c, 0x73, 0x61, 0x72, 0x73, 0x4d, 0x61,
-	0x70, 0x22, 0x27, 0x0a, 0x07, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x08, 0x0a, 0x04,
+	0x6e, 0x22, 0x27, 0x0a, 0x07, 0x52, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x08, 0x0a, 0x04,
 	0x4a, 0x41, 0x56, 0x41, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e,
 	0x10, 0x01, 0x12, 0x06, 0x0a, 0x02, 0x47, 0x4f, 0x10, 0x03, 0x22, 0x40, 0x0a, 0x0d, 0x43, 0x6f,
 	0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55,
 	0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x55, 0x4e, 0x43,
 	0x54, 0x49, 0x4f, 0x4e, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45,
-	0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x49, 0x4e, 0x4b, 0x10, 0x03, 0x22, 0xf1, 0x04, 0x0a,
+	0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x49, 0x4e, 0x4b, 0x10, 0x03, 0x22, 0x95, 0x05, 0x0a,
 	0x0c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1e, 0x0a,
 	0x0a, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
 	0x09, 0x52, 0x0a, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x26, 0x0a,
@@ -1759,231 +1756,233 @@ var file_Function_proto_rawDesc = []byte{
 	0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x12, 0x31, 0x0a, 0x0a, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f,
 	0x53, 0x70, 0x65, 0x63, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x72, 0x6f,
 	0x74, 0x6f, 0x2e, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0a, 0x63,
-	0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x1a, 0x29, 0x0a, 0x11, 0x52, 0x65, 0x63,
-	0x65, 0x69, 0x76, 0x65, 0x72, 0x51, 0x75, 0x65, 0x75, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x14,
-	0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76,
-	0x61, 0x6c, 0x75, 0x65, 0x1a, 0x43, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72,
-	0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a,
-	0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12,
-	0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
-	0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x45, 0x0a, 0x17, 0x43, 0x6f, 0x6e,
-	0x73, 0x75, 0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45,
-	0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
-	0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
-	0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
-	0x22, 0x9f, 0x02, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x53, 0x70, 0x65,
-	0x63, 0x12, 0x2e, 0x0a, 0x12, 0x6d, 0x61, 0x78, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x4d,
-	0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x12, 0x6d,
-	0x61, 0x78, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
-	0x73, 0x12, 0x4e, 0x0a, 0x22, 0x6d, 0x61, 0x78, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x4d,
-	0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x41, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x50, 0x61, 0x72,
-	0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x22, 0x6d,
-	0x61, 0x78, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
-	0x73, 0x41, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
-	0x73, 0x12, 0x38, 0x0a, 0x17, 0x75, 0x73, 0x65, 0x54, 0x68, 0x72, 0x65, 0x61, 0x64, 0x4c, 0x6f,
-	0x63, 0x61, 0x6c, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x01,
-	0x28, 0x08, 0x52, 0x17, 0x75, 0x73, 0x65, 0x54, 0x68, 0x72, 0x65, 0x61, 0x64, 0x4c, 0x6f, 0x63,
-	0x61, 0x6c, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x73, 0x12, 0x31, 0x0a, 0x0a, 0x63,
-	0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70,
-	0x65, 0x63, 0x52, 0x0a, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x12, 0x22,
-	0x0a, 0x0c, 0x62, 0x61, 0x74, 0x63, 0x68, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x65, 0x72, 0x18, 0x05,
-	0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x62, 0x61, 0x74, 0x63, 0x68, 0x42, 0x75, 0x69, 0x6c, 0x64,
-	0x65, 0x72, 0x22, 0xc1, 0x03, 0x0a, 0x0a, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65,
-	0x63, 0x12, 0x3a, 0x0a, 0x18, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x4b, 0x65, 0x79, 0x52, 0x65,
-	0x61, 0x64, 0x65, 0x72, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20,
-	0x01, 0x28, 0x09, 0x52, 0x18, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x4b, 0x65, 0x79, 0x52, 0x65,
-	0x61, 0x64, 0x65, 0x72, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x34, 0x0a,
-	0x15, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x4b, 0x65, 0x79, 0x52, 0x65, 0x61, 0x64, 0x65, 0x72,
-	0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x63, 0x72,
-	0x79, 0x70, 0x74, 0x6f, 0x4b, 0x65, 0x79, 0x52, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6f, 0x6e,
-	0x66, 0x69, 0x67, 0x12, 0x3c, 0x0a, 0x19, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x45,
-	0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x4e, 0x61, 0x6d, 0x65,
-	0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x19, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72,
-	0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x4e, 0x61, 0x6d,
-	0x65, 0x12, 0x61, 0x0a, 0x1b, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x43, 0x72, 0x79,
-	0x70, 0x74, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e,
-	0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43,
-	0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72,
-	0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x1b, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65,
+	0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x12, 0x22, 0x0a, 0x0c, 0x70, 0x6f, 0x6f,
+	0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52,
+	0x0c, 0x70, 0x6f, 0x6f, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x1a, 0x29, 0x0a,
+	0x11, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x72, 0x51, 0x75, 0x65, 0x75, 0x65, 0x53, 0x69,
+	0x7a, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
+	0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x43, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65,
+	0x6d, 0x61, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72,
+	0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
+	0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x45, 0x0a,
+	0x17, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74,
+	0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18,
+	0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61,
+	0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
+	0x3a, 0x02, 0x38, 0x01, 0x22, 0x9f, 0x02, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65,
+	0x72, 0x53, 0x70, 0x65, 0x63, 0x12, 0x2e, 0x0a, 0x12, 0x6d, 0x61, 0x78, 0x50, 0x65, 0x6e, 0x64,
+	0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28,
+	0x05, 0x52, 0x12, 0x6d, 0x61, 0x78, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73,
+	0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x4e, 0x0a, 0x22, 0x6d, 0x61, 0x78, 0x50, 0x65, 0x6e, 0x64,
+	0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x41, 0x63, 0x72, 0x6f, 0x73,
+	0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28,
+	0x05, 0x52, 0x22, 0x6d, 0x61, 0x78, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73,
+	0x73, 0x61, 0x67, 0x65, 0x73, 0x41, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69,
+	0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x38, 0x0a, 0x17, 0x75, 0x73, 0x65, 0x54, 0x68, 0x72, 0x65,
+	0x61, 0x64, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x73,
+	0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x17, 0x75, 0x73, 0x65, 0x54, 0x68, 0x72, 0x65, 0x61,
+	0x64, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x73, 0x12,
+	0x31, 0x0a, 0x0a, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x18, 0x04, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x79, 0x70,
+	0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0a, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70,
+	0x65, 0x63, 0x12, 0x22, 0x0a, 0x0c, 0x62, 0x61, 0x74, 0x63, 0x68, 0x42, 0x75, 0x69, 0x6c, 0x64,
+	0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x62, 0x61, 0x74, 0x63, 0x68, 0x42,
+	0x75, 0x69, 0x6c, 0x64, 0x65, 0x72, 0x22, 0xc1, 0x03, 0x0a, 0x0a, 0x43, 0x72, 0x79, 0x70, 0x74,
+	0x6f, 0x53, 0x70, 0x65, 0x63, 0x12, 0x3a, 0x0a, 0x18, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x4b,
+	0x65, 0x79, 0x52, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d,
+	0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x18, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x4b,
+	0x65, 0x79, 0x52, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d,
+	0x65, 0x12, 0x34, 0x0a, 0x15, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x4b, 0x65, 0x79, 0x52, 0x65,
+	0x61, 0x64, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
+	0x52, 0x15, 0x63, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x4b, 0x65, 0x79, 0x52, 0x65, 0x61, 0x64, 0x65,
+	0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3c, 0x0a, 0x19, 0x70, 0x72, 0x6f, 0x64, 0x75,
+	0x63, 0x65, 0x72, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79,
+	0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x19, 0x70, 0x72, 0x6f, 0x64,
+	0x75, 0x63, 0x65, 0x72, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65,
+	0x79, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x61, 0x0a, 0x1b, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65,
 	0x72, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x41, 0x63,
-	0x74, 0x69, 0x6f, 0x6e, 0x12, 0x61, 0x0a, 0x1b, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72,
-	0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x41, 0x63, 0x74,
-	0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x74,
-	0x6f, 0x2e, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x46, 0x61, 0x69,
-	0x6c, 0x75, 0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x1b, 0x63, 0x6f, 0x6e, 0x73,
+	0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x2e, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x46, 0x61,
+	0x69, 0x6c, 0x75, 0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x1b, 0x70, 0x72, 0x6f,
+	0x64, 0x75, 0x63, 0x65, 0x72, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x75,
+	0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x61, 0x0a, 0x1b, 0x63, 0x6f, 0x6e, 0x73,
 	0x75, 0x6d, 0x65, 0x72, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72,
-	0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x0d, 0x46, 0x61, 0x69, 0x6c, 0x75,
-	0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x08, 0x0a, 0x04, 0x46, 0x41, 0x49, 0x4c,
-	0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x49, 0x53, 0x43, 0x41, 0x52, 0x44, 0x10, 0x01, 0x12,
-	0x0b, 0x0a, 0x07, 0x43, 0x4f, 0x4e, 0x53, 0x55, 0x4d, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04,
-	0x53, 0x45, 0x4e, 0x44, 0x10, 0x0a, 0x22, 0xd1, 0x06, 0x0a, 0x0a, 0x53, 0x6f, 0x75, 0x72, 0x63,
-	0x65, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61,
-	0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x4e,
-	0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x02,
-	0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x24, 0x0a,
-	0x0d, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05,
-	0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e,
-	0x61, 0x6d, 0x65, 0x12, 0x43, 0x0a, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
-	0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e,
-	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69,
-	0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70,
-	0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x69, 0x0a, 0x16, 0x74, 0x6f, 0x70, 0x69,
-	0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61,
-	0x6d, 0x65, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
-	0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x54, 0x6f, 0x70, 0x69,
-	0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61,
-	0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x42, 0x02, 0x18, 0x01, 0x52, 0x16, 0x74, 0x6f, 0x70,
-	0x69, 0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e,
-	0x61, 0x6d, 0x65, 0x12, 0x41, 0x0a, 0x0a, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x53, 0x70, 0x65, 0x63,
-	0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
-	0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74,
-	0x53, 0x70, 0x65, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x69, 0x6e, 0x70, 0x75,
-	0x74, 0x53, 0x70, 0x65, 0x63, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75,
-	0x74, 0x4d, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x6f,
-	0x75, 0x74, 0x4d, 0x73, 0x12, 0x28, 0x0a, 0x0d, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x50, 0x61,
-	0x74, 0x74, 0x65, 0x72, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, 0x52,
-	0x0d, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x50, 0x61, 0x74, 0x74, 0x65, 0x72, 0x6e, 0x12, 0x18,
-	0x0a, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52,
-	0x07, 0x62, 0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x12, 0x2a, 0x0a, 0x10, 0x73, 0x75, 0x62, 0x73,
-	0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x09, 0x20, 0x01,
-	0x28, 0x09, 0x52, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e,
-	0x4e, 0x61, 0x6d, 0x65, 0x12, 0x30, 0x0a, 0x13, 0x63, 0x6c, 0x65, 0x61, 0x6e, 0x75, 0x70, 0x53,
-	0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0b, 0x20, 0x01, 0x28,
-	0x08, 0x52, 0x13, 0x63, 0x6c, 0x65, 0x61, 0x6e, 0x75, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
-	0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4f, 0x0a, 0x14, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72,
-	0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c,
-	0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x75, 0x62,
-	0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f,
-	0x6e, 0x52, 0x14, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50,
-	0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x42, 0x0a, 0x1c, 0x6e, 0x65, 0x67, 0x61, 0x74,
-	0x69, 0x76, 0x65, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79,
-	0x44, 0x65, 0x6c, 0x61, 0x79, 0x4d, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x1c, 0x6e,
+	0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1f, 0x2e,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x53, 0x70, 0x65, 0x63,
+	0x2e, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x1b,
+	0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x43, 0x72, 0x79, 0x70, 0x74, 0x6f, 0x46, 0x61,
+	0x69, 0x6c, 0x75, 0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x0d, 0x46,
+	0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x08, 0x0a, 0x04,
+	0x46, 0x41, 0x49, 0x4c, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x49, 0x53, 0x43, 0x41, 0x52,
+	0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x4f, 0x4e, 0x53, 0x55, 0x4d, 0x45, 0x10, 0x02,
+	0x12, 0x08, 0x0a, 0x04, 0x53, 0x45, 0x4e, 0x44, 0x10, 0x0a, 0x22, 0xd1, 0x06, 0x0a, 0x0a, 0x53,
+	0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6c, 0x61,
+	0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c,
+	0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69,
+	0x67, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67,
+	0x73, 0x12, 0x24, 0x0a, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61,
+	0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6c,
+	0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x43, 0x0a, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63,
+	0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
+	0x0e, 0x32, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+	0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x10, 0x73, 0x75, 0x62, 0x73,
+	0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x69, 0x0a, 0x16,
+	0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61,
+	0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x70,
+	0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x2e,
+	0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61,
+	0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x42, 0x02, 0x18, 0x01, 0x52,
+	0x16, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c,
+	0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x41, 0x0a, 0x0a, 0x69, 0x6e, 0x70, 0x75, 0x74,
+	0x53, 0x70, 0x65, 0x63, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x72,
+	0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x49,
+	0x6e, 0x70, 0x75, 0x74, 0x53, 0x70, 0x65, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a,
+	0x69, 0x6e, 0x70, 0x75, 0x74, 0x53, 0x70, 0x65, 0x63, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69,
+	0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74,
+	0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x73, 0x12, 0x28, 0x0a, 0x0d, 0x74, 0x6f, 0x70, 0x69,
+	0x63, 0x73, 0x50, 0x61, 0x74, 0x74, 0x65, 0x72, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x42,
+	0x02, 0x18, 0x01, 0x52, 0x0d, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x50, 0x61, 0x74, 0x74, 0x65,
+	0x72, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x18, 0x08, 0x20,
+	0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x12, 0x2a, 0x0a, 0x10,
+	0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65,
+	0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70,
+	0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x30, 0x0a, 0x13, 0x63, 0x6c, 0x65, 0x61,
+	0x6e, 0x75, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18,
+	0x0b, 0x20, 0x01, 0x28, 0x08, 0x52, 0x13, 0x63, 0x6c, 0x65, 0x61, 0x6e, 0x75, 0x70, 0x53, 0x75,
+	0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4f, 0x0a, 0x14, 0x73, 0x75,
+	0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69,
+	0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+	0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73,
+	0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x14, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
+	0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x42, 0x0a, 0x1c, 0x6e,
 	0x65, 0x67, 0x61, 0x74, 0x69, 0x76, 0x65, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x64, 0x65, 0x6c, 0x69,
-	0x76, 0x65, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x4d, 0x73, 0x1a, 0x49, 0x0a, 0x1b, 0x54,
-	0x6f, 0x70, 0x69, 0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61, 0x73,
-	0x73, 0x4e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
-	0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
-	0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
-	0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x52, 0x0a, 0x0f, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x53,
-	0x70, 0x65, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x29, 0x0a, 0x05, 0x76,
-	0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x72, 0x6f,
-	0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x53, 0x70, 0x65, 0x63, 0x52,
-	0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x95, 0x05, 0x0a, 0x08, 0x53,
-	0x69, 0x6e, 0x6b, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6c, 0x61, 0x73, 0x73,
-	0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x61, 0x73,
-	0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73,
-	0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12,
-	0x24, 0x0a, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65,
-	0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6c, 0x61, 0x73,
-	0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03,
-	0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x37, 0x0a, 0x0c, 0x70,
-	0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x53, 0x70, 0x65, 0x63, 0x18, 0x0b, 0x20, 0x01, 0x28,
-	0x0b, 0x32, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63,
-	0x65, 0x72, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0c, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72,
-	0x53, 0x70, 0x65, 0x63, 0x12, 0x26, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61,
-	0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x73, 0x65,
-	0x72, 0x44, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07,
-	0x62, 0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62,
-	0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61,
-	0x54, 0x79, 0x70, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x63, 0x68, 0x65,
-	0x6d, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x1c, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72,
-	0x64, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x72,
-	0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x1c, 0x66, 0x6f,
+	0x76, 0x65, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x4d, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28,
+	0x04, 0x52, 0x1c, 0x6e, 0x65, 0x67, 0x61, 0x74, 0x69, 0x76, 0x65, 0x41, 0x63, 0x6b, 0x52, 0x65,
+	0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x4d, 0x73, 0x1a,
+	0x49, 0x0a, 0x1b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x44, 0x65,
+	0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10,
+	0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79,
+	0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
+	0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x52, 0x0a, 0x0f, 0x49, 0x6e,
+	0x70, 0x75, 0x74, 0x53, 0x70, 0x65, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a,
+	0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12,
+	0x29, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13,
+	0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x53,
+	0x70, 0x65, 0x63, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x95,
+	0x05, 0x0a, 0x08, 0x53, 0x69, 0x6e, 0x6b, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1c, 0x0a, 0x09, 0x63,
+	0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09,
+	0x63, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e,
+	0x66, 0x69, 0x67, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x66,
+	0x69, 0x67, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73,
+	0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x79, 0x70, 0x65,
+	0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70,
+	0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12,
+	0x37, 0x0a, 0x0c, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x53, 0x70, 0x65, 0x63, 0x18,
+	0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x72,
+	0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0c, 0x70, 0x72, 0x6f, 0x64,
+	0x75, 0x63, 0x65, 0x72, 0x53, 0x70, 0x65, 0x63, 0x12, 0x26, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x44,
+	0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
+	0x52, 0x0e, 0x73, 0x65, 0x72, 0x44, 0x65, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65,
+	0x12, 0x18, 0x0a, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28,
+	0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x74, 0x69, 0x6e, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x63,
+	0x68, 0x65, 0x6d, 0x61, 0x54, 0x79, 0x70, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
+	0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x1c, 0x66, 0x6f,
 	0x72, 0x77, 0x61, 0x72, 0x64, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61,
-	0x67, 0x65, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x12, 0x51, 0x0a, 0x10, 0x73, 0x63,
-	0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x18, 0x09,
-	0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x69, 0x6e,
-	0x6b, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x70,
-	0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x10, 0x73, 0x63, 0x68,
-	0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x12, 0x57, 0x0a,
-	0x12, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74,
-	0x69, 0x65, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x70, 0x72, 0x6f, 0x74,
-	0x6f, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75,
-	0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74,
-	0x72, 0x79, 0x52, 0x12, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x70,
-	0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x1a, 0x43, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61,
-	0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12,
-	0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65,
-	0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
-	0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x45, 0x0a, 0x17, 0x43,
+	0x67, 0x65, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08,
+	0x52, 0x1c, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d,
+	0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x12, 0x51,
+	0x0a, 0x10, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69,
+	0x65, 0x73, 0x18, 0x09, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+	0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61,
+	0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52,
+	0x10, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65,
+	0x73, 0x12, 0x57, 0x0a, 0x12, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f,
+	0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x43,
 	0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65,
-	0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
-	0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
-	0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02,
-	0x38, 0x01, 0x22, 0x67, 0x0a, 0x17, 0x50, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x4c, 0x6f, 0x63,
-	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x12, 0x20, 0x0a,
-	0x0b, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12,
-	0x2a, 0x0a, 0x10, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x6c, 0x65, 0x4e,
-	0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x6f, 0x72, 0x69, 0x67, 0x69,
-	0x6e, 0x61, 0x6c, 0x46, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xd5, 0x03, 0x0a, 0x10,
-	0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61,
-	0x12, 0x40, 0x0a, 0x0f, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x74, 0x61,
-	0x69, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74,
-	0x6f, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c,
-	0x73, 0x52, 0x0f, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x74, 0x61, 0x69,
-	0x6c, 0x73, 0x12, 0x48, 0x0a, 0x0f, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x4c, 0x6f, 0x63,
-	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x70, 0x72,
-	0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74,
-	0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x0f, 0x70, 0x61, 0x63,
-	0x6b, 0x61, 0x67, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07,
-	0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x76,
-	0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65,
-	0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x63, 0x72, 0x65, 0x61,
-	0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x53, 0x0a, 0x0e, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e,
-	0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b,
-	0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d,
-	0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65,
-	0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0e, 0x69, 0x6e, 0x73,
-	0x74, 0x61, 0x6e, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x12, 0x4d, 0x0a, 0x10, 0x66,
-	0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x75, 0x74, 0x68, 0x53, 0x70, 0x65, 0x63, 0x18,
-	0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x75,
-	0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61,
-	0x74, 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x52, 0x10, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69,
-	0x6f, 0x6e, 0x41, 0x75, 0x74, 0x68, 0x53, 0x70, 0x65, 0x63, 0x1a, 0x57, 0x0a, 0x13, 0x49, 0x6e,
-	0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72,
-	0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03,
-	0x6b, 0x65, 0x79, 0x12, 0x2a, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01,
-	0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74,
-	0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
-	0x02, 0x38, 0x01, 0x22, 0x4c, 0x0a, 0x1a, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41,
-	0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65,
-	0x63, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52,
-	0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65,
-	0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65,
-	0x72, 0x22, 0x6f, 0x0a, 0x08, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x43, 0x0a,
-	0x10, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74,
-	0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
-	0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61,
-	0x52, 0x10, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61,
-	0x74, 0x61, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64,
-	0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65,
-	0x49, 0x64, 0x22, 0x55, 0x0a, 0x0a, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74,
-	0x12, 0x2b, 0x0a, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x49, 0x6e, 0x73, 0x74, 0x61,
-	0x6e, 0x63, 0x65, 0x52, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x1a, 0x0a,
-	0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
-	0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x2a, 0x4f, 0x0a, 0x14, 0x50, 0x72, 0x6f,
-	0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65,
-	0x73, 0x12, 0x10, 0x0a, 0x0c, 0x41, 0x54, 0x4c, 0x45, 0x41, 0x53, 0x54, 0x5f, 0x4f, 0x4e, 0x43,
-	0x45, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x41, 0x54, 0x4d, 0x4f, 0x53, 0x54, 0x5f, 0x4f, 0x4e,
-	0x43, 0x45, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x45, 0x46, 0x46, 0x45, 0x43, 0x54, 0x49, 0x56,
-	0x45, 0x4c, 0x59, 0x5f, 0x4f, 0x4e, 0x43, 0x45, 0x10, 0x02, 0x2a, 0x3c, 0x0a, 0x10, 0x53, 0x75,
-	0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a,
-	0x0a, 0x06, 0x53, 0x48, 0x41, 0x52, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x41,
-	0x49, 0x4c, 0x4f, 0x56, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x4b, 0x45, 0x59, 0x5f,
-	0x53, 0x48, 0x41, 0x52, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x30, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73,
-	0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e,
-	0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x54, 0x45, 0x53, 0x54, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08,
-	0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, 0x10, 0x01, 0x2a, 0x29, 0x0a, 0x0d, 0x46, 0x75,
-	0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x52,
-	0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x54, 0x4f, 0x50,
-	0x50, 0x45, 0x44, 0x10, 0x01, 0x42, 0x2d, 0x0a, 0x21, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
-	0x63, 0x68, 0x65, 0x2e, 0x70, 0x75, 0x6c, 0x73, 0x61, 0x72, 0x2e, 0x66, 0x75, 0x6e, 0x63, 0x74,
-	0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x42, 0x08, 0x46, 0x75, 0x6e, 0x63,
-	0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x12, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72,
+	0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x1a, 0x43, 0x0a, 0x15, 0x53, 0x63,
+	0x68, 0x65, 0x6d, 0x61, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e,
+	0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
+	0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02,
+	0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a,
+	0x45, 0x0a, 0x17, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x70, 0x65,
+	0x72, 0x74, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
+	0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
+	0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
+	0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x67, 0x0a, 0x17, 0x50, 0x61, 0x63, 0x6b, 0x61, 0x67,
+	0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74,
+	0x61, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x50, 0x61, 0x74, 0x68,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x50,
+	0x61, 0x74, 0x68, 0x12, 0x2a, 0x0a, 0x10, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x46,
+	0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x6f,
+	0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22,
+	0xd5, 0x03, 0x0a, 0x10, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61,
+	0x44, 0x61, 0x74, 0x61, 0x12, 0x40, 0x0a, 0x0f, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e,
+	0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65,
+	0x74, 0x61, 0x69, 0x6c, 0x73, 0x52, 0x0f, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44,
+	0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x48, 0x0a, 0x0f, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67,
+	0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
+	0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x4c,
+	0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52,
+	0x0f, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
+	0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28,
+	0x04, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x72,
+	0x65, 0x61, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a,
+	0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x53, 0x0a, 0x0e, 0x69, 0x6e,
+	0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03,
+	0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74,
+	0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x49, 0x6e, 0x73, 0x74,
+	0x61, 0x6e, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52,
+	0x0e, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x12,
+	0x4d, 0x0a, 0x10, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x75, 0x74, 0x68, 0x53,
+	0x70, 0x65, 0x63, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+	0x6f, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e,
+	0x74, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x52, 0x10, 0x66, 0x75,
+	0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x75, 0x74, 0x68, 0x53, 0x70, 0x65, 0x63, 0x1a, 0x57,
+	0x0a, 0x13, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73,
+	0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x05, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2a, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
+	0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46,
+	0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x76, 0x61,
+	0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x4c, 0x0a, 0x1a, 0x46, 0x75, 0x6e, 0x63, 0x74,
+	0x69, 0x6f, 0x6e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
+	0x6e, 0x53, 0x70, 0x65, 0x63, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20,
+	0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f,
+	0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f,
+	0x76, 0x69, 0x64, 0x65, 0x72, 0x22, 0x6f, 0x0a, 0x08, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63,
+	0x65, 0x12, 0x43, 0x0a, 0x10, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74,
+	0x61, 0x44, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x70, 0x72,
+	0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61,
+	0x44, 0x61, 0x74, 0x61, 0x52, 0x10, 0x66, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65,
+	0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e,
+	0x63, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74,
+	0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x22, 0x55, 0x0a, 0x0a, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e,
+	0x6d, 0x65, 0x6e, 0x74, 0x12, 0x2b, 0x0a, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x49,
+	0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x52, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63,
+	0x65, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x02, 0x20,
+	0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x2a, 0x4f, 0x0a,
+	0x14, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x47, 0x75, 0x61, 0x72, 0x61,
+	0x6e, 0x74, 0x65, 0x65, 0x73, 0x12, 0x10, 0x0a, 0x0c, 0x41, 0x54, 0x4c, 0x45, 0x41, 0x53, 0x54,
+	0x5f, 0x4f, 0x4e, 0x43, 0x45, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x41, 0x54, 0x4d, 0x4f, 0x53,
+	0x54, 0x5f, 0x4f, 0x4e, 0x43, 0x45, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x45, 0x46, 0x46, 0x45,
+	0x43, 0x54, 0x49, 0x56, 0x45, 0x4c, 0x59, 0x5f, 0x4f, 0x4e, 0x43, 0x45, 0x10, 0x02, 0x2a, 0x3c,
+	0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79,
+	0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x48, 0x41, 0x52, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c,
+	0x0a, 0x08, 0x46, 0x41, 0x49, 0x4c, 0x4f, 0x56, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a,
+	0x4b, 0x45, 0x59, 0x5f, 0x53, 0x48, 0x41, 0x52, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x30, 0x0a, 0x14,
+	0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x73, 0x69,
+	0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x54, 0x45, 0x53, 0x54, 0x10, 0x00,
+	0x12, 0x0c, 0x0a, 0x08, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, 0x10, 0x01, 0x2a, 0x29,
+	0x0a, 0x0d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12,
+	0x0b, 0x0a, 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07,
+	0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x10, 0x01, 0x42, 0x2d, 0x0a, 0x21, 0x6f, 0x72, 0x67,
+	0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x70, 0x75, 0x6c, 0x73, 0x61, 0x72, 0x2e, 0x66,
+	0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x42, 0x08,
+	0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/pulsar-function-go/pb/InstanceCommunication.pb.go b/pulsar-function-go/pb/InstanceCommunication.pb.go
index 572df69..aa7fcb7 100644
--- a/pulsar-function-go/pb/InstanceCommunication.pb.go
+++ b/pulsar-function-go/pb/InstanceCommunication.pb.go
@@ -20,7 +20,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // 	protoc-gen-go v1.25.0
-// 	protoc        v3.13.0
+// 	protoc        v3.17.3
 // source: InstanceCommunication.proto
 
 package api
@@ -28,12 +28,12 @@ package api
 import (
 	context "context"
 	proto "github.com/golang/protobuf/proto"
-	empty "github.com/golang/protobuf/ptypes/empty"
 	grpc "google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
 	status "google.golang.org/grpc/status"
 	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	emptypb "google.golang.org/protobuf/types/known/emptypb"
 	reflect "reflect"
 	sync "sync"
 )
@@ -836,7 +836,7 @@ var file_InstanceCommunication_proto_goTypes = []interface{}{
 	(*FunctionStatus_ExceptionInformation)(nil), // 5: proto.FunctionStatus.ExceptionInformation
 	nil,                             // 6: proto.MetricsData.UserMetricsEntry
 	(*Metrics_InstanceMetrics)(nil), // 7: proto.Metrics.InstanceMetrics
-	(*empty.Empty)(nil),             // 8: google.protobuf.Empty
+	(*emptypb.Empty)(nil),           // 8: google.protobuf.Empty
 }
 var file_InstanceCommunication_proto_depIdxs = []int32{
 	5,  // 0: proto.FunctionStatus.latestUserExceptions:type_name -> proto.FunctionStatus.ExceptionInformation
@@ -987,11 +987,11 @@ const _ = grpc.SupportPackageIsVersion6
 //
 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
 type InstanceControlClient interface {
-	GetFunctionStatus(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*FunctionStatus, error)
-	GetAndResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error)
-	ResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error)
-	GetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error)
-	HealthCheck(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*HealthCheckResult, error)
+	GetFunctionStatus(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*FunctionStatus, error)
+	GetAndResetMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*MetricsData, error)
+	ResetMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error)
+	GetMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*MetricsData, error)
+	HealthCheck(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*HealthCheckResult, error)
 }
 
 type instanceControlClient struct {
@@ -1002,7 +1002,7 @@ func NewInstanceControlClient(cc grpc.ClientConnInterface) InstanceControlClient
 	return &instanceControlClient{cc}
 }
 
-func (c *instanceControlClient) GetFunctionStatus(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*FunctionStatus, error) {
+func (c *instanceControlClient) GetFunctionStatus(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*FunctionStatus, error) {
 	out := new(FunctionStatus)
 	err := c.cc.Invoke(ctx, "/proto.InstanceControl/GetFunctionStatus", in, out, opts...)
 	if err != nil {
@@ -1011,7 +1011,7 @@ func (c *instanceControlClient) GetFunctionStatus(ctx context.Context, in *empty
 	return out, nil
 }
 
-func (c *instanceControlClient) GetAndResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error) {
+func (c *instanceControlClient) GetAndResetMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*MetricsData, error) {
 	out := new(MetricsData)
 	err := c.cc.Invoke(ctx, "/proto.InstanceControl/GetAndResetMetrics", in, out, opts...)
 	if err != nil {
@@ -1020,8 +1020,8 @@ func (c *instanceControlClient) GetAndResetMetrics(ctx context.Context, in *empt
 	return out, nil
 }
 
-func (c *instanceControlClient) ResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) {
-	out := new(empty.Empty)
+func (c *instanceControlClient) ResetMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) {
+	out := new(emptypb.Empty)
 	err := c.cc.Invoke(ctx, "/proto.InstanceControl/ResetMetrics", in, out, opts...)
 	if err != nil {
 		return nil, err
@@ -1029,7 +1029,7 @@ func (c *instanceControlClient) ResetMetrics(ctx context.Context, in *empty.Empt
 	return out, nil
 }
 
-func (c *instanceControlClient) GetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error) {
+func (c *instanceControlClient) GetMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*MetricsData, error) {
 	out := new(MetricsData)
 	err := c.cc.Invoke(ctx, "/proto.InstanceControl/GetMetrics", in, out, opts...)
 	if err != nil {
@@ -1038,7 +1038,7 @@ func (c *instanceControlClient) GetMetrics(ctx context.Context, in *empty.Empty,
 	return out, nil
 }
 
-func (c *instanceControlClient) HealthCheck(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*HealthCheckResult, error) {
+func (c *instanceControlClient) HealthCheck(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*HealthCheckResult, error) {
 	out := new(HealthCheckResult)
 	err := c.cc.Invoke(ctx, "/proto.InstanceControl/HealthCheck", in, out, opts...)
 	if err != nil {
@@ -1049,30 +1049,30 @@ func (c *instanceControlClient) HealthCheck(ctx context.Context, in *empty.Empty
 
 // InstanceControlServer is the server API for InstanceControl service.
 type InstanceControlServer interface {
-	GetFunctionStatus(context.Context, *empty.Empty) (*FunctionStatus, error)
-	GetAndResetMetrics(context.Context, *empty.Empty) (*MetricsData, error)
-	ResetMetrics(context.Context, *empty.Empty) (*empty.Empty, error)
-	GetMetrics(context.Context, *empty.Empty) (*MetricsData, error)
-	HealthCheck(context.Context, *empty.Empty) (*HealthCheckResult, error)
+	GetFunctionStatus(context.Context, *emptypb.Empty) (*FunctionStatus, error)
+	GetAndResetMetrics(context.Context, *emptypb.Empty) (*MetricsData, error)
+	ResetMetrics(context.Context, *emptypb.Empty) (*emptypb.Empty, error)
+	GetMetrics(context.Context, *emptypb.Empty) (*MetricsData, error)
+	HealthCheck(context.Context, *emptypb.Empty) (*HealthCheckResult, error)
 }
 
 // UnimplementedInstanceControlServer can be embedded to have forward compatible implementations.
 type UnimplementedInstanceControlServer struct {
 }
 
-func (*UnimplementedInstanceControlServer) GetFunctionStatus(context.Context, *empty.Empty) (*FunctionStatus, error) {
+func (*UnimplementedInstanceControlServer) GetFunctionStatus(context.Context, *emptypb.Empty) (*FunctionStatus, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method GetFunctionStatus not implemented")
 }
-func (*UnimplementedInstanceControlServer) GetAndResetMetrics(context.Context, *empty.Empty) (*MetricsData, error) {
+func (*UnimplementedInstanceControlServer) GetAndResetMetrics(context.Context, *emptypb.Empty) (*MetricsData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method GetAndResetMetrics not implemented")
 }
-func (*UnimplementedInstanceControlServer) ResetMetrics(context.Context, *empty.Empty) (*empty.Empty, error) {
+func (*UnimplementedInstanceControlServer) ResetMetrics(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method ResetMetrics not implemented")
 }
-func (*UnimplementedInstanceControlServer) GetMetrics(context.Context, *empty.Empty) (*MetricsData, error) {
+func (*UnimplementedInstanceControlServer) GetMetrics(context.Context, *emptypb.Empty) (*MetricsData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented")
 }
-func (*UnimplementedInstanceControlServer) HealthCheck(context.Context, *empty.Empty) (*HealthCheckResult, error) {
+func (*UnimplementedInstanceControlServer) HealthCheck(context.Context, *emptypb.Empty) (*HealthCheckResult, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method HealthCheck not implemented")
 }
 
@@ -1081,7 +1081,7 @@ func RegisterInstanceControlServer(s *grpc.Server, srv InstanceControlServer) {
 }
 
 func _InstanceControl_GetFunctionStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(empty.Empty)
+	in := new(emptypb.Empty)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
@@ -1093,13 +1093,13 @@ func _InstanceControl_GetFunctionStatus_Handler(srv interface{}, ctx context.Con
 		FullMethod: "/proto.InstanceControl/GetFunctionStatus",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(InstanceControlServer).GetFunctionStatus(ctx, req.(*empty.Empty))
+		return srv.(InstanceControlServer).GetFunctionStatus(ctx, req.(*emptypb.Empty))
 	}
 	return interceptor(ctx, in, info, handler)
 }
 
 func _InstanceControl_GetAndResetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(empty.Empty)
+	in := new(emptypb.Empty)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
@@ -1111,13 +1111,13 @@ func _InstanceControl_GetAndResetMetrics_Handler(srv interface{}, ctx context.Co
 		FullMethod: "/proto.InstanceControl/GetAndResetMetrics",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(InstanceControlServer).GetAndResetMetrics(ctx, req.(*empty.Empty))
+		return srv.(InstanceControlServer).GetAndResetMetrics(ctx, req.(*emptypb.Empty))
 	}
 	return interceptor(ctx, in, info, handler)
 }
 
 func _InstanceControl_ResetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(empty.Empty)
+	in := new(emptypb.Empty)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
@@ -1129,13 +1129,13 @@ func _InstanceControl_ResetMetrics_Handler(srv interface{}, ctx context.Context,
 		FullMethod: "/proto.InstanceControl/ResetMetrics",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(InstanceControlServer).ResetMetrics(ctx, req.(*empty.Empty))
+		return srv.(InstanceControlServer).ResetMetrics(ctx, req.(*emptypb.Empty))
 	}
 	return interceptor(ctx, in, info, handler)
 }
 
 func _InstanceControl_GetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(empty.Empty)
+	in := new(emptypb.Empty)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
@@ -1147,13 +1147,13 @@ func _InstanceControl_GetMetrics_Handler(srv interface{}, ctx context.Context, d
 		FullMethod: "/proto.InstanceControl/GetMetrics",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(InstanceControlServer).GetMetrics(ctx, req.(*empty.Empty))
+		return srv.(InstanceControlServer).GetMetrics(ctx, req.(*emptypb.Empty))
 	}
 	return interceptor(ctx, in, info, handler)
 }
 
 func _InstanceControl_HealthCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(empty.Empty)
+	in := new(emptypb.Empty)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
@@ -1165,7 +1165,7 @@ func _InstanceControl_HealthCheck_Handler(srv interface{}, ctx context.Context,
 		FullMethod: "/proto.InstanceControl/HealthCheck",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(InstanceControlServer).HealthCheck(ctx, req.(*empty.Empty))
+		return srv.(InstanceControlServer).HealthCheck(ctx, req.(*emptypb.Empty))
 	}
 	return interceptor(ctx, in, info, handler)
 }
diff --git a/pulsar-function-go/pb/Request.pb.go b/pulsar-function-go/pb/Request.pb.go
index 29aa249..162a184 100644
--- a/pulsar-function-go/pb/Request.pb.go
+++ b/pulsar-function-go/pb/Request.pb.go
@@ -20,7 +20,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // 	protoc-gen-go v1.25.0
-// 	protoc        v3.13.0
+// 	protoc        v3.17.3
 // source: Request.proto
 
 package api
diff --git a/pulsar-function-go/pb/doc.go b/pulsar-function-go/pb/doc.go
index 3bd8317..151be8b 100644
--- a/pulsar-function-go/pb/doc.go
+++ b/pulsar-function-go/pb/doc.go
@@ -27,8 +27,8 @@
 // https://github.com/apache/pulsar/tree/master/pulsar-functions/proto/src/main/proto
 //
 // The generated Go code was created from the source Pulsar files at git:
-//    tag:      v2.7.0-a55a405c7-633-g411bf710060
-//    revision: 411bf71006039af661be13c54dda4ce074f97679
+//    tag:      v2.7.0-a55a405c7-741-gcb6617fa02-18-g31f7d70bd4-40-g331ca7c684-16-g956328d84e-3-g21a3a3b537-2-g73e0dbd4df-350-gbd1db1b708-15-g0d52d0b1ca-108-gefacf71f41-27-g9afc59f21c-9-ge4b4627890-19-g510ecfa164-113-g7a89212afa-851-g9f105d0d871
+//    revision: 9f105d0d8716d7650299f43163c1f4ef2a5a819d
 //
 // Files generated by the protoc-gen-go program should not be modified.
 package api

[pulsar] 08/09: [function] enable protobuf-native schema support for function (#11868)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c0bada6198055c13b8004e16b5d436349dd9fe19
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Wed Sep 1 21:48:02 2021 -0700

    [function] enable protobuf-native schema support for function (#11868)
    
    Fixes #11721
    
    ### Motivation
    
    Enable functions to process topics with the protobuf_native schema
    
    ### Modifications
    
    update `TopicSchema`
    
    (cherry picked from commit d0e5d96185336f56a7599e97474a6074cf6b76a7)
---
 .../pulsar/functions/source/TopicSchema.java       |  4 ++
 .../pulsar/functions/source/TopicSchemaTest.java   | 58 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index dcd424e..067a793 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.schema.KeyValue;
@@ -172,6 +173,9 @@ public class TopicSchema {
         case PROTOBUF:
             return ProtobufSchema.ofGenericClass(clazz, new HashMap<>());
 
+        case PROTOBUF_NATIVE:
+            return ProtobufNativeSchema.ofGenericClass(clazz, new HashMap<>());
+
         case AUTO_PUBLISH:
             return (Schema<T>) Schema.AUTO_PRODUCE_BYTES();
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
new file mode 100644
index 0000000..c746093
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.proto.Request;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+public class TopicSchemaTest {
+    /** Checks that getSchema returns the schema implementation matching the requested SchemaType. */
+    @Test
+    public void testGetSchema() {
+        TopicSchema topicSchema = new TopicSchema(null);
+        // Use a distinct topic name for every schema type that is looked up.
+        String topicPrefix = "public/default/test";
+        Schema<?> schema = topicSchema.getSchema(topicPrefix + "1", DummyClass.class, Optional.of(SchemaType.JSON));
+        assertEquals(schema.getClass(), JSONSchema.class);
+
+        schema = topicSchema.getSchema(topicPrefix + "2", DummyClass.class, Optional.of(SchemaType.AVRO));
+        assertEquals(schema.getClass(), AvroSchema.class);
+
+        // use an arbitrary protobuf class for testing purpose
+        schema = topicSchema.getSchema(topicPrefix + "3", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF));
+        assertEquals(schema.getClass(), ProtobufSchema.class);
+
+        schema = topicSchema.getSchema(topicPrefix + "4", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF_NATIVE));
+        assertEquals(schema.getClass(), ProtobufNativeSchema.class);
+    }
+
+    private static class DummyClass {}
+}

[pulsar] 02/09: [Python] Handle py::call_method error without mutating internal state (#11840)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3129c9634eb2f3c155ae46170ab16710912288ea
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Sep 1 00:48:25 2021 +0800

    [Python] Handle py::call_method error without mutating internal state (#11840)
    
    Fixes #11823
    
    When the Python logger is customized with underlying `LoggerWrapper` objects, sometimes `async` Python functions may return an incorrect value like `None`. It's because there's a bug (or feature?) of Boost-python that `py::call_method` will fail in C++ object's destructor. See https://github.com/boostorg/python/issues/374 for details.
    
    For the code example in #11823, it's because in `ConsumerImpl`'s destructor, the logger for `AckGroupingTrackerEnabled` will be created again because the logger is thread local and will be created in new threads. In this case, `py::call_method` in `LoggerWrapper#_updateCurrentPythonLogLevel` will fail, and `PyErr_Print` will be called and the error indicator will be cleared, which leads to the result that `async` functions' result became `None`.
    
    - Reduce unnecessary `Logger.getEffectiveLevel` calls to get the Python log level: get the log level once when the logger factory is initialized and pass the same level to all loggers.
    - Remove the `PyErr_Print` calls in `LoggerWrapper` related code. In the cases when `py::call_method` failed, use the fallback logger to print logs.
    - Add a separate test file for the custom logger test, because once the `LoggerFactory` is set all other tests would be affected.
    
    - [x] Make sure that the change passes the CI checks.
    
    This change added test `CustomLoggingTest`. Since `asyncio` module was introduced from Python 3.3 while CI is based on Python 2.7, this test cannot be tested by CI unless Python3 based CI was added.
    
    (cherry picked from commit 9153e7131736decf526684a3bddc4d3f7e307ce3)
---
 pulsar-client-cpp/python/custom_logger_test.py | 54 +++++++++++++++++++
 pulsar-client-cpp/python/pulsar_test.py        |  5 --
 pulsar-client-cpp/python/src/config.cc         | 72 +++++++++++++++-----------
 pulsar-client-cpp/run-unit-tests.sh            |  9 ++++
 4 files changed, 105 insertions(+), 35 deletions(-)

diff --git a/pulsar-client-cpp/python/custom_logger_test.py b/pulsar-client-cpp/python/custom_logger_test.py
new file mode 100644
index 0000000..de0600cc
--- /dev/null
+++ b/pulsar-client-cpp/python/custom_logger_test.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import asyncio
+import logging
+from pulsar import Client
+
+class CustomLoggingTest(TestCase):
+
+    serviceUrl = 'pulsar://localhost:6650'
+
+    def test_async_func_with_custom_logger(self):
+        # boost::python::call may fail in C++ destructors, even worse, calls
+        # to PyErr_Print could corrupt the Python interpreter.
+        # See https://github.com/boostorg/python/issues/374 for details.
+        # This test is to verify these functions won't be called in C++ destructors
+        # so that Python's async function works well.
+        client = Client(
+            self.serviceUrl,
+            logger=logging.getLogger('custom-logger')
+        )
+
+        async def async_get(value):
+            consumer = client.subscribe('test_async_get', 'sub')
+            consumer.close()
+            return value
+
+        value = 'foo'
+        result = asyncio.run(async_get(value))
+        self.assertEqual(value, result)
+
+        client.close()
+
+if __name__ == '__main__':
+    logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
+    main()
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 4a8ca03..7c85f77 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -19,7 +19,6 @@
 #
 
 
-import logging
 from unittest import TestCase, main
 import time
 import os
@@ -104,10 +103,6 @@ class PulsarTest(TestCase):
         conf.replicate_subscription_state_enabled(True)
         self.assertEqual(conf.replicate_subscription_state_enabled(), True)
 
-    def test_client_logger(self):
-        logger = logging.getLogger("pulsar")
-        Client(self.serviceUrl, logger=logger)
-        
     def test_connect_error(self):
         with self.assertRaises(pulsar.ConnectError):
             client = Client('fakeServiceUrl')
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 0b30713..c34e818 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -18,6 +18,8 @@
  */
 #include "utils.h"
 #include <pulsar/ConsoleLoggerFactory.h>
+#include "lib/Utils.h"
+#include <memory>
 
 template<typename T>
 struct ListenerWrapper {
@@ -90,51 +92,40 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC
 }
 
 class LoggerWrapper: public Logger {
-    PyObject* _pyLogger;
-    Logger* fallbackLogger;
-    int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);
-
-    void _updateCurrentPythonLogLevel() {
-        PyGILState_STATE state = PyGILState_Ensure();
-
-        try {
-            _currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
-        } catch (py::error_already_set e) {
-            PyErr_Print();
-        }
-
-        PyGILState_Release(state);
-    };
+    PyObject* const _pyLogger;
+    const int _pythonLogLevel;
+    const std::unique_ptr<Logger> _fallbackLogger;
 
-    int _getLogLevelValue(Level level) {
+    static constexpr int _getLogLevelValue(Level level) {
         return 10 + (level * 10);
     }
 
    public:
 
-    LoggerWrapper(const std::string &filename, PyObject* pyLogger) {
-        _pyLogger = pyLogger;
+    LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
+        : _pyLogger(pyLogger),
+          _pythonLogLevel(pythonLogLevel),
+          _fallbackLogger(fallbackLogger) {
         Py_XINCREF(_pyLogger);
-
-        std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory());
-        fallbackLogger = factory->getLogger(filename);
-
-        _updateCurrentPythonLogLevel();
     }
 
+    LoggerWrapper(const LoggerWrapper&) = delete;
+    LoggerWrapper(LoggerWrapper&&) noexcept = delete;
+    LoggerWrapper& operator=(const LoggerWrapper&) = delete;
+    LoggerWrapper& operator=(LoggerWrapper&&) = delete;
+
     virtual ~LoggerWrapper() {
         Py_XDECREF(_pyLogger);
-        delete fallbackLogger;
     }
 
     bool isEnabled(Level level) {
-        return _getLogLevelValue(level) >= _currentPythonLogLevel;
+        return _getLogLevelValue(level) >= _pythonLogLevel;
     }
 
     void log(Level level, int line, const std::string& message) {
-        if (Py_IsInitialized() != true) {
+        if (!Py_IsInitialized()) {
             // Python logger is unavailable - fallback to console logger
-            fallbackLogger->log(level, line, message);
+            _fallbackLogger->log(level, line, message);
         } else {
             PyGILState_STATE state = PyGILState_Ensure();
 
@@ -153,9 +144,8 @@ class LoggerWrapper: public Logger {
                         py::call_method<void>(_pyLogger, "error", message.c_str());
                         break;
                 }
-
             } catch (py::error_already_set e) {
-                PyErr_Print();
+                _fallbackLogger->log(level, line, message);
             }
 
             PyGILState_Release(state);
@@ -164,12 +154,29 @@ class LoggerWrapper: public Logger {
 };
 
 class LoggerWrapperFactory : public LoggerFactory {
+    std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
     PyObject* _pyLogger;
+    Optional<int> _pythonLogLevel{Optional<int>::empty()};
+
+    void initializePythonLogLevel() {
+        PyGILState_STATE state = PyGILState_Ensure();
+
+        try {
+            int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
+            _pythonLogLevel = Optional<int>::of(level);
+        } catch (const py::error_already_set& e) {
+            // Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
+            _pythonLogLevel = Optional<int>::empty();
+        }
+
+        PyGILState_Release(state);
+    }
 
    public:
     LoggerWrapperFactory(py::object pyLogger) {
         _pyLogger = pyLogger.ptr();
         Py_XINCREF(_pyLogger);
+        initializePythonLogLevel();
     }
 
     virtual ~LoggerWrapperFactory() {
@@ -177,7 +184,12 @@ class LoggerWrapperFactory : public LoggerFactory {
     }
 
     Logger* getLogger(const std::string &fileName) {
-        return new LoggerWrapper(fileName, _pyLogger);
+        const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
+        if (_pythonLogLevel.is_present()) {
+            return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
+        } else {
+            return fallbackLogger;
+        }
     }
 };
 
diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index 7e40a49..b8d6a8e 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -73,12 +73,21 @@ if [ $RES -eq 0 ]; then
     cp *_test.py /tmp
     pushd /tmp
 
+    # TODO: this test requires asyncio module that is supported by Python >= 3.3.
+    #  However, CI doesn't support Python3 yet, we should uncomment following
+    #  lines after Python3 CI script is added.
+    #python custom_logger_test.py
+    #RES=$?
+    #echo "custom_logger_test.py: $RES"
+
     python pulsar_test.py
     RES=$?
+    echo "pulsar_test.py: $RES"
 
     echo "---- Running Python Function Instance unit tests"
     bash $ROOT_DIR/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
     RES=$?
+    echo "run_python_instance_tests.sh: $RES"
 
     popd
     popd

[pulsar] 09/09: Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 91bdc48ced5172b20fdf5f88e1c2f8c32e612519
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 1 14:47:47 2021 -0700

    Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
    
    
    (cherry picked from commit bc49191d1816bedf8cb6a8ba156b8593b921b27a)
---
 .../coordination/impl/LockManagerImpl.java         |  3 +++
 .../coordination/impl/ResourceLockImpl.java        | 23 +++++++++++++++++++---
 .../org/apache/pulsar/metadata/ZKSessionTest.java  |  8 +++-----
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index aadaf51..0e3754ac 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -107,6 +107,9 @@ class LockManagerImpl<T> implements LockManager<T> {
         if (se == SessionEvent.SessionReestablished) {
             log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
             locks.values().forEach(ResourceLockImpl::revalidate);
+        } else if (se == SessionEvent.Reconnected) {
+            log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
+            locks.values().forEach(ResourceLockImpl::revalidateIfNeededAfterReconnection);
         }
     }
 
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 2ca7022..55b420e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -43,6 +43,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
     private volatile T value;
     private long version;
     private final CompletableFuture<Void> expiredFuture;
+    private boolean revalidateAfterReconnection = false;
 
     private enum State {
         Init,
@@ -197,14 +198,30 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                 .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
                 .exceptionally(ex -> {
                     synchronized (ResourceLockImpl.this) {
-                        log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
-                        state = State.Released;
-                        expiredFuture.complete(null);
+                        if (ex.getCause() instanceof BadVersionException) {
+                            log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
+                            state = State.Released;
+                            expiredFuture.complete(null);
+                        } else {
+                            // We failed to revalidate the lock due to connectivity issue
+                            // Continue assuming we hold the lock, until we can revalidate it, either
+                            // on Reconnected or SessionReestablished events.
+                            log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection", path,
+                                    ex.getCause().getMessage());
+                        }
                     }
                     return null;
                 });
     }
 
+    synchronized void revalidateIfNeededAfterReconnection() {
+        if (revalidateAfterReconnection) {
+            revalidateAfterReconnection = false;
+            log.warn("Revalidate lock at {} after reconnection", path);
+            revalidate();
+        }
+    }
+
     synchronized CompletableFuture<Void> revalidate() {
         return store.get(path)
                 .thenCompose(optGetResult -> {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 7a0a006..43c67fc 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -117,6 +117,7 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
 
         ResourceLock<String> lock = lm1.acquireLock(path, "value-1").join();
 
+
         zks.expireSession(((ZKMetadataStore) store).getZkSessionId());
 
         SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
@@ -130,11 +131,8 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionReestablished);
 
-        Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
-            assertFalse(lock.getLockExpiredFuture().isDone());
-        });
-
-        assertTrue(store.get(path).join().isPresent());
+        Awaitility.await().untilAsserted(() -> assertTrue(store.get(path).join().isPresent()));
+        assertFalse(lock.getLockExpiredFuture().isDone());
     }
 
     @Test

[pulsar] 05/09: Reduce redundant FLOW requests for non-durable multi-topics consumer (#11802)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0669247047f8050e6dae83d9f608f3a5822520b3
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 2 13:09:12 2021 +0800

    Reduce redundant FLOW requests for non-durable multi-topics consumer (#11802)
    
    ### Motivation
    
    https://github.com/apache/pulsar/pull/3960 fixed the bug that reader will get stuck if it's reading a partition of a partitioned topic. The fix is using `isDurable` to check whether the consumer is a reader's internal consumer because it used `partitionIndex` to check whether the target topic is a partition while reader's `partitionIndex` is already set. However, for a non-durable multi-topics consumer, `isDurable` is false and each internal consumer will send FLOW request once the co [...]
    
    After https://github.com/apache/pulsar/pull/4591 introduced the `hasParentConsumer` field, the check works even for a reader without the `isDurable` check.
    
    ### Modifications
    
    - Remove the check for `isDurable` before sending FLOW request and update the related comment.
    - Add a test for a non-durable multi-topics consumer to verify that the number of FLOW requests equals the number of topics, not twice the number of topics.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change added `NonDurableSubscriptionTest#testFlowCountForMultiTopics` and the existing test `ReaderTest#testReadFromPartition` added in #3960 can also pass after this change.
    This change added tests and can be verified as follows:
    
    (cherry picked from commit 1303e7dc7a70f10c5da015804184caecd218e7e3)
---
 .../client/api/NonDurableSubscriptionTest.java     | 57 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  4 +-
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 09f6f39..bef6274 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -21,18 +21,25 @@ package org.apache.pulsar.client.api;
 import java.lang.reflect.Field;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarChannelInitializer;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandFlow;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNull;
@@ -42,6 +49,8 @@ import static org.testng.AssertJUnit.assertTrue;
 @Slf4j
 public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
 
+    private final AtomicInteger numFlow = new AtomicInteger(0);
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -56,6 +65,34 @@ public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+        return new PulsarService(conf) {
+
+            @Override
+            protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+                BrokerService broker = new BrokerService(this, ioEventLoopGroup);
+                broker.setPulsarChannelInitializerFactory(
+                        (_pulsar, tls) -> {
+                            return new PulsarChannelInitializer(_pulsar, tls) {
+                                @Override
+                                protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
+                                    return new ServerCnx(pulsar) {
+
+                                        @Override
+                                        protected void handleFlow(CommandFlow flow) {
+                                            super.handleFlow(flow);
+                                            numFlow.incrementAndGet();
+                                        }
+                                    };
+                                }
+                            };
+                        });
+                return broker;
+            }
+        };
+    }
+
     @Test
     public void testNonDurableSubscription() throws Exception {
         String topicName = "persistent://my-property/my-ns/nonDurable-topic1";
@@ -188,4 +225,22 @@ public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
         }
 
     }
+
+    @Test
+    public void testFlowCountForMultiTopics() throws Exception {
+        String topicName = "persistent://my-property/my-ns/test-flow-count";
+        int numPartitions = 5;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+        numFlow.set(0);
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-nonDurable-subscriber")
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscribe();
+        consumer.receive(1, TimeUnit.SECONDS);
+        consumer.close();
+
+        assertEquals(numFlow.get(), numPartitions);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 37856aa..b7fa039 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -755,9 +755,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             boolean firstTimeConnect = subscribeFuture.complete(this);
             // if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
             // command to receive messages.
-            // For readers too (isDurable==false), the partition idx will be set though we have to
-            // send available permits immediately after establishing the reader session
-            if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
+            if (!(firstTimeConnect && hasParentConsumer) && conf.getReceiverQueueSize() != 0) {
                 increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
             }
         }).exceptionally((e) -> {

[pulsar] 06/09: [pulsar-client-tools] fix packages tool parameter desc (#11809)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 40ebefc987f5fb396dba95f82cc725ee4ab564af
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Thu Sep 2 09:45:48 2021 +0800

     [pulsar-client-tools] fix packages tool parameter desc (#11809)
    
    ### Motivation
    
    Fix the description of the `--path` parameter in the packages client tool.
    
    (cherry picked from commit 44becbf0b15e8c4db8f253603545f04b7fb7e8ba)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java
index c922dce..0837592 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java
@@ -104,7 +104,7 @@ class CmdPackages extends CmdBase {
         @DynamicParameter(names = {"--properties", "-P"},  description ="external infromations of a package")
         private Map<String, String> properties = new HashMap<>();
 
-        @Parameter(names = "--path", description = "descriptions of a package", required = true)
+        @Parameter(names = "--path", description = "file path of the package", required = true)
         private String path;
 
         @Override

[pulsar] 07/09: Fix seek at batchIndex level receive duplicated messages (#11826)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6763969dac1b9133c9b80130a00fa0f01051b8c4
Author: Aloys <lo...@gmail.com>
AuthorDate: Thu Sep 2 15:46:35 2021 +0800

    Fix seek at batchIndex level receive duplicated messages (#11826)
    
    Fixes #11825
    
    ### Motivation
    If `AcknowledgmentAtBatchIndexLevelEnabled` is set, a consumer may receive duplicated messages after seeking to a batch index.
    This pull request tries to resolve this problem.
    
    ### Modifications
    Set `duringSeek` before the seek response is received.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    SubscriptionSeekTest.testSeekForBatchMessageAndSpecifiedBatchIndex
    
    (cherry picked from commit fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2)
---
 .../broker/service/SubscriptionSeekTest.java       | 83 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 ++-
 2 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 22e7135..cd584ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
@@ -182,6 +183,88 @@ public class SubscriptionSeekTest extends BrokerTestBase {
             assertEquals(receiveId, messageId);
         }
     }
+
+    @Test
+    public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
+        String subscriptionName = "my-subscription-batch";
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                // set batch max publish delay big enough to make sure entry has 3 messages
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+                .topic(topicName).create();
+
+
+        List<MessageId> messageIds = new ArrayList<>();
+        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            String message = "my-message-" + i;
+            messages.add(message);
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+            futureMessageIds.add(messageIdCompletableFuture);
+        }
+
+        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+            MessageId messageId = futureMessageId.get();
+            messageIds.add(messageId);
+        }
+
+        producer.close();
+
+        assertTrue(messageIds.get(0) instanceof  BatchMessageIdImpl);
+        assertTrue(messageIds.get(1) instanceof  BatchMessageIdImpl);
+        assertTrue(messageIds.get(2) instanceof  BatchMessageIdImpl);
+
+        BatchMessageIdImpl batchMsgId0 = (BatchMessageIdImpl) messageIds.get(0);
+        BatchMessageIdImpl batchMsgId1 = (BatchMessageIdImpl) messageIds.get(1);
+        BatchMessageIdImpl msgIdToSeekFirst = (BatchMessageIdImpl) messageIds.get(2);
+
+        assertEquals(batchMsgId0.getEntryId(), batchMsgId1.getEntryId());
+        assertEquals(batchMsgId1.getEntryId(), msgIdToSeekFirst.getEntryId());
+
+        PulsarClient newPulsarClient = PulsarClient.builder()
+                // set start backoff interval short enough to make sure client will re-connect quickly
+                .startingBackoffInterval(1, TimeUnit.MICROSECONDS)
+                .serviceUrl(lookupUrl.toString())
+                .build();
+
+        org.apache.pulsar.client.api.Consumer<String> consumer = newPulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .startMessageIdInclusive()
+                .subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getSubscriptions().size(), 1);
+
+        consumer.seek(msgIdToSeekFirst);
+        MessageId msgId = consumer.receive().getMessageId();
+        assertTrue(msgId instanceof BatchMessageIdImpl);
+        BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
+        assertEquals(batchMsgId, msgIdToSeekFirst);
+
+
+        consumer.seek(MessageId.earliest);
+        Message<String> receiveBeforEarliest = consumer.receive();
+        assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
+        consumer.seek(MessageId.latest);
+        Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
+        assertNull(receiveAfterLatest);
+
+        for (MessageId messageId : messageIds) {
+            consumer.seek(messageId);
+            MessageId receiveId = consumer.receive().getMessageId();
+            assertEquals(receiveId, messageId);
+        }
+
+        newPulsarClient.close();
+    }
+
     @Test
     public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
         final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7fa039..7a20ff4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1802,20 +1802,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
 
-        log.info("[{}][{}] Seek subscription to {}", topic, subscription, seekBy);
+        BatchMessageIdImpl originSeekMessageId = seekMessageId;
+        seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
+        duringSeek.set(true);
+        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
             log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
             acknowledgmentsGroupingTracker.flushAndClean();
 
-            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
-            duringSeek.set(true);
             lastDequeuedMessageId = MessageId.earliest;
 
             clearIncomingMessages();
             seekFuture.complete(null);
         }).exceptionally(e -> {
+            // re-set duringSeek and seekMessageId if seek failed
+            seekMessageId = originSeekMessageId;
+            duringSeek.set(false);
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+
             seekFuture.completeExceptionally(
                 PulsarClientException.wrap(e.getCause(),
                     String.format("Failed to seek the subscription %s of the topic %s to %s",

[pulsar] 01/09: [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 720b1d52cf2a530514de67b6a7ab33d3d377de54
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Aug 25 20:33:50 2021 +0300

    [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)
    
    * [Functions] Support KEY_BASED batch builder for Java based functions and sources
    
    * Include batchBuilder in ProducerSpec -> ProducerConfig.ProducerConfigBuilder conversion
    
    * Support setting batch builder for sources with "--batch-builder KEY_BASED" argument
    
    (cherry picked from commit b923af16f629bf298ee2e8fec44864a2c8a2615b)
---
 .../pulsar/functions/instance/ContextImpl.java     | 21 ++++++++++-----
 .../functions/instance/JavaInstanceRunnable.java   |  1 +
 .../pulsar/functions/utils/SourceConfigUtils.java  |  7 +++++
 .../functions/utils/SourceConfigUtilsTest.java     | 31 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index fc806c1..f3438c4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
 import lombok.ToString;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -143,14 +144,22 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
         boolean useThreadLocalProducers = false;
-        if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
-            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
-                this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
+        Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec();
+        if (producerSpec != null) {
+            if (producerSpec.getMaxPendingMessages() != 0) {
+                this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
             }
-            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
-                this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+            if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
+                this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
             }
-            useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
+            if (producerSpec.getBatchBuilder() != null) {
+                if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
+                    this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+                } else {
+                    this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
+                }
+            }
+            useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
         }
         if (useThreadLocalProducers) {
             tlPublishProducers = new ThreadLocal<>();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3d79c2b..692902e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -776,6 +776,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder()
                             .maxPendingMessages(conf.getMaxPendingMessages())
                             .maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
+                            .batchBuilder(conf.getBatchBuilder())
                             .useThreadLocalProducers(conf.getUseThreadLocalProducers())
                             .cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
                     pulsarSinkConfig.setProducerConfig(builder.build());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 9049eb6..6450d6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -167,6 +167,13 @@ public class SourceConfigUtils {
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
 
+        if (sourceConfig.getBatchBuilder() != null) {
+            Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
+                    ? sinkSpecBuilder.getProducerSpec().toBuilder()
+                    : Function.ProducerSpec.newBuilder();
+            sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
+        }
+
         sinkSpecBuilder.setForwardSourceMessageProperty(true);
 
         functionDetailsBuilder.setSink(sinkSpecBuilder);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 20a64f8..22b5afa 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -331,6 +331,37 @@ public class SourceConfigUtilsTest extends PowerMockTestCase {
         assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!"));
     }
 
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setProducerConfig(null);
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+        assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+    }
+
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigExists() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+        assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+        assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(), 123456);
+    }
+
+    @Test
+    public void testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder(null);
+        sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+        assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+    }
+
     private SourceConfig createSourceConfigWithBatch() {
         SourceConfig sourceConfig = createSourceConfig();
         BatchSourceConfig batchSourceConfig = createBatchSourceConfig();