Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/05 21:29:20 UTC

[GitHub] [nifi] jfrazee commented on a change in pull request #4576: NIFI-7886: FetchAzureBlobStorage, FetchS3Object, and FetchGCSObject processors should be able to fetch ranges

jfrazee commented on a change in pull request #4576:
URL: https://github.com/apache/nifi/pull/4576#discussion_r571251391



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
##########
@@ -53,6 +59,34 @@
 })
 public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("azure-blob-range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the blob.  An empty value or a value of " +
+                    "zero will start reading at the beginning of the blob.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("azure-blob-range-length")

Review comment:
       ```suggestion
               .name("range-length")
   ```

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
##########
@@ -62,8 +96,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
         final long startNanos = System.nanoTime();
 
-        String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
-        String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+        final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+        final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
+        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);

Review comment:
       I have a question about the semantics of the `azure.length` attribute. When a range is set, it no longer matches the length of the content that was actually retrieved; it reports the length of the whole remote blob instead. Which behavior do we want here? If we keep it as the length of the remote blob, then I think we need to document that clearly.
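
To make the question above concrete, here is a minimal sketch of how the two candidate values for `azure.length` diverge once a range is applied. `RangeLengthSketch` and `fetchedLength` are hypothetical names used only for illustration; they are not part of NiFi or the Azure SDK. The sketch assumes that a start offset at or beyond the end of the blob yields zero bytes, whereas a real service may reject such a request instead.

```java
// Hypothetical helper, not NiFi or Azure SDK code: contrasts the length of the
// remote blob with the number of bytes a ranged fetch would actually return.
final class RangeLengthSketch {

    // remoteLength: total size of the remote blob in bytes
    // rangeStart:   byte offset where the read begins (0 = beginning of the blob)
    // rangeLength:  number of bytes requested, or null to read to the end
    static long fetchedLength(long remoteLength, long rangeStart, Long rangeLength) {
        if (remoteLength < 0 || rangeStart < 0 || (rangeLength != null && rangeLength < 1)) {
            throw new IllegalArgumentException("invalid length or range");
        }
        // Assumption: a start at or past the end of the blob yields no bytes.
        if (rangeStart >= remoteLength) {
            return 0L;
        }
        final long remaining = remoteLength - rangeStart;
        // A range that runs past the end of the blob is clamped to what remains.
        return rangeLength == null ? remaining : Math.min(rangeLength, remaining);
    }

    public static void main(String[] args) {
        final long remoteLength = 100L;
        System.out.println("remote length:  " + remoteLength);
        System.out.println("fetched length: " + fetchedLength(remoteLength, 10L, 20L));
    }
}
```

With a 100-byte blob, a Range Start of 10 and a Range Length of 20, the remote length stays at 100 while only 20 bytes land in the FlowFile content, so whichever value the attribute carries should be stated in the processor documentation.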

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
##########
@@ -147,6 +171,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             request = new GetObjectRequest(bucket, key, versionId);
         }
         request.setRequesterPays(requesterPays);
+        if(rangeLength != null) {

Review comment:
       ```suggestion
           if (rangeLength != null) {
   ```

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
##########
@@ -39,6 +48,45 @@
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("adls-range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the object.  An empty value or a value of " +
+                    "zero will start reading at the beginning of the object.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("adls-range-length")
+            .displayName("Range Length")
+            .description("The number of bytes to download from the object, starting from the Range Start.  An empty " +
+                    "value will read to the end of the object.")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder()
+            .name("adls-number-of-retries")

Review comment:
       ```suggestion
               .name("number-of-retries")
   ```

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
##########
@@ -61,7 +115,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
             }
 
-            flowFile = session.write(flowFile, os -> fileClient.read(os));
+            flowFile = session.write(flowFile, os -> fileClient.readWithResponse(os,  fileRange, retryOptions, null, false, null, Context.NONE));

Review comment:
       ```suggestion
               flowFile = session.write(flowFile, os -> fileClient.readWithResponse(os, fileRange, retryOptions, null, false, null, Context.NONE));
   ```

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
##########
@@ -95,10 +97,30 @@
             .defaultValue("false")
             .build();
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("s3-object-range-start")

Review comment:
       ```suggestion
               .name("range-start")
   ```

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
##########
@@ -95,10 +97,30 @@
             .defaultValue("false")
             .build();
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("s3-object-range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the object.  An empty value or a value of " +
+                    "zero will start reading at the beginning of the object.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("s3-object-range-length")

Review comment:
       ```suggestion
               .name("range-length")
   ```

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
##########
@@ -39,6 +48,45 @@
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("adls-range-start")

Review comment:
       ```suggestion
               .name("range-start")
   ```

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
##########
@@ -53,6 +59,34 @@
 })
 public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("azure-blob-range-start")

Review comment:
       ```suggestion
               .name("range-start")
   ```

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
##########
@@ -206,11 +234,16 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             }
 
             final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
-            flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
+            reader.seek(rangeStart);
+            if(rangeLength == null) {

Review comment:
       ```suggestion
               if (rangeLength == null) {
   ```

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
##########
@@ -39,6 +48,45 @@
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("adls-range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the object.  An empty value or a value of " +
+                    "zero will start reading at the beginning of the object.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("adls-range-length")

Review comment:
       ```suggestion
               .name("range-length")
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org