Posted to commits@samza.apache.org by bh...@apache.org on 2023/06/17 02:21:03 UTC

[samza] branch master updated: SAMZA-2778: Make AzureBlobOutputStream buffer initialization size configurable. (#1662)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new aa5db44e2 SAMZA-2778: Make AzureBlobOutputStream buffer initialization size configurable. (#1662)
aa5db44e2 is described below

commit aa5db44e25e87e84d7fc32dc50ba51347290acd8
Author: Eric Honer <eh...@linkedin.com>
AuthorDate: Fri Jun 16 19:20:58 2023 -0700

    SAMZA-2778: Make AzureBlobOutputStream buffer initialization size configurable. (#1662)
    
    Symptom:
    JVM crashes due to OOM exceptions. When the system has a large number of AzureBlobWriter objects, memory becomes heavily fragmented and the JVM is susceptible to crashing when a new AzureBlobOutputStream is created.
    
    Cause:
    The crashes are caused by inefficient memory management when using the G1GC (the default garbage collector in Java 9 and later). The code paths that cause this issue, and why it is a problem for the G1GC, are examined separately.
    
    What causes this issue?
    The underlying issue is caused by the AzureBlobOutputStream. When a new instance is created, it creates a new ByteArrayOutputStream initialized to maxFlushThresholdSize. The ByteArrayOutputStream is the buffer used by the parent to accumulate messages between flush intervals. It requires 10MB (current default) of memory to initialize. This allows the buffer to accumulate messages without resize operations; however, it does not prevent resizing. The maxFlushThresholdSize is enforced  [...]
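
The allocation behavior described above can be observed with a small sketch. It is illustrative only and not part of the commit: `BufferGrowthSketch` and `InspectableBuffer` are hypothetical names, and the subclass exists solely to read the protected `buf` array of `java.io.ByteArrayOutputStream`.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical helper, not part of Samza: shows that the initial capacity is
// allocated eagerly and that a small buffer still grows on demand.
class BufferGrowthSketch {

    // Subclass only to expose the length of the protected backing array.
    static final class InspectableBuffer extends ByteArrayOutputStream {
        InspectableBuffer(int initialCapacity) {
            super(initialCapacity);
        }

        int capacity() {
            return buf.length;
        }
    }

    // Capacity of a freshly constructed buffer, before any write.
    static int initialCapacity(int requested) {
        return new InspectableBuffer(requested).capacity();
    }

    // Capacity after writing bytesToWrite zero bytes into a buffer
    // created with the given initial capacity.
    static int capacityAfterWriting(int initialCapacity, int bytesToWrite) {
        InspectableBuffer buffer = new InspectableBuffer(initialCapacity);
        buffer.write(new byte[bytesToWrite], 0, bytesToWrite);
        return buffer.capacity();
    }

    public static void main(String[] args) {
        int tenMb = 10 * 1024 * 1024;
        System.out.println("10MB request allocates: " + initialCapacity(tenMb));
        System.out.println("32 byte request allocates: " + initialCapacity(32));
        System.out.println("32 byte buffer after 1KB write: " + capacityAfterWriting(32, 1024));
    }
}
```

A buffer requested at 10MB holds a 10MB array even while empty, whereas a 32 byte buffer starts small and grows only once data arrives, at the cost of resize copies.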
    
    Why is this a problem for the G1GC?
    The focus here is on the G1GC and humongous objects (G1 specific). The G1 GC introduced a new memory management strategy that divides the heap into regions, sized with -XX:G1HeapRegionSize=n. The GC can operate on regions concurrently and copies live objects between regions during full GC to reclaim/empty regions. The default behavior creates ~2048 regions, each sized to a power of 2 between 1MB and 32MB. Any object larger than half of a region size is considered a humongous object.
    Humongous objects are allocated an entire region (or consecutive regions if larger than a single region) and are not copied between regions during full GC. Any space not used by the object within a region is non-addressable for the life of the object. A JVM heap size of 31GB, -Xmx31G, will default to 16MB regions. Considering the current default size is 10MB, each buffer requires an entire region and prevents the use of 6MB, regardless of how much data is in the buffer. For a hea [...]
    The 10MB buffer size can exhaust the regions and cause OOMs or create fragmentation that causes OOMs. A fragmentation-caused OOM occurs in the following sequence. On new, the JVM attempts to create the object in Eden. If there is insufficient space in Eden, a minor GC is performed. If there is insufficient space after minor GC, the object is immediately promoted to Old Gen. If there is insufficient space in Old Gen, a full GC is performed. If a full GC cannot allocate memory or region(s)  [...]
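
The region arithmetic above can be checked with a short sketch. It is a simplification under stated assumptions: `HumongousSketch` is a hypothetical name, array header overhead is ignored, and the humongous threshold is taken as "half a region or more", matching the `>=` wording used in the configuration docs of this commit.

```java
// Hypothetical helper, not part of Samza or the JVM: back-of-the-envelope
// accounting for G1 humongous allocations. Array headers are ignored.
class HumongousSketch {
    static final long MB = 1024L * 1024L;

    // Assumption: an allocation of half a region or more is humongous.
    static boolean isHumongous(long objectBytes, long regionBytes) {
        return objectBytes >= regionBytes / 2;
    }

    // Whole regions claimed by a humongous allocation (rounded up).
    static long regionsNeeded(long objectBytes, long regionBytes) {
        return (objectBytes + regionBytes - 1) / regionBytes;
    }

    // Space inside the claimed regions that nothing else can use
    // while the humongous object is alive.
    static long unusableBytes(long objectBytes, long regionBytes) {
        if (!isHumongous(objectBytes, regionBytes)) {
            return 0L;
        }
        return regionsNeeded(objectBytes, regionBytes) * regionBytes - objectBytes;
    }

    public static void main(String[] args) {
        long region = 16 * MB;  // region size quoted above for -Xmx31G
        long buffer = 10 * MB;  // previous default buffer size
        System.out.println("humongous: " + isHumongous(buffer, region));
        System.out.println("regions per buffer: " + regionsNeeded(buffer, region));
        System.out.println("unusable MB per buffer: " + unusableBytes(buffer, region) / MB);
    }
}
```

With a 16MB region and a 10MB buffer, each buffer claims one region and strands 6MB, which is the figure quoted above; a 32 byte buffer is an ordinary allocation and strands nothing.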
    
    Changes:
    The javadocs, where appropriate, have been updated to reflect changes or describe new behaviors. No public APIs were removed; they were marked deprecated and migrated to the new default initialization value. All of the changes are itemized below.
    
    AzureBlobConfig
    Adds two new public fields and one public method. The new configuration is made accessible in the same manner as existing configs (see #configs SEP-26), also consistent with the coding-guide. There is a new public config key: SYSTEM_INIT_BUFFER_SIZE - named initBufferSize.bytes. The default value is the public field SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The user-provided configuration value is accessible with the new public method getInitBufferSizeBytes(..). The method returns the configuration value [...]
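
The bounding applied by getInitBufferSizeBytes(..) in the diff below is a min/max clamp. A standalone sketch of the same expression follows; `InitBufferSizeSketch` and `clamp` are hypothetical names, and the constant mirrors SYSTEM_INIT_BUFFER_SIZE_DEFAULT.

```java
// Hypothetical standalone version of the clamp used by getInitBufferSizeBytes(..).
class InitBufferSizeSketch {
    // Mirrors AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT.
    static final int INIT_BUFFER_SIZE_DEFAULT = 32;

    // Raise values below the default up to the default, then cap the result
    // at the flush threshold.
    static int clamp(int configured, int maxFlushThresholdSize) {
        return Math.min(Math.max(configured, INIT_BUFFER_SIZE_DEFAULT), maxFlushThresholdSize);
    }

    public static void main(String[] args) {
        int maxFlush = 10485760; // default maxFlushThresholdSize
        System.out.println(clamp(0, maxFlush));        // too small, raised to the default
        System.out.println(clamp(1048576, maxFlush));  // in range, returned unchanged
        System.out.println(clamp(20971520, maxFlush)); // too large, capped at maxFlush
    }
}
```

Because the cap is applied last, a flush threshold smaller than 32 would win over the default.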
    
    AzureBlobWriterFactory
    There are two changes to this interface, both to the method getWriterInstance(..). The existing implementation is marked @Deprecated and a new method with an additional parameter is added. The new parameter is an int that is expected to be the _initBufferSize_.
    
    AzureBlobAvroWriterFactory
    The modifications here are consistent with the changes to interface AzureBlobWriterFactory. However, the deprecated implementation uses the new field AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT when calling the new public API. This will migrate users to the new initialization behavior.
    
    AzureBlobAvroWriter
    There are two public, one package-private, and two private changes. Both public changes are to constructors. The existing public constructor is marked deprecated and invokes the new public constructor with the new field AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The new constructor sets the new private field initBufferSize. The package-private constructor is modified with an additional int parameter and the tests were changed accordingly. The remaining private change modifies cr [...]
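
The deprecated-constructor migration described for AzureBlobAvroWriter follows a common delegation pattern. A minimal sketch, assuming a hypothetical `WriterSketch` class with far fewer parameters than the real writer:

```java
import java.io.ByteArrayOutputStream;

// Hypothetical stand-in for a writer; it is not the Samza class and keeps
// only the two parameters needed to show the pattern.
class WriterSketch {
    // Mirrors AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT.
    static final int INIT_BUFFER_SIZE_DEFAULT = 32;

    private final int maxFlushThresholdSize;
    private final int initBufferSize;

    // Old signature: kept for source compatibility, delegates with the default size.
    @Deprecated
    WriterSketch(int maxFlushThresholdSize) {
        this(maxFlushThresholdSize, INIT_BUFFER_SIZE_DEFAULT);
    }

    // New signature: the caller chooses the initial buffer size.
    WriterSketch(int maxFlushThresholdSize, int initBufferSize) {
        this.maxFlushThresholdSize = maxFlushThresholdSize;
        this.initBufferSize = initBufferSize;
    }

    int getMaxFlushThresholdSize() {
        return maxFlushThresholdSize;
    }

    int getInitBufferSize() {
        return initBufferSize;
    }

    // The buffer is sized from initBufferSize rather than the flush threshold.
    ByteArrayOutputStream newBuffer() {
        return new ByteArrayOutputStream(initBufferSize);
    }

    public static void main(String[] args) {
        System.out.println(new WriterSketch(10485760).getInitBufferSize());
        System.out.println(new WriterSketch(10485760, 4096).getInitBufferSize());
    }
}
```

Callers of the old constructor keep compiling but silently move from a flush-threshold-sized buffer to the small default, which is the migration the commit message describes.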
    
    AzureBlobOutputStream
    The existing public API is marked @Deprecated and the ByteArrayOutputStream is initialized with the new field AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The new public constructor includes the new parameter initBufferSize and initializes the ByteArrayOutputStream to that size.
---
 .../versioned/jobs/samza-configurations.md         |  1 +
 .../samza/system/azureblob/AzureBlobConfig.java    | 12 +++++++++++
 .../system/azureblob/avro/AzureBlobAvroWriter.java | 22 ++++++++++++++++++--
 .../azureblob/avro/AzureBlobAvroWriterFactory.java |  5 +++--
 .../azureblob/avro/AzureBlobOutputStream.java      |  9 ++++----
 .../producer/AzureBlobSystemProducer.java          |  3 ++-
 .../azureblob/producer/AzureBlobWriterFactory.java |  3 ++-
 .../azureblob/avro/TestAzureBlobAvroWriter.java    | 24 ++++++++++++----------
 .../azureblob/avro/TestAzureBlobOutputStream.java  |  4 +++-
 9 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 790e289ec..60d8ae8b5 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -267,6 +267,7 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/
 |systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size allowed by Azure is 100MB.|
 |systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE (unlimited)|max size of the uncompressed blob in bytes.<br>If default value then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB per block X 50,000 blocks).|
 |systems.**_system-name_**.azureblob.maxMessagesPerBlob|Long.MAX_VALUE (unlimited)|max number of messages per blob.|
+|systems.**_system-name_**.azureblob.initBufferSize.bytes|32|The amount of memory pre-allocated for `org.apache.samza.system.azureblob.avro.AzureBlobOutputStream` buffers.<br>Values must be between 32 (default) and `maxBlobSize`.<br>This should be increased for fast-filling buffers when buffer resize operations affect performance. Large values can lead to inefficent memory allocation with the G1 garbage collector. If the size is >= half of a region size, `G1HeapRegionSize`, consider swit [...]
 |systems.**_system-name_**.azureblob.threadPoolCount|2|number of threads for the asynchronous uploading of blocks.|
 |systems.**_system-name_**.azureblob.blockingQueueSize|Thread Pool Count * 2|size of the queue to hold blocks ready to be uploaded by asynchronous threads.<br>If all threads are busy uploading then blocks are queued and if queue is full then main thread will start uploading which will block processing of incoming messages.|
 |systems.**_system-name_**.azureblob.flushTimeoutMs|180000 (3 mins)|timeout to finish uploading all blocks before committing a blob.|
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
index 3026a89af..37c6ebcf3 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
@@ -80,6 +80,12 @@ public class AzureBlobConfig extends MapConfig {
   public static final String SYSTEM_MAX_FLUSH_THRESHOLD_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxFlushThresholdSize";
   private static final int SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT = 10485760;
 
+  // initialization size of in-memory OutputStream
+  // This value should be between SYSTEM_INIT_BUFFER_SIZE_DEFAULT and getMaxFlushThresholdSize() exclusive.
+  public static final String SYSTEM_INIT_BUFFER_SIZE = SYSTEM_AZUREBLOB_PREFIX + "initBufferSize.bytes";
+  // re-use size for parameterless constructor java.io.ByteArrayOutputStream()
+  public static final int SYSTEM_INIT_BUFFER_SIZE_DEFAULT = 32;
+
   // maximum size of uncompressed blob in bytes
   public static final String SYSTEM_MAX_BLOB_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxBlobSize";
   private static final long SYSTEM_MAX_BLOB_SIZE_DEFAULT = Long.MAX_VALUE; // unlimited
@@ -170,6 +176,12 @@ public class AzureBlobConfig extends MapConfig {
     return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
   }
 
+  // return larger of config value or DEFAULT and smaller of MaxFlushThresholdSize
+  public int getInitBufferSizeBytes(String systemName) {
+    int init = getInt(String.format(SYSTEM_INIT_BUFFER_SIZE, systemName), SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
+    return Math.min(Math.max(init, SYSTEM_INIT_BUFFER_SIZE_DEFAULT), getMaxFlushThresholdSize(systemName));
+  }
+
   public int getAzureBlobThreadPoolCount(String systemName) {
     return getInt(String.format(SYSTEM_THREAD_POOL_COUNT, systemName), SYSTEM_THREAD_POOL_COUNT_DEFAULT);
   }
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index b2e8c3ebf..e50193669 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -23,6 +23,7 @@ import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -114,6 +115,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
   private final String blobURLPrefix;
   private final long maxBlobSize;
   private final long maxRecordsPerBlob;
+  private final int initBufferSize;
   private final boolean useRandomStringInBlobName;
   private final Object currentDataFileWriterLock = new Object();
   private volatile long recordsInCurrentBlob = 0;
@@ -121,12 +123,24 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
   private Config blobMetadataGeneratorConfig;
   private String streamName;
 
+  @Deprecated
   public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
       Executor blobThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
       long maxBlobSize, long maxRecordsPerBlob) {
 
+    this(containerAsyncClient, blobURLPrefix, blobThreadPool, metrics, blobMetadataGeneratorFactory,
+        blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs, compression,
+        useRandomStringInBlobName, maxBlobSize, maxRecordsPerBlob, AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
+  }
+
+  public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
+      Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
+      int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
+      long maxBlobSize, long maxRecordsPerBlob, int initBufferSize) {
+
     this.blobThreadPool = blobThreadPool;
     this.metrics = metrics;
     this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
@@ -140,6 +154,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
     this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
     this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
     this.streamName = streamName;
+    this.initBufferSize = initBufferSize;
   }
 
   /**
@@ -244,7 +259,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
       DataFileWriter<Object> dataFileWriter,
       AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
-      long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
+      long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName,
+      int initBufferSize) {
+
     if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
       this.currentBlobWriterComponents = null;
     } else {
@@ -265,6 +282,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
     this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
     this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
     this.streamName = streamName;
+    this.initBufferSize = initBufferSize;
   }
 
   @VisibleForTesting
@@ -351,7 +369,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
     try {
       azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
           blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
-          streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
+          streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression, initBufferSize);
     } catch (Exception e) {
       throw new SamzaException("Unable to create AzureBlobOutputStream", e);
     }
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
index 0a4e01908..6434145ad 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
@@ -35,13 +35,14 @@ public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
   /**
    * {@inheritDoc}
    */
+  @Override
   public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
       Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
-      long maxBlobSize, long maxMessagesPerBlob) throws IOException {
+      long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException {
     return new AzureBlobAvroWriter(containerAsyncClient, blobURL, blobUploadThreadPool, metrics,
           blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
-          compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
+          compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob, initBufferSize);
   }
 }
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 0e05aefa6..51d983cfd 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -51,7 +51,8 @@ import reactor.core.scheduler.Schedulers;
 
 /**
  * This class extends {@link java.io.OutputStream} and uses {@link java.io.ByteArrayOutputStream}
- * for caching the write calls till upload is not called.
+ * for caching the write calls till upload is not called. The initialization size of the
+ * underlying {@link java.io.ByteArrayOutputStream} can be set by the caller or from config.
  *
  * It asynchronously uploads the blocks and waits on them to finish at close.
  * The blob is persisted at close.
@@ -105,13 +106,13 @@ public class AzureBlobOutputStream extends OutputStream {
    * @param flushTimeoutMs timeout for uploading a block
    * @param maxBlockFlushThresholdSize max block size
    * @param compression type of compression to be used before uploading a block
+   * @param initBufferSize initial size of {@link ByteArrayOutputStream}
    */
   public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
-      long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression) {
+      long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression, int initBufferSize) {
     this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
-        flushTimeoutMs, maxBlockFlushThresholdSize,
-        new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
+        flushTimeoutMs, maxBlockFlushThresholdSize, new ByteArrayOutputStream(initBufferSize), compression);
   }
 
   /**
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 91927d272..2774dba41 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -499,7 +499,8 @@ public class AzureBlobSystemProducer implements SystemProducer {
           CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
           config.getSuffixRandomStringToBlobName(systemName),
           config.getMaxBlobSize(systemName),
-          config.getMaxMessagesPerBlob(systemName));
+          config.getMaxMessagesPerBlob(systemName),
+          config.getInitBufferSizeBytes(systemName));
     } catch (Exception e) {
       throw new RuntimeException("Failed to create a writer for the producer.", e);
     }
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
index e9fa16d2a..a3d7a1c33 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
@@ -38,6 +38,7 @@ public interface AzureBlobWriterFactory {
    * @param streamName name of the stream that this AzureBlobWriter is associated with
    * @param maxBlockFlushThresholdSize threshold at which to upload
    * @param flushTimeoutMs timeout after which the flush is abandoned
+   * @param initBufferSize initial size of in-memory buffer(s)
    * @return AzureBlobWriter instance
    * @throws IOException if writer creation fails
    */
@@ -45,5 +46,5 @@ public interface AzureBlobWriterFactory {
       Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
-      long maxBlobSize, long maxMessagesPerBlob) throws IOException;
+      long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException;
 }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 3f7cefc99..8cca792a8 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -22,6 +22,7 @@ package org.apache.samza.system.azureblob.avro;
 import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.compression.CompressionFactory;
 import org.apache.samza.system.azureblob.compression.CompressionType;
@@ -89,6 +90,7 @@ public class TestAzureBlobAvroWriter {
   private static final String VALUE = "FAKE_VALUE";
   private static final String SYSTEM_NAME = "FAKE_SYSTEM";
   private static final int THRESHOLD = 100;
+  private static final int INIT_SIZE = AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT;
 
   private class SpecificRecordEvent extends org.apache.avro.specific.SpecificRecordBase
       implements org.apache.avro.specific.SpecificRecord {
@@ -181,7 +183,7 @@ public class TestAzureBlobAvroWriter {
         spy(new AzureBlobAvroWriter(mockContainerAsyncClient, mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
             60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
             blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
+            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, INIT_SIZE)); // keeping blob size and number of records unlimited
     doReturn(encodedRecord).when(azureBlobAvroWriter).encodeRecord((IndexedRecord) ome.getMessage());
   }
   @Test
@@ -236,7 +238,7 @@ public class TestAzureBlobAvroWriter {
         spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class),
             threadPool, THRESHOLD, 60000, "test",
             null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-            1000, 100, mockCompression, false));
+            1000, 100, mockCompression, false, INIT_SIZE));
     OutgoingMessageEnvelope omeEncoded = new OutgoingMessageEnvelope(new SystemStream(SYSTEM_NAME, "Topic1"), new byte[100]);
     azureBlobAvroWriter.write(omeEncoded);
   }
@@ -289,7 +291,7 @@ public class TestAzureBlobAvroWriter {
         spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
             60000, "test", null, null, null,
             blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
+            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, INIT_SIZE)); // keeping blob size and number of records unlimited
     when(azureBlobAvroWriter.encodeRecord((IndexedRecord) ome.getMessage())).thenThrow(IllegalStateException.class);
     azureBlobAvroWriter.flush(); // No NPE because has null check for currentBlobWriterComponents
   }
@@ -304,7 +306,7 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
         null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        maxBlobSize, 10, mockCompression, true));
+        maxBlobSize, 10, mockCompression, true, INIT_SIZE));
 
     DataFileWriter<Object> mockDataFileWriter1 = (DataFileWriter<Object>) mock(DataFileWriter.class);
     PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -317,7 +319,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
         mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
+        (long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream1);
     when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) maxBlobSize - 1);
 
     // first OME creates the first blob
@@ -335,7 +337,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
         mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
+        (long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream2);
     when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) maxBlobSize - 1);
 
     // Second OME creates the second blob because maxBlobSize is 1000 and mockAzureBlobOutputStream.getSize is 999.
@@ -367,7 +369,7 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
         null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        maxBlobSize, maxRecordsPerBlob, mockCompression, true));
+        maxBlobSize, maxRecordsPerBlob, mockCompression, true, INIT_SIZE));
 
     DataFileWriter<Object> mockDataFileWriter1 = (DataFileWriter<Object>) mock(DataFileWriter.class);
     PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -380,7 +382,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
         mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
+        (long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream1);
     when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) 1);
 
     // first OME creates the first blob and 11th OME (ome2) creates the second blob.
@@ -401,7 +403,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
     PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
         mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
+        (long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream2);
     when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) 1);
 
     azureBlobAvroWriter.write(ome2);
@@ -430,7 +432,7 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD, 60000, blobUrlPrefix,
         mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        maxBlobSize, maxRecordsPerBlob, mockCompression, false));
+        maxBlobSize, maxRecordsPerBlob, mockCompression, false, INIT_SIZE));
 
     DataFileWriter<Object> mockDataFileWriter2 = mock(DataFileWriter.class);
     AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
@@ -457,7 +459,7 @@ public class TestAzureBlobAvroWriter {
         mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
         60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
         blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false));
+        Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, INIT_SIZE));
     IndexedRecord record = new GenericRecordEvent();
     Assert.assertTrue(Arrays.equals(encodeRecord(record), azureBlobAvroWriter.encodeRecord(record)));
   }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index e4f3cc56a..19a802a20 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 import org.apache.samza.AzureException;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
@@ -75,6 +76,7 @@ public class TestAzureBlobOutputStream {
   private ThreadPoolExecutor threadPool;
   private ByteArrayOutputStream mockByteArrayOutputStream;
   private static final int THRESHOLD = 100;
+  private static final int INIT_SIZE = AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT;
   private BlockBlobAsyncClient mockBlobAsyncClient;
   private AzureBlobOutputStream azureBlobOutputStream;
   private static final String RANDOM_STRING = "roZzozzLiR7GCEjcB0UsRUNgBAip8cSLGXQSo3RQvbIDoxOaaRs4hrec2s5rMPWgTPRY4UnE959worEtyhRjwUFnRnVuNFZ554yuPQCbI69qFkQX7MmrB4blmpSnFeGjWKjFjIRLFNVSsQBYMkr5jT4T83uVtuGumsjACVrpcilihdd194H8Y71rQcrXZoTQtw5OvmPicbwptawpHoRNzHihyaDVYgAs0dQbvVEu1gitKpamzYdMLFtc5h8PFZSVEB";
@@ -98,7 +100,7 @@ public class TestAzureBlobOutputStream {
         TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
 
 
-    mockByteArrayOutputStream = spy(new ByteArrayOutputStream(THRESHOLD));
+    mockByteArrayOutputStream = spy(new ByteArrayOutputStream(INIT_SIZE));
 
     mockBlobAsyncClient = PowerMockito.mock(BlockBlobAsyncClient.class);