You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/04/04 13:20:03 UTC

[nifi] branch NIFI-9846 created (now 4a90334728)

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a change to branch NIFI-9846
in repository https://gitbox.apache.org/repos/asf/nifi.git


      at 4a90334728 NIFI-9846 Implement pagination listing for Azure List processors

This branch includes the following new commits:

     new 4a90334728 NIFI-9846 Implement pagination listing for Azure List processors

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[nifi] 01/01: NIFI-9846 Implement pagination listing for Azure List processors

Posted by tu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch NIFI-9846
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4a90334728372d35b326948e112fb79457bd5548
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Wed Mar 30 14:43:06 2022 +0200

    NIFI-9846 Implement pagination listing for Azure List processors
    
    NIFI-9846 removing paging from ListAzureBlobStorage_v12 and ListAzureDataLakeStorage, adding = to filtering (the minimum-timestamp filter now uses an inclusive >= comparison)
    
    This closes #5916.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../azure/storage/ListAzureBlobStorage.java        | 66 +++++++++++++---------
 .../azure/storage/ListAzureBlobStorage_v12.java    | 49 +++++++++-------
 .../azure/storage/ListAzureDataLakeStorage.java    |  8 ++-
 3 files changed, 73 insertions(+), 50 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index eb439b4126..3e08e481f7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 
 import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.ResultContinuation;
+import com.microsoft.azure.storage.ResultSegment;
 import com.microsoft.azure.storage.StorageUri;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
@@ -183,6 +185,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
         final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
         final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
         final List<BlobInfo> listing = new ArrayList<>();
+        final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
+
         try {
             final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
             final CloudBlobContainer container = blobClient.getContainerReference(containerName);
@@ -190,34 +194,44 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
             final OperationContext operationContext = new OperationContext();
             AzureStorageUtils.setProxy(operationContext, context);
 
-            for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-                if (blob instanceof CloudBlob) {
-                    final CloudBlob cloudBlob = (CloudBlob) blob;
-                    final BlobProperties properties = cloudBlob.getProperties();
-                    final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
-
-                    final Builder builder = new BlobInfo.Builder()
-                                              .primaryUri(uri.getPrimaryUri().toString())
-                                              .blobName(cloudBlob.getName())
-                                              .containerName(containerName)
-                                              .contentType(properties.getContentType())
-                                              .contentLanguage(properties.getContentLanguage())
-                                              .etag(properties.getEtag())
-                                              .lastModifiedTime(properties.getLastModified().getTime())
-                                              .length(properties.getLength());
-
-                    if (uri.getSecondaryUri() != null) {
-                        builder.secondaryUri(uri.getSecondaryUri().toString());
-                    }
-
-                    if (blob instanceof CloudBlockBlob) {
-                        builder.blobType(AzureStorageUtils.BLOCK);
-                    } else {
-                        builder.blobType(AzureStorageUtils.PAGE);
+            ResultContinuation continuationToken = null;
+
+            do {
+                final ResultSegment<ListBlobItem> result = container.listBlobsSegmented(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, operationContext);
+                continuationToken = result.getContinuationToken();
+
+                for (final ListBlobItem blob : result.getResults()) {
+                    if (blob instanceof CloudBlob) {
+                        final CloudBlob cloudBlob = (CloudBlob) blob;
+                        final BlobProperties properties = cloudBlob.getProperties();
+
+                        if (properties.getLastModified().getTime() >= minimumTimestamp) {
+                            final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
+
+                            final Builder builder = new BlobInfo.Builder()
+                                    .primaryUri(uri.getPrimaryUri().toString())
+                                    .blobName(cloudBlob.getName())
+                                    .containerName(containerName)
+                                    .contentType(properties.getContentType())
+                                    .contentLanguage(properties.getContentLanguage())
+                                    .etag(properties.getEtag())
+                                    .lastModifiedTime(properties.getLastModified().getTime())
+                                    .length(properties.getLength());
+
+                            if (uri.getSecondaryUri() != null) {
+                                builder.secondaryUri(uri.getSecondaryUri().toString());
+                            }
+
+                            if (blob instanceof CloudBlockBlob) {
+                                builder.blobType(AzureStorageUtils.BLOCK);
+                            } else {
+                                builder.blobType(AzureStorageUtils.PAGE);
+                            }
+                            listing.add(builder.build());
+                        }
                     }
-                    listing.add(builder.build());
                 }
-            }
+            } while (continuationToken != null);
         } catch (final Throwable t) {
             throw new IOException(ExceptionUtils.getRootCause(t));
         }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
index de776a88e3..82bb821774 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
@@ -53,6 +53,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -199,33 +200,39 @@ public class ListAzureBlobStorage_v12 extends AbstractListProcessor<BlobInfo> {
     }
 
     @Override
-    protected List<BlobInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException {
-        String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
-        String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+    protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+        final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
+        final String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+        final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
 
         try {
-            List<BlobInfo> listing = new ArrayList<>();
+            final List<BlobInfo> listing = new ArrayList<>();
 
-            BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
+            final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
 
-            ListBlobsOptions options = new ListBlobsOptions()
+            final ListBlobsOptions options = new ListBlobsOptions()
                     .setPrefix(prefix);
 
-            for (BlobItem blob : containerClient.listBlobs(options, null)) {
-                BlobItemProperties properties = blob.getProperties();
-
-                Builder builder = new Builder()
-                        .containerName(containerName)
-                        .blobName(blob.getName())
-                        .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName()))
-                        .etag(properties.getETag())
-                        .blobType(properties.getBlobType().toString())
-                        .contentType(properties.getContentType())
-                        .contentLanguage(properties.getContentLanguage())
-                        .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
-                        .length(properties.getContentLength());
-
-                listing.add(builder.build());
+            final Iterator<BlobItem> result = containerClient.listBlobs(options, null).iterator();
+
+            while (result.hasNext()) {
+                final BlobItem blob = result.next();
+                final BlobItemProperties properties = blob.getProperties();
+
+                if (properties.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp) {
+                    final Builder builder = new Builder()
+                            .containerName(containerName)
+                            .blobName(blob.getName())
+                            .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName()))
+                            .etag(properties.getETag())
+                            .blobType(properties.getBlobType().toString())
+                            .contentType(properties.getContentType())
+                            .contentLanguage(properties.getContentLanguage())
+                            .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
+                            .length(properties.getContentLength());
+
+                    listing.add(builder.build());
+                }
             }
 
             return listing;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index 268b8c168e..1de2ee9b99 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -210,12 +210,12 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
 
     @Override
     protected List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
-        return performListing(context, listingMode, true);
+        return performListing(context, minTimestamp, listingMode, true);
     }
 
     @Override
     protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
-        return performListing(context, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+        return performListing(context, null, ListingMode.CONFIGURATION_VERIFICATION, false).size();
     }
 
     @Override
@@ -238,7 +238,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
         return attributes;
     }
 
-    private List<ADLSFileInfo> performListing(final ProcessContext context, final ListingMode listingMode,
+    private List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
                                               final boolean applyFilters) throws IOException {
         try {
             final String fileSystem = evaluateFileSystemProperty(context, null);
@@ -256,9 +256,11 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
             options.setRecursive(recurseSubdirectories);
 
             final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
+            final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
 
             final List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
                     .filter(pathItem -> !pathItem.isDirectory())
+                    .filter(pathItem -> pathItem.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp)
                     .map(pathItem -> new ADLSFileInfo.Builder()
                             .fileSystem(fileSystem)
                             .filePath(pathItem.getName())