You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/05/27 23:56:20 UTC

[samza] branch master updated: SAMZA-2534: AzureBlobSystemProducer: Enable adding of number of records in blob as metadata of the blob (#1364)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 583da77  SAMZA-2534: AzureBlobSystemProducer: Enable adding of number of records in blob as metadata of the blob (#1364)
583da77 is described below

commit 583da7764e2db6541b974ce61db7e1f555d02c4e
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Wed May 27 16:55:29 2020 -0700

    SAMZA-2534: AzureBlobSystemProducer: Enable adding of number of records in blob as metadata of the blob (#1364)
    
    API changes: The BlobMetadataContext passed as input to the BlobMetadataGenerator interface now has an additional field giving the number of records in the blob. This can be used as needed by the generator.
    Upgrade Instructions: None
    Usage Instructions: None
---
 .../system/azureblob/avro/AzureBlobAvroWriter.java     |  3 +++
 .../system/azureblob/avro/AzureBlobOutputStream.java   | 18 +++++++++++++++++-
 .../system/azureblob/utils/BlobMetadataContext.java    |  8 +++++++-
 .../system/azureblob/avro/TestAzureBlobAvroWriter.java |  7 +++++++
 .../azureblob/avro/TestAzureBlobOutputStream.java      |  8 ++++++++
 .../azureblob/utils/TestNullBlobMetadataGenerator.java |  6 +++---
 6 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index ba8e3aa..32a8532 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -178,6 +178,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
       }
       currentBlobWriterComponents.dataFileWriter.appendEncoded(ByteBuffer.wrap(encodedRecord));
       recordsInCurrentBlob++;
+      // incrementNumberOfRecordsInBlob should always be invoked every time appendEncoded above is invoked.
+      // this is to count the number of records in a blob and then use that count as metadata of the blob.
+      currentBlobWriterComponents.azureBlobOutputStream.incrementNumberOfRecordsInBlob();
     }
   }
   /**
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 716e488..d787351 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -85,6 +85,7 @@ public class AzureBlobOutputStream extends OutputStream {
 
   private volatile boolean isClosed = false;
   private long totalUploadedBlockSize = 0;
+  private long totalNumberOfRecordsInBlob = 0;
   private int blockNum;
   private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
   private final Config blobMetadataGeneratorConfig;
@@ -161,6 +162,7 @@ public class AzureBlobOutputStream extends OutputStream {
   /**
    * This api waits for all pending upload (stageBlock task) futures to finish.
    * It then synchronously commits the list of blocks to persist the actual blob on storage.
+   * Note: this method does not invoke flush and flush has to be explicitly called before close.
    * @throws IllegalStateException when
    *       - when closing an already closed stream
    * @throws RuntimeException when
@@ -194,7 +196,7 @@ public class AzureBlobOutputStream extends OutputStream {
       LOG.info("For blob: {} committing blockList size:{}", blobAsyncClient.getBlobUrl().toString(), blockList.size());
       metrics.updateAzureCommitMetrics();
       BlobMetadataGenerator blobMetadataGenerator = getBlobMetadataGenerator();
-      commitBlob(blockList, blobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(streamName, totalUploadedBlockSize)));
+      commitBlob(blockList, blobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(streamName, totalUploadedBlockSize, totalNumberOfRecordsInBlob)));
     } catch (Exception e) {
       String msg = String.format("Close blob %s failed with exception. Total pending sends %d",
           blobAsyncClient.getBlobUrl().toString(), pendingUpload.size());
@@ -230,6 +232,20 @@ public class AzureBlobOutputStream extends OutputStream {
     }
   }
 
+  /**
+   * This method is to be used for tracking the number of records written to the output stream.
+   * However, since records are written in chunks through the write(byte[],int,int) method,
+   * it is possible that not all records are completely written until flush is invoked.
+   *
+   * Additionally, the record count is intended to be used only as part of the
+   * blob's metadata at blob commit time, which happens at close.
+   * Thus, totalNumberOfRecordsInBlob is not read until the close method.
+   * Since flush is called before close, totalNumberOfRecordsInBlob is accurate by then.
+   */
+  public synchronized void incrementNumberOfRecordsInBlob() {
+    totalNumberOfRecordsInBlob++;
+  }
+
   @VisibleForTesting
   AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java
index a3460eb..93ed0a5 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java
@@ -25,10 +25,12 @@ package org.apache.samza.system.azureblob.utils;
 public class BlobMetadataContext {
   private final String streamName;
   private final long blobSize;
+  private final long numberOfMessagesInBlob;
 
-  public BlobMetadataContext(String streamName, long blobSize) {
+  public BlobMetadataContext(String streamName, long blobSize, long numberOfMessagesInBlob) {
     this.streamName = streamName;
     this.blobSize = blobSize;
+    this.numberOfMessagesInBlob = numberOfMessagesInBlob;
   }
 
   public String getStreamName() {
@@ -38,4 +40,8 @@ public class BlobMetadataContext {
   public long getBlobSize() {
     return blobSize;
   }
+
+  public long getNumberOfMessagesInBlob() {
+    return numberOfMessagesInBlob;
+  }
 }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index f52b484..68544db 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -164,6 +164,7 @@ public class TestAzureBlobAvroWriter {
       azureBlobAvroWriter.write(ome);
     }
     verify(mockDataFileWriter, times(numberOfMessages)).appendEncoded(ByteBuffer.wrap(encodedRecord));
+    verify(mockAzureBlobOutputStream, times(numberOfMessages)).incrementNumberOfRecordsInBlob();
   }
 
   @Test
@@ -175,6 +176,7 @@ public class TestAzureBlobAvroWriter {
       azureBlobAvroWriter.write(omeGenericRecord);
     }
     verify(mockDataFileWriter, times(numberOfMessages)).appendEncoded(ByteBuffer.wrap(encodedRecord));
+    verify(mockAzureBlobOutputStream, times(numberOfMessages)).incrementNumberOfRecordsInBlob();
   }
 
   @Test
@@ -187,6 +189,7 @@ public class TestAzureBlobAvroWriter {
     }
     verify(mockDataFileWriter).appendEncoded(ByteBuffer.wrap(encodedRecord));
     verify(mockDataFileWriter, times(numberOfMessages)).appendEncoded(ByteBuffer.wrap((byte[]) omeEncoded.getMessage()));
+    verify(mockAzureBlobOutputStream, times(numberOfMessages + 1)).incrementNumberOfRecordsInBlob(); // +1 to account for first ome which is not encoded
   }
 
   @Test(expected = IllegalStateException.class)
