Posted to commits@samza.apache.org by ca...@apache.org on 2020/05/15 21:16:26 UTC
[samza] branch master updated: SAMZA-2522: Add metadata generator for azure storage blob (#1358)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9f24f15 SAMZA-2522: Add metadata generator for azure storage blob (#1358)
9f24f15 is described below
commit 9f24f1526d8f1c47e6e8fa4d9f68577cc2995f69
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Fri May 15 14:16:17 2020 -0700
SAMZA-2522: Add metadata generator for azure storage blob (#1358)
API changes: New interfaces BlobMetadataGeneratorFactory and BlobMetadataGenerator and new class BlobMetadataContext added. New configs with prefix systems.<system-name>.azureblob.metadataGeneratorConfig. The factory is wired in through systems.<system-name>.azureblob.metadataPropertiesGeneratorFactory.
Prior to this PR, "rawSizeBytes" metadata was always added to committed blobs. It is no longer added by default; a BlobMetadataGenerator implementation has to be provided to add it.
Upgrade instructions: Backwards incompatible. "rawSizeBytes" metadata will no longer be added by default. Instead a generator will have to be provided.
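For jobs that relied on the old default, a generator along the following lines could restore the "rawSizeBytes" property. This is a minimal sketch, not code from this commit: BlobMetadataContext and BlobMetadataGenerator below are trimmed local stand-ins that mirror the types added in this change so the example compiles on its own, and RawSizeBlobMetadataGenerator is a hypothetical name.

```java
import java.util.Collections;
import java.util.Map;

// Local stand-in mirroring org.apache.samza.system.azureblob.utils.BlobMetadataContext.
class BlobMetadataContext {
  private final String streamName;
  private final long blobSize;

  BlobMetadataContext(String streamName, long blobSize) {
    this.streamName = streamName;
    this.blobSize = blobSize;
  }

  String getStreamName() {
    return streamName;
  }

  long getBlobSize() {
    return blobSize;
  }
}

// Local stand-in mirroring org.apache.samza.system.azureblob.utils.BlobMetadataGenerator.
interface BlobMetadataGenerator {
  Map<String, String> getBlobMetadata(BlobMetadataContext blobMetadataContext);
}

// Hypothetical generator that re-adds the property the producer used to set by default.
class RawSizeBlobMetadataGenerator implements BlobMetadataGenerator {
  static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes";

  @Override
  public Map<String, String> getBlobMetadata(BlobMetadataContext blobMetadataContext) {
    // The context's blob size is populated from the total uploaded block size at commit time.
    return Collections.singletonMap(BLOB_RAW_SIZE_BYTES_METADATA,
        Long.toString(blobMetadataContext.getBlobSize()));
  }
}
```

In a real job the generator would also need a matching BlobMetadataGeneratorFactory implementation, since the config takes the factory's class name rather than the generator's.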
Usage instructions: Wire in the generator factory through systems.<system-name>.azureblob.metadataPropertiesGeneratorFactory and pass in additional configs with prefix systems.<system-name>.azureblob.metadataGeneratorConfig.
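The wiring described above can be sketched as follows. This is an illustration under stated assumptions rather than code from this commit: the system name my-blob-system, the class com.example.StaticBlobMetadataGeneratorFactory, and the owner key are hypothetical; Map<String, String> stands in for org.apache.samza.config.Config; and the subset helper assumes the prefix is stripped from the keys before they reach the generator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical job config entries this sketch corresponds to:
//   systems.my-blob-system.azureblob.metadataPropertiesGeneratorFactory=com.example.StaticBlobMetadataGeneratorFactory
//   systems.my-blob-system.azureblob.metadataGeneratorConfig.owner=data-team

// Trimmed local stand-ins for the types added in this commit.
class BlobMetadataContext {
  private final String streamName;
  private final long blobSize;

  BlobMetadataContext(String streamName, long blobSize) {
    this.streamName = streamName;
    this.blobSize = blobSize;
  }

  String getStreamName() {
    return streamName;
  }

  long getBlobSize() {
    return blobSize;
  }
}

interface BlobMetadataGenerator {
  Map<String, String> getBlobMetadata(BlobMetadataContext blobMetadataContext);
}

interface BlobMetadataGeneratorFactory {
  // The real interface takes an org.apache.samza.config.Config; a plain map is used here.
  BlobMetadataGenerator getBlobMetadataGeneratorInstance(Map<String, String> blobMetadataGeneratorConfig);
}

// Hypothetical factory: every config entry it receives becomes a metadata property,
// and the stream name from the context is added alongside.
class StaticBlobMetadataGeneratorFactory implements BlobMetadataGeneratorFactory {
  @Override
  public BlobMetadataGenerator getBlobMetadataGeneratorInstance(Map<String, String> config) {
    Map<String, String> staticProperties = new HashMap<>(config);
    return context -> {
      Map<String, String> metadata = new HashMap<>(staticProperties);
      metadata.put("streamName", context.getStreamName());
      return metadata;
    };
  }
}

class MetadataConfigExample {
  // Keeps only the entries under the given prefix and strips the prefix from their keys.
  static Map<String, String> subset(Map<String, String> jobConfig, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : jobConfig.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }
}
```

Under these assumptions, a config entry ending in metadataGeneratorConfig.owner reaches the factory as the key owner, and the generator returns it together with the stream name when the blob is committed.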
---
.../versioned/jobs/samza-configurations.md | 2 +
.../samza/system/azureblob/AzureBlobConfig.java | 24 ++++++++++-
.../system/azureblob/avro/AzureBlobAvroWriter.java | 23 +++++++++-
.../azureblob/avro/AzureBlobAvroWriterFactory.java | 6 ++-
.../azureblob/avro/AzureBlobOutputStream.java | 40 +++++++++++++++---
.../producer/AzureBlobSystemProducer.java | 18 +++++++-
.../azureblob/producer/AzureBlobWriterFactory.java | 5 +++
.../azureblob/utils/BlobMetadataContext.java | 41 ++++++++++++++++++
.../azureblob/utils/BlobMetadataGenerator.java | 37 ++++++++++++++++
.../utils/BlobMetadataGeneratorFactory.java | 31 ++++++++++++++
.../azureblob/utils/NullBlobMetadataGenerator.java | 33 +++++++++++++++
.../utils/NullBlobMetadataGeneratorFactory.java | 33 +++++++++++++++
.../azureblob/avro/TestAzureBlobAvroWriter.java | 32 +++++++++-----
.../azureblob/avro/TestAzureBlobOutputStream.java | 33 ++++++++++++++-
.../producer/TestAzureBlobSystemProducer.java | 20 +++++++--
.../utils/TestNullBlobMetadataGenerator.java | 49 ++++++++++++++++++++++
16 files changed, 399 insertions(+), 28 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 5d4daed..932c4c8 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -271,6 +271,8 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/
|systems.**_system-name_**.azureblob.flushTimeoutMs|180000 (3 mins)|timeout to finish uploading all blocks before committing a blob.|
|systems.**_system-name_**.azureblob.closeTimeoutMs|300000 (5 mins)|timeout to finish committing all the blobs currently being written to. This does not include the flush timeout per blob.|
|systems.**_system-name_**.azureblob.suffixRandomStringToBlobName|true|if true, a random string of 8 chars is suffixed to the blob name to prevent name collision when more than one Samza tasks are writing to the same SSP.|
+|systems.**_system-name_**.azureblob.metadataPropertiesGeneratorFactory|`org.apache.samza.system.`<br>`azureblob.utils.`<br>`NullBlobMetadataGeneratorFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory` impl for the system producer. <br><br>The default metadata generator does not add any metadata to the blob.|
+|systems.**_system-name_**.azureblob.metadataGeneratorConfig| |Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.<br>For example, to pass a "key":"value" pair to the metadata generator, add config like systems.<system-name>.azureblob.metadataGeneratorConfig.\<key\> with value \<value\>|
### <a name="state-storage"></a>[4. State Storage](#state-storage)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
index 92d76c4..58f206e 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
@@ -39,7 +39,8 @@ public class AzureBlobConfig extends MapConfig {
public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory";
// Azure Storage Account name under which the Azure container representing this system is.
- // System name = Azure container name (https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names)
+ // System name = Azure container name
+ // (https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names)
public static final String SYSTEM_AZURE_ACCOUNT_NAME = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "account.name";
// Azure Storage Account key associated with the Azure Storage Account
@@ -94,6 +95,18 @@ public class AzureBlobConfig extends MapConfig {
public static final String SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME = SYSTEM_AZUREBLOB_PREFIX + "suffixRandomStringToBlobName";
private static final boolean SYSTEM_SUFFIX_RANDOM_STRING_TO_BLOB_NAME_DEFAULT = true;
+ // full class name of an implementation of org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory
+ // this factory should return an implementation of org.apache.samza.system.azureblob.utils.BlobMetadataGenerator
+ // this generator will be invoked when a blob is committed to add metadata properties to it
+ public static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY = SYSTEM_AZUREBLOB_PREFIX + "metadataPropertiesGeneratorFactory";
+ private static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY_DEFAULT =
+ "org.apache.samza.system.azureblob.utils.NullBlobMetadataGeneratorFactory";
+
+ // Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.
+ // for example, to pass a "key":"value" pair to the metadata generator, add config like
+ // systems.<system-name>.azureblob.metadataGeneratorConfig.<key> with value <value>
+ public static final String SYSTEM_BLOB_METADATA_GENERATOR_CONFIG_PREFIX = SYSTEM_AZUREBLOB_PREFIX + "metadataGeneratorConfig.";
+
public AzureBlobConfig(Config config) {
super(config);
}
@@ -185,4 +198,13 @@ public class AzureBlobConfig extends MapConfig {
public long getMaxMessagesPerBlob(String systemName) {
return getLong(String.format(SYSTEM_MAX_MESSAGES_PER_BLOB, systemName), SYSTEM_MAX_MESSAGES_PER_BLOB_DEFAULT);
}
+
+ public String getSystemBlobMetadataPropertiesGeneratorFactory(String systemName) {
+ return get(String.format(SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY, systemName),
+ SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY_DEFAULT);
+ }
+
+ public Config getSystemBlobMetadataGeneratorConfigs(String systemName) {
+ return subset(String.format(SYSTEM_BLOB_METADATA_GENERATOR_CONFIG_PREFIX, systemName));
+ }
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 7f9a926..6e74461 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -46,7 +46,9 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,9 +111,13 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
private final boolean useRandomStringInBlobName;
private final Object currentDataFileWriterLock = new Object();
private volatile long recordsInCurrentBlob = 0;
+ private BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
+ private Config blobMetadataGeneratorConfig;
+ private String streamName;
public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+ BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob) {
@@ -125,6 +131,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
this.useRandomStringInBlobName = useRandomStringInBlobName;
this.maxBlobSize = maxBlobSize;
this.maxRecordsPerBlob = maxRecordsPerBlob;
+ this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
+ this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
+ this.streamName = streamName;
}
/**
@@ -217,6 +226,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
Executor blobThreadPool, int maxBlockFlushThresholdSize, int flushTimeoutMs, String blobURLPrefix,
DataFileWriter<IndexedRecord> dataFileWriter,
AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
+ BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
this.currentBlobWriterComponents = null;
@@ -235,6 +245,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
this.useRandomStringInBlobName = useRandomStringInBlobName;
this.maxBlobSize = maxBlobSize;
this.maxRecordsPerBlob = maxRecordsPerBlob;
+ this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
+ this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
+ this.streamName = streamName;
}
@VisibleForTesting
@@ -318,8 +331,14 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
BlockBlobAsyncClient blockBlobAsyncClient = containerAsyncClient.getBlobAsyncClient(blobURL).getBlockBlobAsyncClient();
DataFileWriter<IndexedRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
- AzureBlobOutputStream azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
- flushTimeoutMs, maxBlockFlushThresholdSize, compression);
+ AzureBlobOutputStream azureBlobOutputStream;
+ try {
+ azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
+ streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
+ } catch (Exception e) {
+ throw new SamzaException("Unable to create AzureBlobOutputStream", e);
+ }
dataFileWriter.create(schema, azureBlobOutputStream);
dataFileWriter.setFlushOnEveryBlock(false);
this.currentBlobWriterComponents = new BlobWriterComponents(dataFileWriter, azureBlobOutputStream, blockBlobAsyncClient);
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
index 2510766..0a4e019 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
@@ -26,6 +26,8 @@ import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import com.azure.storage.blob.BlobContainerAsyncClient;
import java.io.IOException;
import java.util.concurrent.Executor;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
@@ -35,9 +37,11 @@ public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
*/
public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
+ BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException {
return new AzureBlobAvroWriter(containerAsyncClient, blobURL, blobUploadThreadPool, metrics,
- maxBlockFlushThresholdSize, flushTimeoutMs, compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
+ compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
}
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 9db15a3..716e488 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -30,7 +30,6 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -38,6 +37,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataContext;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGenerator;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@@ -68,7 +71,6 @@ public class AzureBlobOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobOutputStream.class);
private static final int MAX_ATTEMPT = 3;
private static final int MAX_BLOCKS_IN_AZURE_BLOB = 50000;
- public static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes";
private final long flushTimeoutMs;
private final BlockBlobAsyncClient blobAsyncClient;
private final Executor blobThreadPool;
@@ -84,10 +86,27 @@ public class AzureBlobOutputStream extends OutputStream {
private volatile boolean isClosed = false;
private long totalUploadedBlockSize = 0;
private int blockNum;
+ private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
+ private final Config blobMetadataGeneratorConfig;
+ private String streamName;
+ /**
+ *
+ * @param blobAsyncClient Client to communicate with Azure Blob Storage.
+ * @param blobThreadPool threads to be used for uploading blocks to Azure Blob Storage.
+ * @param metrics needed for emitting metrics about bytes written, blocks uploaded, blobs committed.
+ * @param blobMetadataGeneratorFactory impl of {@link org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory}
+ * to be used for generating metadata properties for a blob
+ * @param streamName name of the stream to which the blob generated corresponds to. Used in metadata properties.
+ * @param flushTimeoutMs timeout for uploading a block
+ * @param maxBlockFlushThresholdSize max block size
+ * @param compression type of compression to be used before uploading a block
+ */
public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+ BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression) {
- this(blobAsyncClient, blobThreadPool, metrics, flushTimeoutMs, maxBlockFlushThresholdSize,
+ this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
+ flushTimeoutMs, maxBlockFlushThresholdSize,
new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
}
@@ -148,6 +167,8 @@ public class AzureBlobOutputStream extends OutputStream {
* - byteArrayOutputStream.close fails or
* - any of the pending uploads fails or
* - blob's commitBlockList fails
+ * throws ClassNotFoundException or IllegalAccessException or InstantiationException
+ * - while creating an instance of BlobMetadataGenerator
*/
@Override
public synchronized void close() {
@@ -172,8 +193,8 @@ public class AzureBlobOutputStream extends OutputStream {
future.get((long) flushTimeoutMs, TimeUnit.MILLISECONDS);
LOG.info("For blob: {} committing blockList size:{}", blobAsyncClient.getBlobUrl().toString(), blockList.size());
metrics.updateAzureCommitMetrics();
- Map<String, String> blobMetadata = Collections.singletonMap(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(totalUploadedBlockSize));
- commitBlob(blockList, blobMetadata);
+ BlobMetadataGenerator blobMetadataGenerator = getBlobMetadataGenerator();
+ commitBlob(blockList, blobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(streamName, totalUploadedBlockSize)));
} catch (Exception e) {
String msg = String.format("Close blob %s failed with exception. Total pending sends %d",
blobAsyncClient.getBlobUrl().toString(), pendingUpload.size());
@@ -211,6 +232,7 @@ public class AzureBlobOutputStream extends OutputStream {
@VisibleForTesting
AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+ BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long flushTimeoutMs, int maxBlockFlushThresholdSize,
ByteArrayOutputStream byteArrayOutputStream, Compression compression) {
this.byteArrayOutputStream = Optional.of(byteArrayOutputStream);
@@ -222,6 +244,9 @@ public class AzureBlobOutputStream extends OutputStream {
this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
this.metrics = metrics;
this.compression = compression;
+ this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
+ this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
+ this.streamName = streamName;
}
// SAMZA-2476 stubbing BlockBlobAsyncClient.commitBlockListWithResponse was causing flaky tests.
@@ -245,6 +270,11 @@ public class AzureBlobOutputStream extends OutputStream {
isClosed = true;
}
+ @VisibleForTesting
+ BlobMetadataGenerator getBlobMetadataGenerator() throws Exception {
+ return blobMetadataGeneratorFactory.getBlobMetadataGeneratorInstance(blobMetadataGeneratorConfig);
+ }
+
/**
* This api will async upload the outputstream into block using stageBlocks,
* reint outputstream
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index a2fa2ac..d8205a5 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -50,10 +50,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemProducerException;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,6 +138,9 @@ public class AzureBlobSystemProducer implements SystemProducer {
private final Map<String, Object> sourceWriterCreationLockMap = new ConcurrentHashMap<>();
private final Map<String, ReadWriteLock> sourceSendFlushLockMap = new ConcurrentHashMap<>();
+ private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
+ private final Config blobMetadataGeneratorConfig;
+
public AzureBlobSystemProducer(String systemName, AzureBlobConfig config, MetricsRegistry metricsRegistry) {
Preconditions.checkNotNull(systemName, "System name can not be null when creating AzureBlobSystemProducer");
Preconditions.checkNotNull(config, "Config can not be null when creating AzureBlobSystemProducer");
@@ -171,6 +176,14 @@ public class AzureBlobSystemProducer implements SystemProducer {
this.writerMap = new ConcurrentHashMap<>();
this.metrics = new AzureBlobSystemProducerMetrics(systemName, config.getAzureAccountName(systemName), metricsRegistry);
+
+ String blobMetadataGeneratorFactoryClassName = this.config.getSystemBlobMetadataPropertiesGeneratorFactory(this.systemName);
+ try {
+ blobMetadataGeneratorFactory = (BlobMetadataGeneratorFactory) Class.forName(blobMetadataGeneratorFactoryClassName).newInstance();
+ } catch (Exception e) {
+ throw new SystemProducerException("Could not create blob metadata generator factory with name " + blobMetadataGeneratorFactoryClassName, e);
+ }
+ blobMetadataGeneratorConfig = this.config.getSystemBlobMetadataGeneratorConfigs(systemName);
}
/**
@@ -423,7 +436,7 @@ public class AzureBlobSystemProducer implements SystemProducer {
if (writer == null) {
AzureBlobWriterMetrics writerMetrics =
new AzureBlobWriterMetrics(metrics.getAggregateMetrics(), metrics.getSystemMetrics(), metrics.getSourceMetrics(source));
- writer = createNewWriter(blobURLPrefix, writerMetrics);
+ writer = createNewWriter(blobURLPrefix, writerMetrics, messageEnvelope.getSystemStream().getStream());
sourceWriterMap.put(writerMapKey, writer);
}
}
@@ -502,9 +515,10 @@ public class AzureBlobSystemProducer implements SystemProducer {
}
@VisibleForTesting
- AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMetrics) {
+ AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMetrics, String streamName) {
try {
return writerFactory.getWriterInstance(containerAsyncClient, blobURL, asyncBlobThreadPool, writerMetrics,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
blockFlushThresholdSize, flushTimeoutMs,
CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
config.getSuffixRandomStringToBlobName(systemName),
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
index 87ca5a6..e9fa16d 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
@@ -23,6 +23,8 @@ import org.apache.samza.system.azureblob.compression.Compression;
import com.azure.storage.blob.BlobContainerAsyncClient;
import java.io.IOException;
import java.util.concurrent.Executor;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
public interface AzureBlobWriterFactory {
@@ -32,6 +34,8 @@ public interface AzureBlobWriterFactory {
* @param blobURL Azure blob url
* @param blobUploadThreadPool thread pool to be used by writer for uploading
* @param metrics metrics to measure the number of bytes written by writer
+ * @param blobMetadataGeneratorFactory factory to get generator for metadata properties for a blob
+ * @param streamName name of the stream that this AzureBlobWriter is associated with
* @param maxBlockFlushThresholdSize threshold at which to upload
* @param flushTimeoutMs timeout after which the flush is abandoned
* @return AzureBlobWriter instance
@@ -39,6 +43,7 @@ public interface AzureBlobWriterFactory {
*/
AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
+ BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException;
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java
new file mode 100644
index 0000000..a3460eb
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+/**
+ * Properties about a Blob which can then be used to generate metadata for the blob
+ */
+public class BlobMetadataContext {
+ private final String streamName;
+ private final long blobSize;
+
+ public BlobMetadataContext(String streamName, long blobSize) {
+ this.streamName = streamName;
+ this.blobSize = blobSize;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public long getBlobSize() {
+ return blobSize;
+ }
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGenerator.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGenerator.java
new file mode 100644
index 0000000..76f5f70
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import java.util.Map;
+
+
+/**
+ * Interface for generating metadata properties of an Azure Blob.
+ * Implementation is not expected to be thread safe.
+ */
+public interface BlobMetadataGenerator {
+
+ /**
+ * create metadata properties for a blob
+ * @param blobMetatadataConext contains details about the blob that can be used for generating metadata
+ * @return map containing metadata properties to be associated with the blob
+ */
+ Map<String, String> getBlobMetadata(BlobMetadataContext blobMetatadataConext);
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGeneratorFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGeneratorFactory.java
new file mode 100644
index 0000000..61661a4
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/BlobMetadataGeneratorFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import org.apache.samza.config.Config;
+
+
+public interface BlobMetadataGeneratorFactory {
+
+ /**
+ * Creates an instance of {@link BlobMetadataGenerator}
+ */
+ BlobMetadataGenerator getBlobMetadataGeneratorInstance(Config blobMetadataGeneratorConfig);
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGenerator.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGenerator.java
new file mode 100644
index 0000000..6b6efd2
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import java.util.Map;
+
+
+public class NullBlobMetadataGenerator implements BlobMetadataGenerator {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Map<String, String> getBlobMetadata(BlobMetadataContext blobMetadataContext) {
+ return null;
+ }
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGeneratorFactory.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGeneratorFactory.java
new file mode 100644
index 0000000..107e1c6
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/utils/NullBlobMetadataGeneratorFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import org.apache.samza.config.Config;
+
+
+public class NullBlobMetadataGeneratorFactory implements BlobMetadataGeneratorFactory {
+ /**
+ * {@inheritDoc} Always returns a {@link NullBlobMetadataGenerator}; the config is ignored.
+ */
+ @Override
+ public BlobMetadataGenerator getBlobMetadataGeneratorInstance(Config blobMetadataGeneratorConfig) {
+ return new NullBlobMetadataGenerator();
+ }
+}
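Review note: since "rawSizeBytes" is no longer added by default, a job that still needs it has to supply its own generator and factory. Below is a minimal sketch of what that could look like, modelled on the mock generator in TestAzureBlobOutputStream. To stay self-contained it nests simplified stand-ins for BlobMetadataContext, BlobMetadataGenerator and BlobMetadataGeneratorFactory. The real factory takes an org.apache.samza.config.Config, which is replaced here by a plain Map. RawSizeBlobMetadataGenerator, its factory and the "includeStreamName" key are hypothetical names, not part of this commit.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch; every nested type is a stand-in or a hypothetical example.
class BlobMetadataSketch {

  // Simplified stand-in for BlobMetadataContext (stream name plus blob size).
  static final class BlobMetadataContext {
    private final String streamName;
    private final long blobSize;

    BlobMetadataContext(String streamName, long blobSize) {
      this.streamName = streamName;
      this.blobSize = blobSize;
    }

    String getStreamName() { return streamName; }

    long getBlobSize() { return blobSize; }
  }

  // Stand-in for the BlobMetadataGenerator interface.
  interface BlobMetadataGenerator {
    Map<String, String> getBlobMetadata(BlobMetadataContext blobMetadataContext);
  }

  // Stand-in for BlobMetadataGeneratorFactory; the real one takes a Samza Config, not a Map.
  interface BlobMetadataGeneratorFactory {
    BlobMetadataGenerator getBlobMetadataGeneratorInstance(Map<String, String> blobMetadataGeneratorConfig);
  }

  // Hypothetical generator that restores "rawSizeBytes" and optionally adds "streamName".
  static final class RawSizeBlobMetadataGenerator implements BlobMetadataGenerator {
    private final boolean includeStreamName;

    RawSizeBlobMetadataGenerator(boolean includeStreamName) {
      this.includeStreamName = includeStreamName;
    }

    @Override
    public Map<String, String> getBlobMetadata(BlobMetadataContext context) {
      Map<String, String> metadata = new HashMap<>();
      metadata.put("rawSizeBytes", Long.toString(context.getBlobSize()));
      // Skip a null stream name instead of storing a null metadata value.
      if (includeStreamName && context.getStreamName() != null) {
        metadata.put("streamName", context.getStreamName());
      }
      return metadata;
    }
  }

  // Hypothetical factory; "includeStreamName" is an assumed config key that defaults to true.
  static final class RawSizeBlobMetadataGeneratorFactory implements BlobMetadataGeneratorFactory {
    @Override
    public BlobMetadataGenerator getBlobMetadataGeneratorInstance(Map<String, String> config) {
      boolean includeStreamName = Boolean.parseBoolean(config.getOrDefault("includeStreamName", "true"));
      return new RawSizeBlobMetadataGenerator(includeStreamName);
    }
  }

  public static void main(String[] args) {
    BlobMetadataGenerator generator =
        new RawSizeBlobMetadataGeneratorFactory().getBlobMetadataGeneratorInstance(new HashMap<>());
    Map<String, String> metadata = generator.getBlobMetadata(new BlobMetadataContext("FAKE_STREAM", 100));
    System.out.println("rawSizeBytes=" + metadata.get("rawSizeBytes"));
    System.out.println("streamName=" + metadata.get("streamName"));
  }
}
```

In a real job the factory class would implement the Samza interfaces from this commit and be wired in through the metadataPropertiesGeneratorFactory config described in the commit message.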
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index b4ef4b4..62076aa 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -45,8 +45,10 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -79,6 +81,9 @@ public class TestAzureBlobAvroWriter {
private BlockBlobAsyncClient mockBlockBlobAsyncClient;
private Compression mockCompression;
+ private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory = mock(BlobMetadataGeneratorFactory.class);
+ private final Config blobMetadataGeneratorConfig = mock(Config.class);
+ private static final String STREAM_NAME = "FAKE_STREAM";
private static final String VALUE = "FAKE_VALUE";
private static final String SYSTEM_NAME = "FAKE_SYSTEM";
private static final int THRESHOLD = 100;
@@ -146,8 +151,9 @@ public class TestAzureBlobAvroWriter {
mockCompression = CompressionFactory.getInstance().getCompression(CompressionType.GZIP);
azureBlobAvroWriter =
spy(new AzureBlobAvroWriter(mockContainerAsyncClient, mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
- 60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, Long.MAX_VALUE,
- Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
+ 60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+ Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
doReturn(encodedRecord).when(azureBlobAvroWriter).encodeRecord((IndexedRecord) ome.getMessage());
}
@Test
@@ -187,7 +193,8 @@ public class TestAzureBlobAvroWriter {
azureBlobAvroWriter =
spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class),
threadPool, THRESHOLD, 60000, "test",
- null, null, null, 1000, 100, mockCompression, false));
+ null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+ 1000, 100, mockCompression, false));
OutgoingMessageEnvelope omeEncoded = new OutgoingMessageEnvelope(new SystemStream(SYSTEM_NAME, "Topic1"), new byte[100]);
azureBlobAvroWriter.write(omeEncoded);
}
@@ -242,7 +249,8 @@ public class TestAzureBlobAvroWriter {
BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
- null, null, null, maxBlobSize, 10, mockCompression, true));
+ null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+ maxBlobSize, 10, mockCompression, true));
DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -254,7 +262,7 @@ public class TestAzureBlobAvroWriter {
AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
- mockMetrics,
+ mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) maxBlobSize - 1);
@@ -272,7 +280,7 @@ public class TestAzureBlobAvroWriter {
AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
- mockMetrics,
+ mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) maxBlobSize - 1);
@@ -304,7 +312,8 @@ public class TestAzureBlobAvroWriter {
BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
- null, null, null, maxBlobSize, maxRecordsPerBlob, mockCompression, true));
+ null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+ maxBlobSize, maxRecordsPerBlob, mockCompression, true));
DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -316,7 +325,7 @@ public class TestAzureBlobAvroWriter {
AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
- mockMetrics,
+ mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) 1);
@@ -337,7 +346,7 @@ public class TestAzureBlobAvroWriter {
AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
- mockMetrics,
+ mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) 1);
@@ -366,8 +375,8 @@ public class TestAzureBlobAvroWriter {
BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD, 60000, blobUrlPrefix,
- mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, maxBlobSize, maxRecordsPerBlob,
- mockCompression, false));
+ mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+ maxBlobSize, maxRecordsPerBlob, mockCompression, false));
DataFileWriter<IndexedRecord> mockDataFileWriter2 = mock(DataFileWriter.class);
AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
@@ -393,6 +402,7 @@ public class TestAzureBlobAvroWriter {
azureBlobAvroWriter = spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class),
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false));
IndexedRecord record = new GenericRecordEvent();
Assert.assertTrue(Arrays.equals(encodeRecord(record), azureBlobAvroWriter.encodeRecord(record)));
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index d635693..f4780fd 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -20,6 +20,7 @@
package org.apache.samza.system.azureblob.avro;
import java.util.Arrays;
+import java.util.HashMap;
import org.apache.samza.AzureException;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -33,6 +34,10 @@ import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.azureblob.utils.BlobMetadataContext;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGenerator;
+import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -47,7 +52,9 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.anyObject;
import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -72,6 +79,11 @@ public class TestAzureBlobOutputStream {
private static final byte[] COMPRESSED_BYTES = RANDOM_STRING.substring(0, THRESHOLD / 2).getBytes();
private AzureBlobWriterMetrics mockMetrics;
private Compression mockCompression;
+ private static final String FAKE_STREAM = "FAKE_STREAM";
+ private static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes";
+ private static final String BLOB_STREAM_NAME_METADATA = "streamName";
+ private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory = mock(BlobMetadataGeneratorFactory.class);
+ private final Config blobMetadataGeneratorConfig = mock(Config.class);
@Before
public void setup() throws Exception {
@@ -90,12 +102,25 @@ public class TestAzureBlobOutputStream {
mockCompression = mock(Compression.class);
doReturn(COMPRESSED_BYTES).when(mockCompression).compress(BYTES);
+ BlobMetadataGenerator mockBlobMetadataGenerator = mock(BlobMetadataGenerator.class);
+ doAnswer(invocation -> {
+ BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
+ String streamName = blobMetadataContext.getStreamName();
+ long blobSize = blobMetadataContext.getBlobSize();
+ Map<String, String> metadataProperties = new HashMap<>();
+ metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
+ metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
+ return metadataProperties;
+ }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());
+
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
doNothing().when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
doNothing().when(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
doNothing().when(azureBlobOutputStream).clearAndMarkClosed();
+ doReturn(mockBlobMetadataGenerator).when(azureBlobOutputStream).getBlobMetadataGenerator();
}
@Test
@@ -216,7 +241,8 @@ public class TestAzureBlobOutputStream {
verify(azureBlobOutputStream).commitBlob(blockListArgument.capture(), blobMetadataArg.capture());
Assert.assertEquals(Arrays.asList(blockIdEncoded), blockListArgument.getAllValues().get(0));
Map<String, String> blobMetadata = (Map<String, String>) blobMetadataArg.getAllValues().get(0);
- Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(THRESHOLD));
+ Assert.assertEquals(blobMetadata.get(BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(THRESHOLD));
+ Assert.assertEquals(blobMetadata.get(BLOB_STREAM_NAME_METADATA), FAKE_STREAM);
}
@Test
@@ -239,13 +265,15 @@ public class TestAzureBlobOutputStream {
Assert.assertEquals(blockIdEncoded, blockListArgument.getAllValues().get(0).toArray()[0]);
Assert.assertEquals(blockIdEncoded1, blockListArgument.getAllValues().get(0).toArray()[1]);
Map<String, String> blobMetadata = (Map<String, String>) blobMetadataArg.getAllValues().get(0);
- Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(2 * THRESHOLD));
+ Assert.assertEquals(blobMetadata.get(BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(2 * THRESHOLD));
+ Assert.assertEquals(blobMetadata.get(BLOB_STREAM_NAME_METADATA), FAKE_STREAM);
}
@Test(expected = AzureException.class)
public void testCloseFailed() {
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
//doNothing().when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
@@ -286,6 +314,7 @@ public class TestAzureBlobOutputStream {
@Test (expected = AzureException.class)
public void testFlushFailed() throws IOException {
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
doNothing().when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
index 2969cd0..4ad22ae 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
@@ -45,6 +45,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -95,7 +96,7 @@ public class TestAzureBlobSystemProducer {
systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
// use mock writer impl
- doReturn(mockAzureWriter).when(systemProducer).createNewWriter(anyString(), any());
+ setupWriterForProducer(systemProducer, mockAzureWriter, STREAM);
// bypass Azure connection setup
doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
}
@@ -415,7 +416,7 @@ public class TestAzureBlobSystemProducer {
systemProducer.register(source1);
systemProducer.start();
- doReturn(mockAzureWriter1).when(systemProducer).createNewWriter(anyString(), any());
+ setupWriterForProducer(systemProducer, mockAzureWriter1, stream1);
Thread t1 = sendFlushInThread(source1, ome1, systemProducer, sendsInFirstThread);
Thread t2 = sendFlushInThread(source1, ome1, systemProducer, sendsInSecondThread);
@@ -451,7 +452,7 @@ public class TestAzureBlobSystemProducer {
// bypass Azure connection setup
doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
- doReturn(mockAzureWriter1).when(systemProducer).createNewWriter(anyString(), any());
+ setupWriterForProducer(systemProducer, mockAzureWriter1, stream1);
systemProducer.register(source1);
systemProducer.start();
@@ -493,7 +494,7 @@ public class TestAzureBlobSystemProducer {
// bypass Azure connection setup
doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
- doReturn(mockAzureWriter1).when(systemProducer).createNewWriter(anyString(), any());
+ setupWriterForProducer(systemProducer, mockAzureWriter1, STREAM);
systemProducer.register(source1);
systemProducer.start();
@@ -591,4 +592,15 @@ public class TestAzureBlobSystemProducer {
Config config = new MapConfig(bareConfigs);
return config;
}
+
+ private void setupWriterForProducer(AzureBlobSystemProducer azureBlobSystemProducer,
+ AzureBlobWriter mockAzureBlobWriter, String stream) {
+ doAnswer(invocation -> {
+ String blobUrl = invocation.getArgumentAt(0, String.class);
+ String streamName = invocation.getArgumentAt(2, String.class);
+ Assert.assertEquals(stream, streamName);
+ Assert.assertEquals(stream, blobUrl);
+ return mockAzureBlobWriter;
+ }).when(azureBlobSystemProducer).createNewWriter(anyString(), any(), anyString());
+ }
}
\ No newline at end of file
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java
new file mode 100644
index 0000000..072a51c
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/utils/TestNullBlobMetadataGenerator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.utils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestNullBlobMetadataGenerator {
+ private NullBlobMetadataGenerator nullBlobMetadataGenerator;
+
+ @Before
+ public void setup() {
+ nullBlobMetadataGenerator = new NullBlobMetadataGenerator();
+ }
+
+ @Test
+ public void testGetBlobMetadata() {
+ Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("fake_stream", 100)));
+ }
+
+ @Test
+ public void testGetBlobMetadataEmptyInput() {
+ Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext("", 0)));
+ }
+
+ @Test
+ public void testGetBlobMetadataNullInput() {
+ Assert.assertNull(nullBlobMetadataGenerator.getBlobMetadata(new BlobMetadataContext(null, 0)));
+ }
+}