You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/04 11:05:14 UTC

[GitHub] merlimat closed pull request #1009: Add optional key/value metadata to producers.

merlimat closed pull request #1009: Add optional key/value metadata to producers.
URL: https://github.com/apache/incubator-pulsar/pull/1009
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once
it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 4c7bf8297..e97f48933 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -23,6 +23,8 @@
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -75,7 +77,10 @@
     private final boolean isNonPersistentTopic;
     private final boolean isEncrypted;
 
-    public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted) {
+    private final Map<String, String> metadata;
+
+    public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
+        boolean isEncrypted, Map<String, String> metadata) {
         this.topic = topic;
         this.cnx = cnx;
         this.producerId = producerId;
@@ -85,12 +90,16 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
         this.msgIn = new Rate();
         this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
         this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
+
+        this.metadata = metadata != null ? metadata : Collections.emptyMap();
+
         this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
         stats.address = cnx.clientAddress().toString();
         stats.connectedSince = DateFormatter.now();
         stats.clientVersion = cnx.getClientVersion();
         stats.producerName = producerName;
         stats.producerId = producerId;
+        stats.metadata = this.metadata;
 
         this.isRemote = producerName
                 .startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
@@ -357,6 +366,10 @@ public long getProducerId() {
         return producerId;
     }
 
+    public Map<String, String> getMetadata() {
+        return metadata;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.clientAddress())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aaf86a7ee..fbe28873a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -26,6 +26,7 @@
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
 import java.net.SocketAddress;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -43,6 +44,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.api.CommandUtils;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -66,6 +68,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -457,7 +460,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
         CompletableFuture<Boolean> authorizationFuture;
         if (service.isAuthorizationEnabled()) {
             authorizationFuture = service.getAuthorizationManager().canProduceAsync(
-                    DestinationName.get(cmdProducer.getTopic().toString()),
+                    DestinationName.get(cmdProducer.getTopic()),
                     originalPrincipal != null ? originalPrincipal : authRole);
         } else {
             authorizationFuture = CompletableFuture.completedFuture(true);
@@ -470,11 +473,22 @@ protected void handleProducer(final CommandProducer cmdProducer) {
         final long producerId = cmdProducer.getProducerId();
         final long requestId = cmdProducer.getRequestId();
         final boolean isEncrypted = cmdProducer.getEncrypted();
+        final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
+
         authorizationFuture.thenApply(isAuthorized -> {
             if (isAuthorized) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
                 }
+
+                try {
+                    Metadata.validateMetadata(metadata);
+                } catch (IllegalArgumentException iae) {
+                    final String msg = iae.getMessage();
+                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+                    return null;
+                }
+
                 CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
                 CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);
 
@@ -533,7 +547,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
 
                     disableTcpNoDelayIfNeeded(topicName, producerName);
 
-                    Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted);
+                    Producer producer =
+                            new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted, metadata);
 
                     try {
                         topic.addProducer(producer);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index bb2a0bcc2..e1e9f382f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -38,7 +38,6 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URL;
@@ -80,7 +79,6 @@
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
@@ -322,7 +320,8 @@ public void testAddRemoveProducer() throws Exception {
 
         String role = "appid1";
         // 1. simple add producer
-        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+                role, false, null);
         topic.addProducer(producer);
         assertEquals(topic.getProducers().size(), 1);
 
@@ -337,7 +336,8 @@ public void testAddRemoveProducer() throws Exception {
 
         // 3. add producer for a different topic
         PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService);
-        Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", role, false);
+        Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name",
+                role, false, null);
         try {
             topic.addProducer(failProducer);
             fail("should have failed");
@@ -480,7 +480,8 @@ public void testDeleteTopic() throws Exception {
 
         // 2. delete topic with producer
         topic = (PersistentTopic) brokerService.getTopic(successTopicName).get();
-        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+                role, false, null);
         topic.addProducer(producer);
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -635,7 +636,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         try {
             String role = "appid1";
             Thread.sleep(10); /* delay to ensure that the delete gets executed first */
-            Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
+            Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+                    role, false, null);
             topic.addProducer(producer);
             fail("Should have failed");
         } catch (BrokerServiceException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 4a932b133..b5c6f7441 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -37,6 +37,7 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -356,7 +357,7 @@ public void testProducerCommand() throws Exception {
 
         // test PRODUCER success case
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -366,7 +367,8 @@ public void testProducerCommand() throws Exception {
         assertEquals(topicRef.getProducers().size(), 1);
 
         // test PRODUCER error case
-        clientCommand = Commands.newProducer(failTopicName, 2, 2, "prod-name-2");
+        clientCommand = Commands.newProducer(failTopicName, 2, 2,
+                "prod-name-2", Collections.emptyMap());
         channel.writeInbound(clientCommand);
 
         assertTrue(getResponse() instanceof CommandError);
@@ -385,11 +387,12 @@ public void testDuplicateConcurrentProducerCommand() throws Exception {
         doReturn(delayFuture).when(brokerService).getTopic(any(String.class));
         // Create producer first time
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
 
         // Create producer second time
-        clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, "prod-name");
+        clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -408,7 +411,7 @@ public void testProducerOnNotOwnedTopic() throws Exception {
 
         // test PRODUCER failure case
         ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -433,7 +436,7 @@ public void testProducerCommandWithAuthorizationPositive() throws Exception {
 
         // test PRODUCER success case
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
         assertEquals(getResponse().getClass(), CommandProducerSuccess.class);
 
@@ -464,7 +467,7 @@ public void testNonExistentTopic() throws Exception {
         resetChannel();
         setChannelConnected();
         ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(newProducerCmd);
         assertTrue(getResponse() instanceof CommandError);
         channel.finish();
@@ -490,14 +493,14 @@ public void testClusterAccess() throws Exception {
         resetChannel();
         setChannelConnected();
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
         resetChannel();
         setChannelConnected();
         clientCommand = Commands.newProducer(topicWithNonLocalCluster, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandError);
     }
@@ -513,7 +516,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception {
         resetChannel();
         setChannelConnected();
         ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(newProducerCmd);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -545,7 +548,8 @@ public void testProducerCommandWithAuthorizationNegative() throws Exception {
         resetChannel();
         setChannelConnected();
 
-        ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, null);
+        ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
+                null, Collections.emptyMap());
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandError);
 
@@ -558,7 +562,7 @@ public void testSendCommand() throws Exception {
         setChannelConnected();
 
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name");
+                "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -583,12 +587,12 @@ public void testUseSameProducerName() throws Exception {
         String producerName = "my-producer";
 
         ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(clientCommand1);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
         ByteBuf clientCommand2 = Commands.newProducer(successTopicName, 2 /* producer id */, 2 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(clientCommand2);
         assertTrue(getResponse() instanceof CommandError);
 
@@ -605,7 +609,7 @@ public void testRecreateSameProducer() throws Exception {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer1);
 
         // Producer create succeeds
@@ -614,7 +618,7 @@ public void testRecreateSameProducer() throws Exception {
         assertEquals(((CommandProducerSuccess) response).getRequestId(), 1);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 2 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer2);
 
         // 2nd producer create succeeds as well
@@ -708,14 +712,14 @@ public void testCreateProducerTimeout() throws Exception {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer1);
 
         ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
         channel.writeInbound(closeProducer);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer2);
 
         // Complete the topic opening
@@ -767,22 +771,22 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer1);
 
         ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
         channel.writeInbound(closeProducer1);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer2);
 
         ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* producer id */, 4 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer3);
 
         ByteBuf createProducer4 = Commands.newProducer(successTopicName, 1 /* producer id */, 5 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer4);
 
         // Close succeeds
@@ -846,14 +850,14 @@ public void testCreateProducerBookieTimeout() throws Exception {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(failTopicName, 1 /* producer id */, 1 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer1);
 
         ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
         channel.writeInbound(closeProducer);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer2);
 
         // Now the topic gets opened
@@ -872,7 +876,7 @@ public void testCreateProducerBookieTimeout() throws Exception {
         // Wait till the failtopic timeout interval
         Thread.sleep(500);
         ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* producer id */, 4 /* request id */,
-                producerName);
+                producerName, Collections.emptyMap());
         channel.writeInbound(createProducer3);
 
         // 3rd producer succeeds
@@ -1218,7 +1222,7 @@ public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception {
 
         // test success case: encrypted producer can connect
         ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */,
-                "encrypted-producer", true);
+                "encrypted-producer", true, null);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1246,7 +1250,7 @@ public void testProducerFailureOnEncryptionRequiredTopic() throws Exception {
 
         // test failure case: unencrypted producer cannot connect
         ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* request id */,
-                "unencrypted-producer", false);
+                "unencrypted-producer", false, null);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1275,7 +1279,7 @@ public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
         doReturn(zkDataCache).when(configCacheService).policiesCache();
 
         ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name", true);
+                "prod-name", true, null);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -1310,7 +1314,7 @@ public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
         doReturn(zkDataCache).when(configCacheService).policiesCache();
 
         ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */,