@@ -422,6 +425,7 @@ public class TestAzureBlobAvroWriter {
 
     verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodedRecord));
     verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));
+    verify(mockAzureBlobOutputStream, times(20)).incrementNumberOfRecordsInBlob();
   }
 
   @Test
@@ -435,6 +439,7 @@ public class TestAzureBlobAvroWriter {
     t2.join(60000);
 
     verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodedRecord));
+    verify(mockAzureBlobOutputStream, times(10)).incrementNumberOfRecordsInBlob();
     verify(mockDataFileWriter).flush();
   }
 
@@ -452,6 +457,7 @@ public class TestAzureBlobAvroWriter {
     verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodedRecord));
     verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));
     verify(mockDataFileWriter, times(2)).flush();
+    verify(mockAzureBlobOutputStream, times(20)).incrementNumberOfRecordsInBlob();
   }
 
   @Test
@@ -470,6 +476,7 @@ public class TestAzureBlobAvroWriter {
     verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));
     verify(mockDataFileWriter, times(2)).flush();
     verify(mockDataFileWriter).close();
+    verify(mockAzureBlobOutputStream, times(20)).incrementNumberOfRecordsInBlob();
   }
 
   private byte[] encodeRecord(IndexedRecord record) throws Exception {
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index f4780fd..1614384 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -82,6 +82,7 @@ public class TestAzureBlobOutputStream {
   private static final String FAKE_STREAM = "FAKE_STREAM";
   private static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes";
   private static final String BLOB_STREAM_NAME_METADATA = "streamName";
+  private static final String BLOB_RECORD_NUMBER_METADATA = "numberOfRecords";
   private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory = mock(BlobMetadataGeneratorFactory.class);
   private final Config blobMetadataGeneratorConfig = mock(Config.class);
 
@@ -107,9 +108,11 @@ public class TestAzureBlobOutputStream {
         BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
         String streamName = blobMetadataContext.getStreamName();
         Long blobSize = blobMetadataContext.getBlobSize();
+        Long numberOfRecords = blobMetadataContext.getNumberOfMessagesInBlob();
         Map<String, String> metadataProperties = new HashMap<>();
         metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
         metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
+        metadataProperties.put(BLOB_RECORD_NUMBER_METADATA, Long.toString(numberOfRecords));
         return metadataProperties;
       }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());
 
@@ -229,6 +232,7 @@ public class TestAzureBlobOutputStream {
   @Test
   public void testClose() {
     azureBlobOutputStream.write(BYTES, 0, THRESHOLD);
+    azureBlobOutputStream.incrementNumberOfRecordsInBlob();
     int blockNum = 0;
     String blockId = String.format("%05d", blockNum);
     String blockIdEncoded = Base64.getEncoder().encodeToString(blockId.getBytes());
@@ -243,12 +247,15 @@ public class TestAzureBlobOutputStream {
     Map<String, String> blobMetadata = (Map<String, String>) blobMetadataArg.getAllValues().get(0);
     Assert.assertEquals(blobMetadata.get(BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(THRESHOLD));
     Assert.assertEquals(blobMetadata.get(BLOB_STREAM_NAME_METADATA), FAKE_STREAM);
+    Assert.assertEquals(blobMetadata.get(BLOB_RECORD_NUMBER_METADATA), Long.toString(1));
   }
 
   @Test
   public void testCloseMultipleBlocks() {
     azureBlobOutputStream.write(BYTES, 0, THRESHOLD);
+    azureBlobOutputStream.incrementNumberOfRecordsInBlob();
     azureBlobOutputStream.write(BYTES, 0, THRESHOLD);
+    azureBlobOutputStream.incrementNumberOfRecordsInBlob();
 
     int blockNum = 0;
     String blockId = String.format("%05d", blockNum);
@@ -267,6 +274,7 @@ public class TestAzureBlobOutputStream {
     Map<String, String> blobMetadata = (Map<String, String>) blobMetadataArg.getAllValues().get(0);
     Assert.assertEquals(blobMetadata.get(BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(2 * THRESHOLD));
     Assert.assertEquals(blobMetadata.get(BLOB_STREAM_NAME_METADATA), FAKE_STREAM);
+    Assert.assertEquals(blobMetadata.get(BLOB_RECORD_NUMBER_METADATA), Long.toString(2));
   }
 
   @Test(expected = AzureException.class)
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java
index 072a51c..b7de317 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java
@@ -34,16 +34,16 @@ public class TestNullBlobMetadataGenerator {
 
   @Test
   public void testGetBlobMetadata() {
-    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("fake_stream", 100)));
+    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("fake_stream", 100, 10)));
   }
 
   @Test
   public void testGetBlobMetadataEmptyInput() {
-    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("", 0)));
+    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("", 0, 0)));
   }
 
   @Test
   public void testGetBlobMetadataNullInput() {
-    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(null, 0)));
+    Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(null, 0, 0)));
   }
 }