You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/04 18:29:25 UTC
[nifi] 30/31: NIFI-9846 Implement pagination listing for Azure List processors
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 869422d24db5041d600dd77b18ed0d1a940e07f9
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Wed Mar 30 14:43:06 2022 +0200
NIFI-9846 Implement pagination listing for Azure List processors
NIFI-9846 Removing explicit paging from ListAzureBlobStorage_v12 and ListAzureDataLakeStorage; making the minimum-timestamp filter inclusive (>=)
This closes #5916.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../azure/storage/ListAzureBlobStorage.java | 66 +++++++++++++---------
.../azure/storage/ListAzureBlobStorage_v12.java | 49 +++++++++-------
.../azure/storage/ListAzureDataLakeStorage.java | 8 ++-
3 files changed, 73 insertions(+), 50 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index eb439b4126..3e08e481f7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.ResultContinuation;
+import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
@@ -183,6 +185,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
final List<BlobInfo> listing = new ArrayList<>();
+ final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
+
try {
final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
final CloudBlobContainer container = blobClient.getContainerReference(containerName);
@@ -190,34 +194,44 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
- for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
- if (blob instanceof CloudBlob) {
- final CloudBlob cloudBlob = (CloudBlob) blob;
- final BlobProperties properties = cloudBlob.getProperties();
- final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
-
- final Builder builder = new BlobInfo.Builder()
- .primaryUri(uri.getPrimaryUri().toString())
- .blobName(cloudBlob.getName())
- .containerName(containerName)
- .contentType(properties.getContentType())
- .contentLanguage(properties.getContentLanguage())
- .etag(properties.getEtag())
- .lastModifiedTime(properties.getLastModified().getTime())
- .length(properties.getLength());
-
- if (uri.getSecondaryUri() != null) {
- builder.secondaryUri(uri.getSecondaryUri().toString());
- }
-
- if (blob instanceof CloudBlockBlob) {
- builder.blobType(AzureStorageUtils.BLOCK);
- } else {
- builder.blobType(AzureStorageUtils.PAGE);
+ ResultContinuation continuationToken = null;
+
+ do {
+ final ResultSegment<ListBlobItem> result = container.listBlobsSegmented(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, operationContext);
+ continuationToken = result.getContinuationToken();
+
+ for (final ListBlobItem blob : result.getResults()) {
+ if (blob instanceof CloudBlob) {
+ final CloudBlob cloudBlob = (CloudBlob) blob;
+ final BlobProperties properties = cloudBlob.getProperties();
+
+ if (properties.getLastModified().getTime() >= minimumTimestamp) {
+ final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
+
+ final Builder builder = new BlobInfo.Builder()
+ .primaryUri(uri.getPrimaryUri().toString())
+ .blobName(cloudBlob.getName())
+ .containerName(containerName)
+ .contentType(properties.getContentType())
+ .contentLanguage(properties.getContentLanguage())
+ .etag(properties.getEtag())
+ .lastModifiedTime(properties.getLastModified().getTime())
+ .length(properties.getLength());
+
+ if (uri.getSecondaryUri() != null) {
+ builder.secondaryUri(uri.getSecondaryUri().toString());
+ }
+
+ if (blob instanceof CloudBlockBlob) {
+ builder.blobType(AzureStorageUtils.BLOCK);
+ } else {
+ builder.blobType(AzureStorageUtils.PAGE);
+ }
+ listing.add(builder.build());
+ }
}
- listing.add(builder.build());
}
- }
+ } while (continuationToken != null);
} catch (final Throwable t) {
throw new IOException(ExceptionUtils.getRootCause(t));
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
index de776a88e3..82bb821774 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
@@ -53,6 +53,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -199,33 +200,39 @@ public class ListAzureBlobStorage_v12 extends AbstractListProcessor<BlobInfo> {
}
@Override
- protected List<BlobInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException {
- String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
- String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+ protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+ final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
+ final String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+ final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
try {
- List<BlobInfo> listing = new ArrayList<>();
+ final List<BlobInfo> listing = new ArrayList<>();
- BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
+ final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
- ListBlobsOptions options = new ListBlobsOptions()
+ final ListBlobsOptions options = new ListBlobsOptions()
.setPrefix(prefix);
- for (BlobItem blob : containerClient.listBlobs(options, null)) {
- BlobItemProperties properties = blob.getProperties();
-
- Builder builder = new Builder()
- .containerName(containerName)
- .blobName(blob.getName())
- .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName()))
- .etag(properties.getETag())
- .blobType(properties.getBlobType().toString())
- .contentType(properties.getContentType())
- .contentLanguage(properties.getContentLanguage())
- .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
- .length(properties.getContentLength());
-
- listing.add(builder.build());
+ final Iterator<BlobItem> result = containerClient.listBlobs(options, null).iterator();
+
+ while (result.hasNext()) {
+ final BlobItem blob = result.next();
+ final BlobItemProperties properties = blob.getProperties();
+
+ if (properties.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp) {
+ final Builder builder = new Builder()
+ .containerName(containerName)
+ .blobName(blob.getName())
+ .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName()))
+ .etag(properties.getETag())
+ .blobType(properties.getBlobType().toString())
+ .contentType(properties.getContentType())
+ .contentLanguage(properties.getContentLanguage())
+ .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
+ .length(properties.getContentLength());
+
+ listing.add(builder.build());
+ }
}
return listing;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index 268b8c168e..1de2ee9b99 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -210,12 +210,12 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
@Override
protected List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
- return performListing(context, listingMode, true);
+ return performListing(context, minTimestamp, listingMode, true);
}
@Override
protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
- return performListing(context, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+ return performListing(context, null, ListingMode.CONFIGURATION_VERIFICATION, false).size();
}
@Override
@@ -238,7 +238,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
return attributes;
}
- private List<ADLSFileInfo> performListing(final ProcessContext context, final ListingMode listingMode,
+ private List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
final boolean applyFilters) throws IOException {
try {
final String fileSystem = evaluateFileSystemProperty(context, null);
@@ -256,9 +256,11 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo
options.setRecursive(recurseSubdirectories);
final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
+ final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
final List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
.filter(pathItem -> !pathItem.isDirectory())
+ .filter(pathItem -> pathItem.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp)
.map(pathItem -> new ADLSFileInfo.Builder()
.fileSystem(fileSystem)
.filePath(pathItem.getName())