You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink on the archive page and is not preserved in this text.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/04 18:29:25 UTC

[nifi] 30/31: NIFI-9846 Implement pagination listing for Azure List processors

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 869422d24db5041d600dd77b18ed0d1a940e07f9
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Wed Mar 30 14:43:06 2022 +0200

    NIFI-9846 Implement pagination listing for Azure List processors
    
    NIFI-9846 Remove paging from ListAzureBlobStorage_v12 and ListAzureDataLakeStorage; make the minimum-timestamp filter inclusive (>= instead of >)
    
    This closes #5916.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../azure/storage/ListAzureBlobStorage.java        | 66 +++++++++++++---------
 .../azure/storage/ListAzureBlobStorage_v12.java    | 49 +++++++++-------
 .../azure/storage/ListAzureDataLakeStorage.java    |  8 ++-
 3 files changed, 73 insertions(+), 50 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index eb439b4126..3e08e481f7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 
 import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.ResultContinuation;
+import com.microsoft.azure.storage.ResultSegment;
 import com.microsoft.azure.storage.StorageUri;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
@@ -183,6 +185,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
         final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
         final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
         final List<BlobInfo> listing = new ArrayList<>();
+        final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
+
         try {
             final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
             final CloudBlobContainer container = blobClient.getContainerReference(containerName);
@@ -190,34 +194,44 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
             final OperationContext operationContext = new OperationContext();
             AzureStorageUtils.setProxy(operationContext, context);
 
-            for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-                if (blob instanceof CloudBlob) {
-                    final CloudBlob cloudBlob = (CloudBlob) blob;
-                    final BlobProperties properties = cloudBlob.getProperties();
-                    final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
-
-                    final Builder builder = new BlobInfo.Builder()
-                                              .primaryUri(uri.getPrimaryUri().toString())
-                                              .blobName(cloudBlob.getName())
-                                              .containerName(containerName)
-                                              .contentType(properties.getContentType())
-                                              .contentLanguage(properties.getContentLanguage())
-                                              .etag(properties.getEtag())
-                                              .lastModifiedTime(properties.getLastModified().getTime())
-                                              .length(properties.getLength());
-
-                    if (uri.getSecondaryUri() != null) {
-                        builder.secondaryUri(uri.getSecondaryUri().toString());
-                    }
-
-                    if (blob instanceof CloudBlockBlob) {
-                        builder.blobType(AzureStorageUtils.BLOCK);
-                    } else {
-                        builder.blobType(AzureStorageUtils.PAGE);
+            ResultContinuation continuationToken = null;
+
+            do {
+                final ResultSegment<ListBlobItem> result = container.listBlobsSegmented(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, operationContext);
+                continuationToken = result.getContinuationToken();
+
+                for (final ListBlobItem blob : result.getResults()) {
+                    if (blob instanceof CloudBlob) {
+                        final CloudBlob cloudBlob = (CloudBlob) blob;
+                        final BlobProperties properties = cloudBlob.getProperties();
+
+                        if (properties.getLastModified().getTime() >= minimumTimestamp) {
+                            final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
+
+                            final Builder builder = new BlobInfo.Builder()
+                                    .primaryUri(uri.getPrimaryUri().toString())
+                                    .blobName(cloudBlob.getName())
+                                    .containerName(containerName)
+                                    .contentType(properties.getContentType())
+                                    .contentLanguage(properties.getContentLanguage())
+                                    .etag(properties.getEtag())
+                                    .lastModifiedTime(properties.getLastModified().getTime())
+                                    .length(properties.getLength());
+
+                            if (uri.getSecondaryUri() != null) {
+                                builder.secondaryUri(uri.getSecondaryUri().toString());
+                            }
+
+                            if (blob instanceof CloudBlockBlob) {
+                                builder.blobType(AzureStorageUtils.BLOCK);
+                            } else {
+                                builder.blobType(AzureStorageUtils.PAGE);
+                            }
+                            listing.add(builder.build());
+                        }
                     }
-                    listing.add(builder.build());
                 }
-            }
+            } while (continuationToken != null);
         } catch (final Throwable t) {
             throw new IOException(ExceptionUtils.getRootCause(t));
         }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
index de776a88e3..82bb821774 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
@@ -53,6 +53,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -199,33 +200,39 @@ public class ListAzureBlobStorage_v12 extends AbstractListProcessor<BlobInfo> {
     }
 
     @Override
-    protected List<BlobInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException {
-        String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
-        String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+    protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+        final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
+        final String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+        final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
 
         try {
-            List<BlobInfo> listing = new ArrayList<>();
+            final List<BlobInfo> listing = new ArrayList<>();
 
-            BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
+            final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
 
-            ListBlobsOptions options = new ListBlobsOptions()
+            final ListBlobsOptions options = new ListBlobsOptions()
                     .setPrefix(prefix);
 
-            for (BlobItem blob : containerClient.listBlobs(options, null)) {
-                BlobItemProperties properties = blob.getProperties();
-
-                Builder builder = new Builder()
-                        .containerName(containerName)
-                        .blobName(blob.getName())
-                        .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName()))
-                        .etag(properties.getETag())
-                        .blobType(properties.getBlobType().toString())
-                        .contentType(properties.getContentType())
-                        .contentLanguage(properties.getContentLanguage())
-                        .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
-                        .length(properties.getContentLength());
-
-                listing.add(builder.build());
+            final Iterator<BlobItem> result = containerClient.listBlobs(options, null).iterator();
+
+            while (result.hasNext()) {
+                final BlobItem blob = result.next();
+                final BlobItemProperties properties = blob.getProperties();
+
+                if (properties.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp) {
+                    final Builder builder = new Builder()
+                            .containerName(containerName)
+                            .blobName(blob.getName())
+                            .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName()))
+                            .etag(properties.getETag())
+                            .blobType(properties.getBlobType().toString())
+                            .contentType(properties.getContentType())
+                            .contentLanguage(properties.getContentLanguage())
+                            .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
+                            .length(properties.getContentLength());
+
+                    listing.add(builder.build());
+                }
             }
 
             return listing;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index 268b8c168e..1de2ee9b99 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -210,12 +210,12 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
 
     @Override
     protected List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
-        return performListing(context, listingMode, true);
+        return performListing(context, minTimestamp, listingMode, true);
     }
 
     @Override
     protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
-        return performListing(context, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+        return performListing(context, null, ListingMode.CONFIGURATION_VERIFICATION, false).size();
     }
 
     @Override
@@ -238,7 +238,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
         return attributes;
     }
 
-    private List<ADLSFileInfo> performListing(final ProcessContext context, final ListingMode listingMode,
+    private List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
                                               final boolean applyFilters) throws IOException {
         try {
             final String fileSystem = evaluateFileSystemProperty(context, null);
@@ -256,9 +256,11 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
             options.setRecursive(recurseSubdirectories);
 
             final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
+            final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
 
             final List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
                     .filter(pathItem -> !pathItem.isDirectory())
+                    .filter(pathItem -> pathItem.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp)
                     .map(pathItem -> new ADLSFileInfo.Builder()
                             .fileSystem(fileSystem)
                             .filePath(pathItem.getName())