Posted to commits@samza.apache.org by ca...@apache.org on 2020/05/15 21:16:26 UTC

[samza] branch master updated: SAMZA-2522: Add metadata generator for azure storage blob (#1358)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f24f15  SAMZA-2522: Add metadata generator for azure storage blob (#1358)
9f24f15 is described below

commit 9f24f1526d8f1c47e6e8fa4d9f68577cc2995f69
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Fri May 15 14:16:17 2020 -0700

    SAMZA-2522: Add metadata generator for azure storage blob (#1358)
    
    API changes: New interfaces BlobMetadataGeneratorFactory and BlobMetadataGenerator, and a new class BlobMetadataContext. New configs use the prefix systems.<system-name>.azureblob.metadataGeneratorConfig. The factory is wired in through systems.<system-name>.azureblob.metadataPropertiesGeneratorFactory.
    Prior to this PR, the "rawSizeBytes" metadata property was always added to committed blobs by default. That is no longer the case: a BlobMetadataGenerator implementation has to be provided to add it.
    
    Upgrade instructions: Backwards incompatible. The "rawSizeBytes" metadata property will no longer be added by default; a generator has to be provided instead.
    
    Usage instructions: Wire in the generator factory through systems.<system-name>.azureblob.metadataPropertiesGeneratorFactory and pass additional configs with the prefix systems.<system-name>.azureblob.metadataGeneratorConfig (see the sketch below).
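    
    For illustration, below is a minimal sketch of a custom factory that restores the previous "rawSizeBytes" behavior through the new interfaces. The package and class name (com.example.samza.azureblob.RawSizeBlobMetadataGeneratorFactory) are hypothetical and not part of this commit; only the BlobMetadataGeneratorFactory, BlobMetadataGenerator and BlobMetadataContext APIs it implements are.
    
        package com.example.samza.azureblob; // hypothetical package, for illustration only
    
        import java.util.HashMap;
        import java.util.Map;
        import org.apache.samza.config.Config;
        import org.apache.samza.system.azureblob.utils.BlobMetadataGenerator;
        import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
    
        // Wire in via systems.<system-name>.azureblob.metadataPropertiesGeneratorFactory
        // = com.example.samza.azureblob.RawSizeBlobMetadataGeneratorFactory
        public class RawSizeBlobMetadataGeneratorFactory implements BlobMetadataGeneratorFactory {
          @Override
          public BlobMetadataGenerator getBlobMetadataGeneratorInstance(Config blobMetadataGeneratorConfig) {
            // blobMetadataGeneratorConfig holds whatever was supplied under
            // systems.<system-name>.azureblob.metadataGeneratorConfig.
            return blobMetadataContext -> {
              Map<String, String> metadata = new HashMap<>();
              // Same property the producer used to add unconditionally before this change.
              metadata.put("rawSizeBytes", Long.toString(blobMetadataContext.getBlobSize()));
              metadata.put("streamName", blobMetadataContext.getStreamName());
              return metadata;
            };
          }
        }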
---
 .../versioned/jobs/samza-configurations.md         |  2 +
 .../samza/system/azureblob/AzureBlobConfig.java    | 24 ++++++++++-
 .../system/azureblob/avro/AzureBlobAvroWriter.java | 23 +++++++++-
 .../azureblob/avro/AzureBlobAvroWriterFactory.java |  6 ++-
 .../azureblob/avro/AzureBlobOutputStream.java      | 40 +++++++++++++++---
 .../producer/AzureBlobSystemProducer.java          | 18 +++++++-
 .../azureblob/producer/AzureBlobWriterFactory.java |  5 +++
 .../azureblob/utils/BlobMetadataContext.java       | 41 ++++++++++++++++++
 .../azureblob/utils/BlobMetadataGenerator.java     | 37 ++++++++++++++++
 .../utils/BlobMetadataGeneratorFactory.java        | 31 ++++++++++++++
 .../azureblob/utils/NullBlobMetadataGenerator.java | 33 +++++++++++++++
 .../utils/NullBlobMetadataGeneratorFactory.java    | 33 +++++++++++++++
 .../azureblob/avro/TestAzureBlobAvroWriter.java    | 32 +++++++++-----
 .../azureblob/avro/TestAzureBlobOutputStream.java  | 33 ++++++++++++++-
 .../producer/TestAzureBlobSystemProducer.java      | 20 +++++++--
 .../utils/TestNullBlobMetadataGenerator.java       | 49 ++++++++++++++++++++++
 16 files changed, 399 insertions(+), 28 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 5d4daed..932c4c8 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -271,6 +271,8 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/
 |systems.**_system-name_**.azureblob.flushTimeoutMs|180000 (3 mins)|timeout to finish uploading all blocks before committing a blob.|
 |systems.**_system-name_**.azureblob.closeTimeoutMs|300000 (5 mins)|timeout to finish committing all the blobs currently being written to. This does not include the flush timeout per blob.|
 |systems.**_system-name_**.azureblob.suffixRandomStringToBlobName|true|if true, a random string of 8 chars is suffixed to the blob name to prevent name collision when more than one Samza tasks are writing to the same SSP.|
+|systems.**_system-name_**.azureblob.metadataPropertiesGeneratorFactory|`org.apache.samza.system.`<br>`azureblob.utils.`<br>`NullBlobMetadataGeneratorFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory` impl for the system producer. <br><br>The default metadata generator does not add any metadata to the blob.| 
+|systems.**_system-name_**.azureblob.metadataGeneratorConfig| |Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.<br>For example, to pass a "key":"value" pair to the metadata generator, add config like systems.<system-name>.azureblob.metadataGeneratorConfig.\<key\> with value \<value\>| 
 
 
 ### <a name="state-storage"></a>[4. State Storage](#state-storage)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
index 92d76c4..58f206e 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
@@ -39,7 +39,8 @@ public class AzureBlobConfig extends MapConfig {
   public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory";
 
   // Azure Storage Account name under which the Azure container representing this system is.
-  // System name = Azure container name (https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names)
+  // System name = Azure container name
+  // (https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names)
   public static final String SYSTEM_AZURE_ACCOUNT_NAME = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "account.name";
 
   // Azure Storage Account key associated with the Azure Storage Account
@@ -94,6 +95,18 @@ public class AzureBlobConfig extends MapConfig {
   public static final String SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME = SYSTEM_AZUREBLOB_PREFIX + "suffixRandomStringToBlobName";
   private static final boolean SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME_DEFAULT = true;
 
+  // full class name of an implementation of org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory
+  // this factory should return an implementation of org.apache.samza.system.azureblob.utils.BlobMetadataGenerator
+  // this generator will be invoked when a blob is committed to add metadata properties to it
+  public static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY = SYSTEM_AZUREBLOB_PREFIX + "metadataPropertiesGeneratorFactory";
+  private static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY_DEFAULT =
+      "org.apache.samza.system.azureblob.utils.NullBlobMetadataGeneratorFactory";
+
+  // Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.
+  // for example, to pass a "key":"value" pair to the metadata generator, add config like
+  // systems.<system-name>.azureblob.metadataGeneratorConfig.<key> with value <value>
+  public static final String SYSTEM_BLOB_METADATA_GENERATOR_CONFIG_PREFIX = SYSTEM_AZUREBLOB_PREFIX + "metadataGeneratorConfig.";
+
   public AzureBlobConfig(Config config) {
     super(config);
   }
@@ -185,4 +198,13 @@ public class AzureBlobConfig extends MapConfig {
   public long getMaxMessagesPerBlob(String systemName) {
     return getLong(String.format(SYSTEM_MAX_MESSAGES_PER_BLOB, systemName), SYSTEM_MAX_MESSAGES_PER_BLOB_DEFAULT);
   }
+
+  public String getSystemBlobMetadataPropertiesGeneratorFactory(String systemName) {
+    return get(String.format(SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY, systemName),
+        SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY_DEFAULT);
+  }
+
+  public Config getSystemBlobMetadataGeneratorConfigs(String systemName) {
+    return subset(String.format(SYSTEM_BLOB_METADATA_GENERATOR_CONFIG_PREFIX, systemName));
+  }
 }
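
The two accessors added above resolve these settings per system. A minimal, self-contained sketch of how they might be exercised follows; the system name "my-azure-system", the factory class name, and the "someKey"/"someValue" entry are hypothetical placeholders.

    package com.example.samza.azureblob; // hypothetical package, for illustration only

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.azureblob.AzureBlobConfig;

    public class AzureBlobMetadataConfigExample {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Factory wiring, as documented in samza-configurations.md above.
        props.put("systems.my-azure-system.azureblob.metadataPropertiesGeneratorFactory",
            "com.example.samza.azureblob.RawSizeBlobMetadataGeneratorFactory");
        // Anything under the metadataGeneratorConfig. prefix is handed to that factory.
        props.put("systems.my-azure-system.azureblob.metadataGeneratorConfig.someKey", "someValue");

        AzureBlobConfig azureBlobConfig = new AzureBlobConfig(new MapConfig(props));

        // Resolves to the configured factory, or NullBlobMetadataGeneratorFactory when unset.
        String factoryClassName =
            azureBlobConfig.getSystemBlobMetadataPropertiesGeneratorFactory("my-azure-system");

        // Subset of configs that the producer passes to the factory when creating the generator.
        Config generatorConfig = azureBlobConfig.getSystemBlobMetadataGeneratorConfigs("my-azure-system");

        System.out.println(factoryClassName);
        System.out.println(generatorConfig);
      }
    }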
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 7f9a926..6e74461 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -46,7 +46,9 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,9 +111,13 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
   private final boolean useRandomStringInBlobName;
   private final Object currentDataFileWriterLock = new Object();
   private volatile long recordsInCurrentBlob = 0;
+  private BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
+  private Config blobMetadataGeneratorConfig;
+  private String streamName;
 
   public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
       Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
       long maxBlobSize, long maxRecordsPerBlob) {
 
@@ -125,6 +131,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
     this.useRandomStringInBlobName = useRandomStringInBlobName;
     this.maxBlobSize = maxBlobSize;
     this.maxRecordsPerBlob = maxRecordsPerBlob;
+    this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
+    this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
+    this.streamName = streamName;
   }
 
   /**
@@ -217,6 +226,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
       Executor blobThreadPool, int maxBlockFlushThresholdSize, int flushTimeoutMs, String blobURLPrefix,
       DataFileWriter<IndexedRecord> dataFileWriter,
       AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
     if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
       this.currentBlobWriterComponents = null;
@@ -235,6 +245,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
     this.useRandomStringInBlobName = useRandomStringInBlobName;
     this.maxBlobSize = maxBlobSize;
     this.maxRecordsPerBlob = maxRecordsPerBlob;
+    this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
+    this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
+    this.streamName = streamName;
   }
 
   @VisibleForTesting
@@ -318,8 +331,14 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
     BlockBlobAsyncClient blockBlobAsyncClient = containerAsyncClient.getBlobAsyncClient(blobURL).getBlockBlobAsyncClient();
 
     DataFileWriter<IndexedRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
-    AzureBlobOutputStream azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
-            flushTimeoutMs, maxBlockFlushThresholdSize, compression);
+    AzureBlobOutputStream azureBlobOutputStream;
+    try {
+      azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
+          blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
+          streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
+    } catch (Exception e) {
+      throw new SamzaException("Unable to create AzureBlobOutputStream", e);
+    }
     dataFileWriter.create(schema, azureBlobOutputStream);
     dataFileWriter.setFlushOnEveryBlock(false);
     this.currentBlobWriterComponents = new BlobWriterComponents(dataFileWriter, azureBlobOutputStream, blockBlobAsyncClient);
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
index 2510766..0a4e019 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
@@ -26,6 +26,8 @@ import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
 import com.azure.storage.blob.BlobContainerAsyncClient;
 import java.io.IOException;
 import java.util.concurrent.Executor;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 
 
 public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
@@ -35,9 +37,11 @@ public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
    */
   public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
       Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
       long maxBlobSize, long maxMessagesPerBlob) throws IOException {
     return new AzureBlobAvroWriter(containerAsyncClient, blobURL, blobUploadThreadPool, metrics,
-        maxBlockFlushThresholdSize, flushTimeoutMs, compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
+          blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
+          compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
   }
 }
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 9db15a3..716e488 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -30,7 +30,6 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -38,6 +37,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataContext;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGenerator;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
@@ -68,7 +71,6 @@ public class AzureBlobOutputStream extends OutputStream {
   private static final Logger LOG = LoggerFactory.getLogger(AzureBlobOutputStream.class);
   private static final int MAX_ATTEMPT = 3;
   private static final int MAX_BLOCKS_IN_AZURE_BLOB = 50000;
-  public static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes";
   private final long flushTimeoutMs;
   private final BlockBlobAsyncClient blobAsyncClient;
   private final Executor blobThreadPool;
@@ -84,10 +86,27 @@ public class AzureBlobOutputStream extends OutputStream {
   private volatile boolean isClosed = false;
   private long totalUploadedBlockSize = 0;
   private int blockNum;
+  private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
+  private final Config blobMetadataGeneratorConfig;
+  private String streamName;
 
+  /**
+   *
+   * @param blobAsyncClient Client to communicate with Azure Blob Storage.
+   * @param blobThreadPool threads to be used for uploading blocks to Azure Blob Storage.
+   * @param metrics needed for emitting metrics about bytes written, blocks uploaded, blobs committed.
+   * @param blobMetadataGeneratorFactory impl of {@link org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory}
+   *                                   to be used for generating metadata properties for a blob
+   * @param streamName name of the stream to which the generated blob corresponds. Used in metadata properties.
+   * @param flushTimeoutMs timeout for uploading a block
+   * @param maxBlockFlushThresholdSize max block size
+   * @param compression type of compression to be used before uploading a block
+   */
   public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression) {
-    this(blobAsyncClient, blobThreadPool, metrics, flushTimeoutMs, maxBlockFlushThresholdSize,
+    this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
+        flushTimeoutMs, maxBlockFlushThresholdSize,
         new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
   }
 
@@ -148,6 +167,8 @@ public class AzureBlobOutputStream extends OutputStream {
    *       - byteArrayOutputStream.close fails or
    *       - any of the pending uploads fails or
    *       - blob's commitBlockList fails
+   * throws ClassNotFoundException or IllegalAccessException or InstantiationException
+   *       - while creating an instance of BlobMetadataGenerator
    */
   @Override
   public synchronized void close() {
@@ -172,8 +193,8 @@ public class AzureBlobOutputStream extends OutputStream {
       future.get((long) flushTimeoutMs, TimeUnit.MILLISECONDS);
       LOG.info("For blob: {} committing blockList size:{}", blobAsyncClient.getBlobUrl().toString(), blockList.size());
       metrics.updateAzureCommitMetrics();
-      Map<String, String> blobMetadata = Collections.singletonMap(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(totalUploadedBlockSize));
-      commitBlob(blockList, blobMetadata);
+      BlobMetadataGenerator blobMetadataGenerator = getBlobMetadataGenerator();
+      commitBlob(blockList, blobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(streamName, totalUploadedBlockSize)));
     } catch (Exception e) {
       String msg = String.format("Close blob %s failed with exception. Total pending sends %d",
           blobAsyncClient.getBlobUrl().toString(), pendingUpload.size());
@@ -211,6 +232,7 @@ public class AzureBlobOutputStream extends OutputStream {
 
   @VisibleForTesting
   AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       long flushTimeoutMs, int maxBlockFlushThresholdSize,
       ByteArrayOutputStream byteArrayOutputStream, Compression compression) {
     this.byteArrayOutputStream = Optional.of(byteArrayOutputStream);
@@ -222,6 +244,9 @@ public class AzureBlobOutputStream extends OutputStream {
     this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
     this.metrics = metrics;
     this.compression = compression;
+    this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
+    this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
+    this.streamName = streamName;
   }
 
   // SAMZA-2476 stubbing BlockBlobAsyncClient.commitBlockListWithResponse was causing flaky tests.
@@ -245,6 +270,11 @@ public class AzureBlobOutputStream extends OutputStream {
     isClosed = true;
   }
 
+  @VisibleForTesting
+  BlobMetadataGenerator getBlobMetadataGenerator() throws Exception {
+    return blobMetadataGeneratorFactory.getBlobMetadataGeneratorInstance(blobMetadataGeneratorConfig);
+  }
+
   /**
    * This api will async upload the outputstream into block using stageBlocks,
    * reint outputstream
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index a2fa2ac..d8205a5 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -50,10 +50,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemProducerException;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,6 +138,9 @@ public class AzureBlobSystemProducer implements SystemProducer {
   private final Map<String, Object> sourceWriterCreationLockMap = new ConcurrentHashMap<>();
   private final Map<String, ReadWriteLock> sourceSendFlushLockMap = new ConcurrentHashMap<>();
 
+  private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
+  private final Config blobMetadataGeneratorConfig;
+
   public AzureBlobSystemProducer(String systemName, AzureBlobConfig config, MetricsRegistry metricsRegistry) {
     Preconditions.checkNotNull(systemName, "System name can not be null when creating AzureBlobSystemProducer");
     Preconditions.checkNotNull(config, "Config can not be null when creating AzureBlobSystemProducer");
@@ -171,6 +176,14 @@ public class AzureBlobSystemProducer implements SystemProducer {
     this.writerMap = new ConcurrentHashMap<>();
 
     this.metrics = new AzureBlobSystemProducerMetrics(systemName, config.getAzureAccountName(systemName), metricsRegistry);
+
+    String blobMetadataGeneratorFactoryClassName = this.config.getSystemBlobMetadataPropertiesGeneratorFactory(this.systemName);
+    try {
+      blobMetadataGeneratorFactory = (BlobMetadataGeneratorFactory) Class.forName(blobMetadataGeneratorFactoryClassName).newInstance();
+    } catch (Exception e) {
+      throw new SystemProducerException("Could not create blob metadata generator factory with name " + blobMetadataGeneratorFactoryClassName, e);
+    }
+    blobMetadataGeneratorConfig = this.config.getSystemBlobMetadataGeneratorConfigs(systemName);
   }
 
   /**
@@ -423,7 +436,7 @@ public class AzureBlobSystemProducer implements SystemProducer {
         if (writer == null) {
           AzureBlobWriterMetrics writerMetrics =
               new AzureBlobWriterMetrics(metrics.getAggregateMetrics(), metrics.getSystemMetrics(), metrics.getSourceMetrics(source));
-          writer = createNewWriter(blobURLPrefix, writerMetrics);
+          writer = createNewWriter(blobURLPrefix, writerMetrics, messageEnvelope.getSystemStream().getStream());
           sourceWriterMap.put(writerMapKey, writer);
         }
       }
@@ -502,9 +515,10 @@ public class AzureBlobSystemProducer implements SystemProducer {
   }
 
   @VisibleForTesting
-  AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMetrics) {
+  AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMetrics, String streamName) {
     try {
       return writerFactory.getWriterInstance(containerAsyncClient, blobURL, asyncBlobThreadPool, writerMetrics,
+          blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
           blockFlushThresholdSize, flushTimeoutMs,
           CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
           config.getSuffixRandomStringToBlobName(systemName),
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
index 87ca5a6..e9fa16d 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
@@ -23,6 +23,8 @@ import org.apache.samza.system.azureblob.compression.Compression;
 import com.azure.storage.blob.BlobContainerAsyncClient;
 import java.io.IOException;
 import java.util.concurrent.Executor;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 
 
 public interface AzureBlobWriterFactory {
@@ -32,6 +34,8 @@ public interface AzureBlobWriterFactory {
    * @param blobURL Azure blob url
    * @param blobUploadThreadPool thread pool to be used by writer for uploading
    * @param metrics metrics to measure the number of bytes written by writer
+   * @param blobMetadataGeneratorFactory factory to get generator for metadata properties for a blob
+   * @param streamName name of the stream that this AzureBlobWriter is associated with
    * @param maxBlockFlushThresholdSize threshold at which to upload
    * @param flushTimeoutMs timeout after which the flush is abandoned
    * @return AzureBlobWriter instance
@@ -39,6 +43,7 @@ public interface AzureBlobWriterFactory {
    */
   AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
       Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
       long maxBlobSize, long maxMessagesPerBlob) throws IOException;
 }
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java
new file mode 100644
index 0000000..a3460eb
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+/**
+ * Properties about a Blob which can then be used to generate metadata for the blob
+ */
+public class BlobMetadataContext {
+  private final String streamName;
+  private final long blobSize;
+
+  public BlobMetadataContext(String streamName, long blobSize) {
+    this.streamName = streamName;
+    this.blobSize = blobSize;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public long getBlobSize() {
+    return blobSize;
+  }
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGenerator.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGenerator.java
new file mode 100644
index 0000000..76f5f70
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import java.util.Map;
+
+
+/**
+ * Interface for generating metadata properties of an Azure Blob.
+ * Implementation is not expected to be thread safe.
+ */
+public interface BlobMetadataGenerator {
+
+  /**
+   * create metadata properties for a blob
+   * @param blobMetadataContext contains details about the blob that can be used for generating metadata
+   * @return map containing metadata properties to be associated with the blob
+   */
+  Map<String, String> getBlobMetadata(BlobMetadataContext blobMetadataContext);
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGeneratorFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGeneratorFactory.java
new file mode 100644
index 0000000..61661a4
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGeneratorFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import org.apache.samza.config.Config;
+
+
+public interface BlobMetadataGeneratorFactory {
+
+  /**
+   * Creates an instance of {@link BlobMetadataGenerator}
+   */
+  BlobMetadataGenerator getBlobMetadataGeneratorInstance(Config blobMetadataGeneratorConfig);
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGenerator.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGenerator.java
new file mode 100644
index 0000000..6b6efd2
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import java.util.Map;
+
+
+public class NullBlobMetadataGenerator implements BlobMetadataGenerator {
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Map<String, String> getBlobMetadata(BlobMetadataContext blobMetadataContext) {
+    return null;
+  }
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGeneratorFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGeneratorFactory.java
new file mode 100644
index 0000000..107e1c6
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGeneratorFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import org.apache.samza.config.Config;
+
+
+public class NullBlobMetadataGeneratorFactory implements BlobMetadataGeneratorFactory {
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public BlobMetadataGenerator getBlobMetadataGeneratorInstance(Config blobMetadataGeneratorConfig) {
+    return new NullBlobMetadataGenerator();
+  }
+}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index b4ef4b4..62076aa 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -45,8 +45,10 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -79,6 +81,9 @@ public class TestAzureBlobAvroWriter {
   private BlockBlobAsyncClient mockBlockBlobAsyncClient;
   private Compression mockCompression;
 
+  private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory = mock(BlobMetadataGeneratorFactory.class);
+  private final Config blobMetadataGeneratorConfig = mock(Config.class);
+  private static final String STREAM_NAME = "FAKE_STREAM";
   private static final String VALUE = "FAKE_VALUE";
   private static final String SYSTEM_NAME = "FAKE_SYSTEM";
   private static final int THRESHOLD = 100;
@@ -146,8 +151,9 @@ public class TestAzureBlobAvroWriter {
     mockCompression = CompressionFactory.getInstance().getCompression(CompressionType.GZIP);
     azureBlobAvroWriter =
         spy(new AzureBlobAvroWriter(mockContainerAsyncClient, mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
-            60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, Long.MAX_VALUE,
-            Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
+            60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
+            blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
     doReturn(encodedRecord).when(azureBlobAvroWriter).encodeRecord((IndexedRecord) ome.getMessage());
   }
   @Test
@@ -187,7 +193,8 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter =
         spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class),
             threadPool, THRESHOLD, 60000, "test",
-            null, null, null, 1000, 100, mockCompression, false));
+            null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+            1000, 100, mockCompression, false));
     OutgoingMessageEnvelope omeEncoded = new OutgoingMessageEnvelope(new SystemStream(SYSTEM_NAME, "Topic1"), new byte[100]);
     azureBlobAvroWriter.write(omeEncoded);
   }
@@ -242,7 +249,8 @@ public class TestAzureBlobAvroWriter {
     BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
-        null, null, null, maxBlobSize, 10, mockCompression, true));
+        null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+        maxBlobSize, 10, mockCompression, true));
 
     DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
     PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -254,7 +262,7 @@ public class TestAzureBlobAvroWriter {
 
     AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
-        mockMetrics,
+        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
         (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
     when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) maxBlobSize - 1);
 
@@ -272,7 +280,7 @@ public class TestAzureBlobAvroWriter {
 
     AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
-        mockMetrics,
+        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
         (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
     when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) maxBlobSize - 1);
 
@@ -304,7 +312,8 @@ public class TestAzureBlobAvroWriter {
     BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
-        null, null, null, maxBlobSize, maxRecordsPerBlob, mockCompression, true));
+        null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+        maxBlobSize, maxRecordsPerBlob, mockCompression, true));
 
     DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
     PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -316,7 +325,7 @@ public class TestAzureBlobAvroWriter {
 
     AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
-        mockMetrics,
+        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
         (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
     when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) 1);
 
@@ -337,7 +346,7 @@ public class TestAzureBlobAvroWriter {
 
     AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
-        mockMetrics,
+        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
         (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
     when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) 1);
 
@@ -366,8 +375,8 @@ public class TestAzureBlobAvroWriter {
     BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD, 60000, blobUrlPrefix,
-        mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, maxBlobSize, maxRecordsPerBlob,
-        mockCompression, false));
+        mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+        maxBlobSize, maxRecordsPerBlob, mockCompression, false));
 
     DataFileWriter<IndexedRecord> mockDataFileWriter2 = mock(DataFileWriter.class);
     AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
@@ -393,6 +402,7 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class),
         mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
         60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
+        blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
         Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false));
     IndexedRecord record = new GenericRecordEvent();
     Assert.assertTrue(Arrays.equals(encodeRecord(record), azureBlobAvroWriter.encodeRecord(record)));
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index d635693..f4780fd 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system.azureblob.avro;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import org.apache.samza.AzureException;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -33,6 +34,10 @@ import java.util.Map;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataContext;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGenerator;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,7 +52,9 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.anyObject;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -72,6 +79,11 @@ public class TestAzureBlobOutputStream {
   private static final byte[] COMPRESSED_BYTES = RANDOM_STRING.substring(0, THRESHOLD / 2).getBytes();
   private AzureBlobWriterMetrics mockMetrics;
   private Compression mockCompression;
+  private static final String FAKE_STREAM = "FAKE_STREAM";
+  private static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes";
+  private static final String BLOB_STREAM_NAME_METADATA = "streamName";
+  private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory = mock(BlobMetadataGeneratorFactory.class);
+  private final Config blobMetadataGeneratorConfig = mock(Config.class);
 
   @Before
   public void setup() throws Exception {
@@ -90,12 +102,25 @@ public class TestAzureBlobOutputStream {
     mockCompression = mock(Compression.class);
     doReturn(COMPRESSED_BYTES).when(mockCompression).compress(BYTES);
 
+    BlobMetadataGenerator mockBlobMetadataGenerator = mock(BlobMetadataGenerator.class);
+    doAnswer(invocation -> {
+        BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
+        String streamName = blobMetadataContext.getStreamName();
+        Long blobSize = blobMetadataContext.getBlobSize();
+        Map<String, String> metadataProperties = new HashMap<>();
+        metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
+        metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
+        return metadataProperties;
+      }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());
+
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
+        blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
         60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
 
     doNothing().when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
     doNothing().when(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
     doNothing().when(azureBlobOutputStream).clearAndMarkClosed();
+    doReturn(mockBlobMetadataGenerator).when(azureBlobOutputStream).getBlobMetadataGenerator();
   }
 
   @Test
@@ -216,7 +241,8 @@ public class TestAzureBlobOutputStream {
     verify(azureBlobOutputStream).commitBlob(blockListArgument.capture(), blobMetadataArg.capture());
     Assert.assertEquals(Arrays.asList(blockIdEncoded), blockListArgument.getAllValues().get(0));
     Map<String, String> blobMetadata = (Map<String, String>) blobMetadataArg.getAllValues().get(0);
-    Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(THRESHOLD));
+    Assert.assertEquals(blobMetadata.get(BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(THRESHOLD));
+    Assert.assertEquals(blobMetadata.get(BLOB_STREAM_NAME_METADATA), FAKE_STREAM);
   }
 
   @Test
@@ -239,13 +265,15 @@ public class TestAzureBlobOutputStream {
     Assert.assertEquals(blockIdEncoded, blockListArgument.getAllValues().get(0).toArray()[0]);
     Assert.assertEquals(blockIdEncoded1, blockListArgument.getAllValues().get(0).toArray()[1]);
     Map<String, String> blobMetadata = (Map<String, String>) blobMetadataArg.getAllValues().get(0);
-    Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(2 * THRESHOLD));
+    Assert.assertEquals(blobMetadata.get(BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(2 * THRESHOLD));
+    Assert.assertEquals(blobMetadata.get(BLOB_STREAM_NAME_METADATA), FAKE_STREAM);
   }
 
   @Test(expected = AzureException.class)
   public void testCloseFailed() {
 
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
+        blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
         60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
 
     //doNothing().when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
@@ -286,6 +314,7 @@ public class TestAzureBlobOutputStream {
   @Test (expected = AzureException.class)
   public void testFlushFailed() throws IOException {
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
+        blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
         60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
 
     doNothing().when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
index 2969cd0..4ad22ae 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
@@ -45,6 +45,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -95,7 +96,7 @@ public class TestAzureBlobSystemProducer {
 
     systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
     // use mock writer impl
-    doReturn(mockAzureWriter).when(systemProducer).createNewWriter(anyString(), any());
+    setupWriterForProducer(systemProducer, mockAzureWriter, STREAM);
     // bypass Azure connection setup
     doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
   }
@@ -415,7 +416,7 @@ public class TestAzureBlobSystemProducer {
     systemProducer.register(source1);
     systemProducer.start();
 
-    doReturn(mockAzureWriter1).when(systemProducer).createNewWriter(anyString(), any());
+    setupWriterForProducer(systemProducer, mockAzureWriter1, stream1);
 
     Thread t1 = sendFlushInThread(source1, ome1, systemProducer, sendsInFirstThread);
     Thread t2 = sendFlushInThread(source1, ome1, systemProducer, sendsInSecondThread);
@@ -451,7 +452,7 @@ public class TestAzureBlobSystemProducer {
     // bypass Azure connection setup
     doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
 
-    doReturn(mockAzureWriter1).when(systemProducer).createNewWriter(anyString(), any());
+    setupWriterForProducer(systemProducer, mockAzureWriter1, stream1);
 
     systemProducer.register(source1);
     systemProducer.start();
@@ -493,7 +494,7 @@ public class TestAzureBlobSystemProducer {
     // bypass Azure connection setup
     doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
 
-    doReturn(mockAzureWriter1).when(systemProducer).createNewWriter(anyString(), any());
+    setupWriterForProducer(systemProducer, mockAzureWriter1, STREAM);
 
     systemProducer.register(source1);
     systemProducer.start();
@@ -591,4 +592,15 @@ public class TestAzureBlobSystemProducer {
     Config config = new MapConfig(bareConfigs);
     return config;
   }
+
+  private void setupWriterForProducer(AzureBlobSystemProducer azureBlobSystemProducer,
+      AzureBlobWriter mockAzureBlobWriter, String stream) {
+    doAnswer(invocation -> {
+        String blobUrl = invocation.getArgumentAt(0, String.class);
+        String streamName = invocation.getArgumentAt(2, String.class);
+        Assert.assertEquals(stream, streamName);
+        Assert.assertEquals(stream, blobUrl);
+        return mockAzureBlobWriter;
+      }).when(azureBlobSystemProducer).createNewWriter(anyString(), any(), anyString());
+  }
 }
\ No newline at end of file
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java
new file mode 100644
index 0000000..072a51c
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestNullBlobMetadataGenerator {
+  private NullBlobMetadataGenerator nullBlobMetadataGenerator;
+
+  @Before
+  public void setup() {
+    nullBlobMetadataGenerator = new NullBlobMetadataGenerator();
+  }
+
+  @Test
+  public void testGetBlobMetadata() {
+    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("fake_stream", 100)));
+  }
+
+  @Test
+  public void testGetBlobMetadataEmptyInput() {
+    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("", 0)));
+  }
+
+  @Test
+  public void testGetBlobMetadataNullInput() {
+    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(null, 0)));
+  }
+}