-                "prod-name", true);
+                "prod-name", true, null);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 88bef9868..8e657c89e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -22,6 +22,8 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
@@ -56,6 +58,8 @@
     // Cannot use Optional<Long> since it's not serializable
     private Long initialSequenceId = null;
 
+    private final Map<String, String> properties = new HashMap<>();
+
     public enum MessageRoutingMode {
         SinglePartition, RoundRobinPartition, CustomPartition
     }
@@ -423,6 +427,35 @@ public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
         return this;
     }
 
+    /**
+     * Set a name/value property with this producer, replacing any value already stored under the same key.
+     * @param key the property name; must not be null
+     * @param value the property value; must not be null
+     * @return this configuration object, so that calls can be chained
+     */
+    public ProducerConfiguration setProperty(String key, String value) {
+        checkArgument(key != null);
+        checkArgument(value != null);
+        properties.put(key, value);
+        return this;
+    }
+
+    /**
+     * Add all the properties in the provided map; each entry is validated like {@link #setProperty}.
+     * @param properties name/value pairs to add; a null map is ignored, null keys or values are rejected
+     * @return this configuration object, so that calls can be chained
+     */
+    public ProducerConfiguration setProperties(Map<String, String> properties) {
+        if (properties != null) {
+            properties.forEach(this::setProperty);
+        }
+        return this;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof ProducerConfiguration) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b0f45d9be..af9214302 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -26,7 +26,10 @@
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -98,6 +101,8 @@
     
     private ScheduledFuture<?> keyGeneratorTask = null;
 
