You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/12/23 22:21:08 UTC

[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1239: SAMZA-2421: Add SystemProducer for Azure Blob Storage

lakshmi-manasa-g commented on a change in pull request #1239: SAMZA-2421: Add SystemProducer for Azure Blob Storage
URL: https://github.com/apache/samza/pull/1239#discussion_r361015404
 
 

 ##########
 File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
 ##########
 @@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.avro;
+
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.samza.system.azureblob.compression.Compression;
+import org.apache.samza.system.azureblob.compression.GzipCompression;
+import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
+import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements {@link org.apache.samza.system.azureblob.producer.AzureBlobWriter}
+ * for writing avro records to Azure Blob Storage.
+ *
+ * It uses {@link org.apache.avro.file.DataFileWriter} to convert avro records it receives to byte[].
+ * This byte[] is passed on to {@link org.apache.samza.system.azureblob.avro.AzureBlobOutputStream}.
+ * AzureBlobOutputStream in turn uploads data to Storage as a blob.
+ *
+ * It also accepts encoded records as byte[] as long as the first OutgoingMessageEnvelope this writer receives
+ * is a decoded record from which to get the schema and record type (GenericRecord vs SpecificRecord).
+ * The subsequent encoded records are written directly to AzureBlobOutputStream without checking if they conform
+ * to the schema. It is the responsibility of the user to ensure this. Failing to do so may result in an
+ * unreadable avro blob.
+ *
+ * It expects all OutgoingMessageEnvelopes to be of the same schema.
+ * To handle schema evolution (sending envelopes of different schema), this writer has to be closed and a new writer
+ * has to be created. The first envelope of the new writer should contain a valid record to get schema from.
+ * If used by AzureBlobSystemProducer, this is done through systemProducer.flush(source).
+ *
+ * Once closed this object can not be used.
+ * This is a thread safe class.
+ *
+ * If the number of records or size of the current blob exceeds the specified limits then a new blob is created.
+ */
+public class AzureBlobAvroWriter implements AzureBlobWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobAvroWriter.class);
+  private static final String PUBLISHED_FILE_NAME_DATE_FORMAT = "yyyy/MM/dd/HH/mm-ss";
+  private static final String BLOB_NAME_AVRO = "%s/%s-%s.avro%s";
+  private static final String BLOB_NAME_RANDOM_STRING_AVRO = "%s/%s-%s-%s.avro%s";
+  private static final SimpleDateFormat UTC_FORMATTER = buildUTCFormatter();
+
+  // Avro's DataFileWriter has internal buffers and also adds metadata.
+  // Based on the current default sizes of these buffers and metadata, the data overhead is a little less than 100KB
+  // However, taking the overhead to be capped at 1MB to ensure enough room if the default values are increased.
+  static final long DATAFILEWRITER_OVERHEAD = 1000000; // 1MB
+
+  // currentBlobWriterComponents == null only for the first blob immediately after this AzureBlobAvroWriter object has been created.
+  // rest of this object's lifecycle, currentBlobWriterComponents is not null.
+  private BlobWriterComponents currentBlobWriterComponents = null;
+  private final List<BlobWriterComponents> allBlobWriterComponents = new ArrayList<>();
+  private Schema schema = null;
+  // datumWriter == null only for the first blob immediately after this AzureBlobAvroWriter object has been created.
+  // It is created from the schema taken from the first OutgoingMessageEnvelope. Hence the first OME has to be a decoded avro record.
+  // For rest of this object's lifecycle, datumWriter is not null.
+  private DatumWriter<IndexedRecord> datumWriter = null;
 
 Review comment:
   The reason these fields were not marked volatile is that they are never accessed by multiple threads at the same time. The currentDataFileWriterLock ensures that only one thread can access them at a time.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services