+    private final Map<String, String> metadata;
+
     private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
 
@@ -158,6 +163,13 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
         } else {
             stats = ProducerStats.PRODUCER_STATS_DISABLED;
         }
+
+        if (conf.getProperties().isEmpty()) {
+            metadata = Collections.emptyMap();
+        } else {
+            metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
+        }
+
         grabCnx();
     }
 
@@ -800,7 +812,8 @@ void connectionOpened(final ClientCnx cnx) {
 
         long requestId = client.newRequestId();
 
-        cnx.sendRequestWithId(Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled()), requestId)
+        cnx.sendRequestWithId(Commands.newProducer(topic, producerId, requestId, producerName,
+                conf.isEncryptionEnabled(), metadata), requestId)
                 .thenAccept(pair -> {
                     String producerName = pair.getLeft();
                     long lastSequenceId = pair.getRight();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/CommandUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/CommandUtils.java
new file mode 100644
index 000000000..3ebbb011b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/CommandUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class CommandUtils {
+    private CommandUtils() {}
+
+    /** Returns the metadata of a producer command as a map; empty when the command carries none. */
+    public static Map<String, String> metadataFromCommand(PulsarApi.CommandProducer commandProducer) {
+        return toMap(commandProducer.getMetadataList());
+    }
+
+    /** Converts a metadata map to protobuf KeyValue entries; a null or empty map yields an empty list. */
+    static List<PulsarApi.KeyValue> toKeyValueList(Map<String, String> metadata) {
+        if (metadata == null || metadata.isEmpty()) {
+            return Collections.emptyList();
+        }
+        return metadata.entrySet().stream().map(e ->
+                PulsarApi.KeyValue.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build())
+                .collect(Collectors.toList());
+    }
+
+    /** Converts protobuf KeyValue entries to a map; if a key repeats, the last value wins instead of throwing. */
+    private static Map<String, String> toMap(List<PulsarApi.KeyValue> keyValues) {
+        if (keyValues == null || keyValues.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        return keyValues.stream()
+                .collect(Collectors.toMap(PulsarApi.KeyValue::getKey, PulsarApi.KeyValue::getValue, (a, b) -> b));
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index aea9abbc6..4e9a2546c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -22,7 +22,9 @@
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
@@ -400,11 +402,13 @@ public static ByteBuf newCloseProducer(long producerId, long requestId) {
         return res;
     }
 
-    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName) {
-        return newProducer(topic, producerId, requestId, producerName, false);
+    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName,
+                Map<String, String> metadata) {
+        return newProducer(topic, producerId, requestId, producerName, false, metadata);
     }
 
-    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, boolean encrypted) {
+    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName,
+                boolean encrypted, Map<String, String> metadata) {
         CommandProducer.Builder producerBuilder = CommandProducer.newBuilder();
         producerBuilder.setTopic(topic);
         producerBuilder.setProducerId(producerId);
@@ -414,6 +418,8 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId,
         }
         producerBuilder.setEncrypted(encrypted);
 
+        producerBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata));
+
         CommandProducer producer = producerBuilder.build();
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.PRODUCER).setProducer(producer));
         producerBuilder.recycle();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index b7cf8a4e5..58af213dc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -8669,6 +8669,12 @@ public Builder clearProxyThroughServiceUrl() {
     // optional bool encrypted = 5 [default = false];
     boolean hasEncrypted();
     boolean getEncrypted();
+    
+    // repeated .pulsar.proto.KeyValue metadata = 6;
+    java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> 
+        getMetadataList();
+    org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int index);
+    int getMetadataCount();
   }
   public static final class CommandProducer extends
       com.google.protobuf.GeneratedMessageLite
@@ -8801,12 +8807,34 @@ public boolean getEncrypted() {
       return encrypted_;
     }
     
+    // repeated .pulsar.proto.KeyValue metadata = 6;
+    public static final int METADATA_FIELD_NUMBER = 6;
+    private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> metadata_;
+    public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> getMetadataList() {
+      return metadata_;
+    }
+    public java.util.List<? extends org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder> 
+        getMetadataOrBuilderList() {
+      return metadata_;
+    }
+    public int getMetadataCount() {
+      return metadata_.size();
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int index) {
+      return metadata_.get(index);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder getMetadataOrBuilder(
+        int index) {
+      return metadata_.get(index);
+    }
+    
     private void initFields() {
       topic_ = "";
       producerId_ = 0L;
       requestId_ = 0L;
       producerName_ = "";
       encrypted_ = false;
+      metadata_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8825,6 +8853,12 @@ public final boolean isInitialized() {
         memoizedIsInitialized = 0;
         return false;
       }
+      for (int i = 0; i < getMetadataCount(); i++) {
+        if (!getMetadata(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -8852,6 +8886,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBool(5, encrypted_);
       }
+      for (int i = 0; i < metadata_.size(); i++) {
+        output.writeMessage(6, metadata_.get(i));
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -8880,6 +8917,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(5, encrypted_);
       }
+      for (int i = 0; i < metadata_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(6, metadata_.get(i));
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -9003,6 +9044,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000008);
         encrypted_ = false;
         bitField0_ = (bitField0_ & ~0x00000010);
+        metadata_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -9056,6 +9099,11 @@ public Builder clone() {
           to_bitField0_ |= 0x00000010;
         }
         result.encrypted_ = encrypted_;
+        if (((bitField0_ & 0x00000020) == 0x00000020)) {
+          metadata_ = java.util.Collections.unmodifiableList(metadata_);
+          bitField0_ = (bitField0_ & ~0x00000020);
+        }
+        result.metadata_ = metadata_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -9077,6 +9125,16 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandPro
         if (other.hasEncrypted()) {
           setEncrypted(other.getEncrypted());
         }
+        if (!other.metadata_.isEmpty()) {
+          if (metadata_.isEmpty()) {
+            metadata_ = other.metadata_;
+            bitField0_ = (bitField0_ & ~0x00000020);
+          } else {
+            ensureMetadataIsMutable();
+            metadata_.addAll(other.metadata_);
+          }
+          
+        }
         return this;
       }
       
@@ -9093,6 +9151,12 @@ public final boolean isInitialized() {
           
           return false;
         }
+        for (int i = 0; i < getMetadataCount(); i++) {
+          if (!getMetadata(i).isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -9143,6 +9207,12 @@ public Builder mergeFrom(
               encrypted_ = input.readBool();
               break;
             }
+            case 50: {
+              org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addMetadata(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -9284,6 +9354,95 @@ public Builder clearEncrypted() {
         return this;
       }
       
+      // repeated .pulsar.proto.KeyValue metadata = 6;
+      private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> metadata_ =
+        java.util.Collections.emptyList();
+      private void ensureMetadataIsMutable() {
+        if (!((bitField0_ & 0x00000020) == 0x00000020)) {
+          metadata_ = new java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue>(metadata_);
+          bitField0_ |= 0x00000020;
+         }
+      }
+      
+      public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> getMetadataList() {
+        return java.util.Collections.unmodifiableList(metadata_);
+      }
+      public int getMetadataCount() {
+        return metadata_.size();
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int index) {
+        return metadata_.get(index);
+      }
+      public Builder setMetadata(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureMetadataIsMutable();
+        metadata_.set(index, value);
+        
+        return this;
+      }
+      public Builder setMetadata(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) {
+        ensureMetadataIsMutable();
+        metadata_.set(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addMetadata(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureMetadataIsMutable();
+        metadata_.add(value);
+        
+        return this;
+      }
+      public Builder addMetadata(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureMetadataIsMutable();
+        metadata_.add(index, value);
+        
+        return this;
+      }
+      public Builder addMetadata(
+          org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) {
+        ensureMetadataIsMutable();
+        metadata_.add(builderForValue.build());
+        
+        return this;
+      }
+      public Builder addMetadata(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) {
+        ensureMetadataIsMutable();
+        metadata_.add(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addAllMetadata(
+          java.lang.Iterable<? extends org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> values) {
+        ensureMetadataIsMutable();
+        super.addAll(values, metadata_);
+        
+        return this;
+      }
+      public Builder clearMetadata() {
+        metadata_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000020);
+        
+        return this;
+      }
+      public Builder removeMetadata(int index) {
+        ensureMetadataIsMutable();
+        metadata_.remove(index);
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandProducer)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
new file mode 100644
index 000000000..283b4f848
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.Map;
+
+/**
+ * Validation helpers for key/value metadata maps.
+ */
+public final class Metadata {
+
+    /** Upper bound on the combined length of all keys and values, in {@code char}s. */
+    private static final int MAX_METADATA_SIZE = 1024; // 1 Kb
+
+    private Metadata() {}
+
+    /**
+     * Checks that the metadata map fits within the size limit.
+     *
+     * <p>The size is the sum of {@link String#length()} over every key and value, so it is
+     * measured in UTF-16 code units rather than encoded bytes. A {@code null} map is accepted
+     * and treated as "no metadata".
+     *
+     * @param metadata the key/value pairs to check, may be {@code null}
+     * @throws IllegalArgumentException if any key or value is {@code null}, or if the combined
+     *         length of all keys and values exceeds the limit
+     */
+    public static void validateMetadata(Map<String, String> metadata) throws IllegalArgumentException {
+        if (metadata == null) {
+            return;
+        }
+
+        int size = 0;
+        for (Map.Entry<String, String> e : metadata.entrySet()) {
+            // Report null entries as invalid arguments instead of failing with a bare
+            // NullPointerException on length().
+            if (e.getKey() == null || e.getValue() == null) {
+                throw new IllegalArgumentException("metadata keys and values must not be null");
+            }
+            size += (e.getKey().length() + e.getValue().length());
+            if (size > MAX_METADATA_SIZE) {
+                throw new IllegalArgumentException(getErrorMessage());
+            }
+        }
+    }
+
+    private static String getErrorMessage() {
+        return "metadata has a max size of 1 Kb";
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
index ce3f67e3d..4819b35ab 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import java.util.Map;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -47,6 +49,9 @@
     /** Client library version */
     public String clientVersion;
 
+    /** Metadata (key/value strings) associated with this publisher */
+    public Map<String, String> metadata;
+
     public PublisherStats add(PublisherStats stats) {
         checkNotNull(stats);
         this.msgRateIn += stats.msgRateIn;
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b94b83afa..a6de6f20a 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -232,6 +232,9 @@ message CommandProducer {
 	optional string producer_name = 4;
 
 	optional bool encrypted       = 5 [default = false];
+
+    /// Add optional metadata key=value to this producer
+    repeated KeyValue metadata    = 6;
 }
 
 message CommandSend {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/CommandUtilsTests.java
new file mode 100644
index 000000000..cfdd8a806
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/CommandUtilsTests.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.junit.Test;
+import org.testng.Assert;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unit tests for the metadata conversion helpers in {@link CommandUtils}.
+ */
+public class CommandUtilsTests {
+
+    /** Expects a null map to give an empty list and a one-entry map to give a matching pair. */
+    @Test
+    public void testToKeyValueList() {
+        List<PulsarApi.KeyValue> keyValues = CommandUtils.toKeyValueList(null);
+        Assert.assertNotNull(keyValues);
+        Assert.assertTrue(keyValues.isEmpty());
+
+        final Map<String, String> metadata = new HashMap<>();
+        metadata.put("key1", "value1");
+
+        keyValues = CommandUtils.toKeyValueList(metadata);
+        // Compare against the source map; comparing the list with itself can never fail.
+        Assert.assertEquals(keyValues.size(), metadata.size());
+        PulsarApi.KeyValue kv = keyValues.get(0);
+        final Map.Entry<String, String> entry = metadata.entrySet().iterator().next();
+        Assert.assertEquals(kv.getKey(), entry.getKey());
+        Assert.assertEquals(kv.getValue(), entry.getValue());
+    }
+
+    /** Expects an empty map for a command without metadata and a matching entry for one pair. */
+    @Test
+    public void testMetadataFromCommand() {
+        Map<String, String> metadata = CommandUtils.metadataFromCommand(newCommand(null, null));
+        Assert.assertNotNull(metadata);
+        Assert.assertTrue(metadata.isEmpty());
+
+        final String key = "key";
+        final String value = "value";
+
+        PulsarApi.CommandProducer cmd = newCommand(key, value);
+        metadata = CommandUtils.metadataFromCommand(cmd);
+        Assert.assertEquals(metadata.size(), 1);
+        final Map.Entry<String, String> entry = metadata.entrySet().iterator().next();
+        Assert.assertEquals(entry.getKey(), key);
+        Assert.assertEquals(entry.getValue(), value);
+    }
+
+    /** Builds a minimal CommandProducer, attaching one metadata pair when both parts are non-null. */
+    private PulsarApi.CommandProducer newCommand(String key, String value) {
+        PulsarApi.CommandProducer.Builder cmd = PulsarApi.CommandProducer.newBuilder()
+                .setProducerId(1)
+                .setRequestId(1)
+                .setTopic("my-topic")
+                .setProducerName("producer");
+
+        if (key != null && value != null) {
+            cmd.addMetadata(PulsarApi.KeyValue.newBuilder().setKey(key).setValue(value).build());
+        }
+
+        return cmd.build();
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/MetadataTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/MetadataTests.java
new file mode 100644
index 000000000..9e60c8286
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/MetadataTests.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Unit tests for the size limit enforced by {@link Metadata#validateMetadata(Map)}.
+ */
+public class MetadataTests {
+
+    /** Maps whose keys and values total at most 1024 characters must be accepted. */
+    @Test
+    public void testValidMetadata() {
+        // A missing or empty map carries no metadata and is always valid.
+        Assert.assertTrue(validateMetadata(null));
+
+        Map<String, String> metadata = new HashMap<>();
+        Assert.assertTrue(validateMetadata(metadata));
+
+        metadata.put(generateKey(1, 512), generateKey(1, 512));
+        Assert.assertTrue(validateMetadata(metadata));
+
+        metadata.clear();
+        metadata.put(generateKey(1, 512), generateKey(1, 511));
+        Assert.assertTrue(validateMetadata(metadata));
+
+        metadata.clear();
+        metadata.put(generateKey(1, 256), generateKey(1, 256));
+        metadata.put(generateKey(2, 256), generateKey(2, 256));
+        Assert.assertTrue(validateMetadata(metadata));
+
+        metadata.clear();
+        metadata.put(generateKey(1, 256), generateKey(1, 256));
+        metadata.put(generateKey(2, 256), generateKey(2, 255));
+        Assert.assertTrue(validateMetadata(metadata));
+    }
+
+    /** Maps whose keys and values total more than 1024 characters must be rejected. */
+    @Test
+    public void testInvalidMetadata() {
+        Map<String, String> metadata = new HashMap<>();
+
+        metadata.put(generateKey(1, 512), generateKey(1, 513));
+        Assert.assertFalse(validateMetadata(metadata));
+
+        metadata.clear();
+        metadata.put(generateKey(1, 256), generateKey(1, 256));
+        metadata.put(generateKey(2, 256), generateKey(2, 257));
+        Assert.assertFalse(validateMetadata(metadata));
+
+        metadata.clear();
+        metadata.put(generateKey(1, 256), generateKey(1, 256));
+        metadata.put(generateKey(2, 256), generateKey(2, 256));
+        metadata.put(generateKey(3, 1), generateKey(3, 1));
+        Assert.assertFalse(validateMetadata(metadata));
+    }
+
+    /** Adapts the throwing validator to a boolean: {@code true} when no exception is thrown. */
+    private static boolean validateMetadata(Map<String, String> metadata) {
+        try {
+            Metadata.validateMetadata(metadata);
+            return true;
+        } catch (IllegalArgumentException ignore) {
+            return false;
+        }
+    }
+
+    /**
+     * Returns the decimal form of {@code number} repeated {@code length} times; the result has
+     * exactly {@code length} characters only when {@code number} is a single digit.
+     */
+    private static String generateKey(int number, int length) {
+        return IntStream.generate(() -> number).limit(length).mapToObj(Integer::toString)
+                .collect(Collectors.joining(""));
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services