You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/01/10 19:36:08 UTC
[nifi] branch main updated: NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 990285b NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'
990285b is described below
commit 990285ba1c1429b436af1cab4b36ab8a3797bb14
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Thu Sep 23 18:36:03 2021 +0200
NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'
This closes #5413.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi-aws-bundle/nifi-aws-processors/pom.xml | 4 +
.../org/apache/nifi/processors/aws/s3/ListS3.java | 321 +++++++++++++++++----
.../processor/util/list/ListableEntityWrapper.java | 67 +++++
.../processor/util/list/ListedEntityTracker.java | 18 +-
.../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 4 +
.../nifi/processors/gcp/storage/ListGCSBucket.java | 235 ++++++++++++++-
6 files changed, 566 insertions(+), 83 deletions(-)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 03e2b41..1c8b255 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -31,6 +31,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 5ee1a83..57065f3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -43,6 +43,9 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
@@ -60,8 +63,12 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.ListableEntityWrapper;
+import org.apache.nifi.processor.util.list.ListedEntity;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -87,6 +94,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
@PrimaryNodeOnly
@TriggerSerially
@@ -117,6 +126,45 @@ import java.util.concurrent.atomic.AtomicReference;
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
+ /** 'Listing Strategy' option with value "timestamps"; it is also the default value of {@code LISTING_STRATEGY}. */
+ public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+ "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+ " Since it only tracks few timestamps, it can manage listing state efficiently." +
+ " This strategy will not pick up any newly added or modified entity if their timestamps are older than the tracked latest timestamp." +
+ " Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
+
+ /**
+ * 'Listing Strategy' option with value "entities". Selecting it enables the entity-tracking
+ * properties that depend on it (state cache, initial listing target).
+ */
+ public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
+ "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+ " This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'." +
+ " Works even when multiple subdirectories are being written at the same time while listing is running." +
+ " However an additional DistributedMapCache controller service is required and more JVM heap memory is used." +
+ " For more information on how the 'Entity Tracking Time Window' property works, see the description.");
+
+ /** Required property choosing between the timestamp-based and entity-based listing strategies; defaults to timestamps. */
+ public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
+ .name("listing-strategy")
+ .displayName("Listing Strategy")
+ .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+ .required(true)
+ .allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
+ .defaultValue(BY_TIMESTAMPS.getValue())
+ .build();
+
+ /** Copy of the tracker's state-cache property, made required and only applicable when the entity strategy is selected. */
+ public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE)
+ .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+ .required(true)
+ .build();
+
+ /** Copy of the tracker's initial-listing-target property, only applicable when the entity strategy is selected. */
+ public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
+ .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+ .build();
+
+ /**
+ * Copy of the tracker's time-window property, made required.
+ * NOTE(review): this depends on INITIAL_LISTING_TARGET being 'window', so it is not applicable when
+ * 'all' is selected. Confirm the tracker does not also read the window in that case.
+ */
+ public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
+ .dependsOn(INITIAL_LISTING_TARGET, ListedEntityTracker.INITIAL_LISTING_TARGET_WINDOW)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor DELIMITER = new Builder()
.name("delimiter")
.displayName("Delimiter")
@@ -223,6 +271,10 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+ LISTING_STRATEGY,
+ TRACKING_STATE_CACHE,
+ INITIAL_LISTING_TARGET,
+ TRACKING_TIME_WINDOW,
BUCKET,
REGION,
ACCESS_KEY,
@@ -257,6 +309,40 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
// State tracking
private final AtomicReference<ListingSnapshot> listing = new AtomicReference<>(new ListingSnapshot(0L, Collections.emptySet()));
+ // Set by onPrimaryNodeChange(); passed to the tracker on the next entity listing and then cleared.
+ private volatile boolean justElectedPrimaryNode = false;
+ // Checked and cleared in initListedEntityTracker(); when true the tracked entities are wiped on schedule.
+ // NOTE(review): no code in this view sets it to true - confirm where it is raised.
+ private volatile boolean resetEntityTrackingState = false;
+ // Non-null only while the 'Tracking Entities' strategy is configured (see initListedEntityTracker()).
+ private volatile ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> listedEntityTracker;
+
+    /**
+     * Records whether this node has just been elected primary, so the next
+     * entity-tracking run can be told about the election.
+     *
+     * @param newState the primary-node state reported by the framework
+     */
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        final boolean electedAsPrimary = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState;
+        justElectedPrimaryNode = electedAsPrimary;
+    }
+
+    /**
+     * Prepares the entity tracker when the processor is scheduled.
+     * <p>
+     * An existing tracker has its listed entities cleared when a reset was requested or when the
+     * configured strategy is no longer 'Tracking Entities'. Afterwards a tracker is created if the
+     * entity strategy is configured and none exists yet; otherwise the tracker is dropped.
+     *
+     * @param context the process context used to read the configured listing strategy
+     * @throws RuntimeException if clearing previously listed entities fails
+     */
+    @OnScheduled
+    public void initListedEntityTracker(ProcessContext context) {
+        final String configuredStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+        final boolean trackingEntities = BY_ENTITIES.getValue().equals(configuredStrategy);
+        final boolean staleStateMustGo = resetEntityTrackingState || !trackingEntities;
+
+        if (staleStateMustGo && listedEntityTracker != null) {
+            try {
+                listedEntityTracker.clearListedEntities();
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
+            }
+        }
+        resetEntityTrackingState = false;
+
+        if (!trackingEntities) {
+            // The tracker is only kept while the entity strategy is selected.
+            listedEntityTracker = null;
+            return;
+        }
+
+        if (listedEntityTracker == null) {
+            listedEntityTracker = createListedEntityTracker();
+        }
+    }
+
+ // Factory for the tracker used by the 'Tracking Entities' strategy.
+ // Presumably protected so subclasses (e.g. tests) can supply their own tracker - TODO confirm.
+ protected ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> createListedEntityTracker() {
+ return new ListedS3VersionSummaryTracker();
+ }
+
private static Validator createRequesterPaysValidator() {
return new Validator() {
@Override
@@ -334,6 +420,18 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        // Dispatch to the listing implementation selected by the 'Listing Strategy' property.
+        final String strategy = context.getProperty(LISTING_STRATEGY).getValue();
+
+        if (BY_TIMESTAMPS.equals(strategy)) {
+            listByTrackingTimestamps(context, session);
+            return;
+        }
+
+        if (BY_ENTITIES.equals(strategy)) {
+            listByTrackingEntities(context, session);
+            return;
+        }
+
+        // Neither known strategy matched the configured value.
+        throw new ProcessException("Unknown listing strategy: " + strategy);
+    }
+
+ private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) {
try {
restoreState(session);
} catch (IOException ioe) {
@@ -360,7 +458,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
int totalListCount = 0;
long latestListedTimestampInThisCycle = currentTimestamp;
- VersionListing versionListing;
final Set<String> listedKeys = new HashSet<>();
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys});
@@ -375,69 +472,51 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
try {
writer.beginListing();
- do {
- versionListing = bucketLister.listVersions();
- for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
- long lastModified = versionSummary.getLastModified().getTime();
- if (lastModified < currentTimestamp
- || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
- || lastModified > (listingTimestamp - minAgeMilliseconds)) {
- continue;
- }
-
- getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
-
- // Get object tags if configured to do so
- GetObjectTaggingResult taggingResult = null;
- if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
- try {
- taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
- } catch (final Exception e) {
- getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
- new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
- }
- }
-
- // Get user metadata if configured to do so
- ObjectMetadata objectMetadata = null;
- if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
- try {
- objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
- } catch (final Exception e) {
- getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
- new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
- }
- }
-
- // Write the entity to the listing
- writer.addToListing(versionSummary, taggingResult, objectMetadata);
-
- // Track the latest lastModified timestamp and keys having that timestamp.
- // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
- if (lastModified > latestListedTimestampInThisCycle) {
- latestListedTimestampInThisCycle = lastModified;
- listedKeys.clear();
- listedKeys.add(versionSummary.getKey());
-
- } else if (lastModified == latestListedTimestampInThisCycle) {
- listedKeys.add(versionSummary.getKey());
- }
-
- listCount++;
+ do {
+ VersionListing versionListing = bucketLister.listVersions();
+ for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
+ long lastModified = versionSummary.getLastModified().getTime();
+ if (lastModified < currentTimestamp
+ || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
+ || lastModified > (listingTimestamp - minAgeMilliseconds)) {
+ continue;
}
- bucketLister.setNextMarker();
- totalListCount += listCount;
+ getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
- if (listCount >= batchSize && writer.isCheckpoint()) {
- getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
- session.commitAsync();
+ GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
+
+ ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
+
+ // Write the entity to the listing
+ writer.addToListing(versionSummary, taggingResult, objectMetadata);
+
+ // Track the latest lastModified timestamp and keys having that timestamp.
+ // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
+ if (lastModified > latestListedTimestampInThisCycle) {
+ latestListedTimestampInThisCycle = lastModified;
+ listedKeys.clear();
+ listedKeys.add(versionSummary.getKey());
+
+ } else if (lastModified == latestListedTimestampInThisCycle) {
+ listedKeys.add(versionSummary.getKey());
}
- listCount = 0;
- } while (bucketLister.isTruncated());
+ listCount++;
+ }
+ bucketLister.setNextMarker();
- writer.finishListing();
+ totalListCount += listCount;
+
+ if (listCount >= batchSize && writer.isCheckpoint()) {
+ getLogger().info("Successfully listed {} new files from S3; routing to success", listCount);
+ session.commitAsync();
+ }
+
+ listCount = 0;
+ } while (bucketLister.isTruncated());
+
+ writer.finishListing();
} catch (final Exception e) {
getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e);
writer.finishListingExceptionally(e);
@@ -468,7 +547,123 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
}
- private S3BucketLister getS3BucketLister(final ProcessContext context, final AmazonS3 client) {
+ private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+ listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ S3BucketLister bucketLister = getS3BucketLister(context, getClient());
+
+ List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
+ .stream()
+ .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+ .map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
+ s3VersionSummary,
+ S3VersionSummary::getKey,
+ summary -> summary.getKey() + "_" + summary.getVersionId(),
+ summary -> summary.getLastModified().getTime(),
+ S3VersionSummary::getSize
+ ))
+ .collect(Collectors.toList());
+
+ return listedEntities;
+ }, null);
+
+ justElectedPrimaryNode = false;
+ }
+
+    /**
+     * Entity tracker for S3 version summaries. Both output hooks of the base tracker are redirected
+     * to {@link #publishListing}, so entities are emitted through the same attribute or record
+     * writers chosen by the 'Record Writer' property.
+     */
+    private class ListedS3VersionSummaryTracker extends ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+        public ListedS3VersionSummaryTracker() {
+            super(getIdentifier(), getLogger(), RecordObjectWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+        ) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+            Function<ListableEntityWrapper<S3VersionSummary>, Map<String, String>> createAttributes
+        ) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        /**
+         * Writes every updated entity to the listing and records it as already listed.
+         * On any failure the listing is finished exceptionally, the session is rolled back and
+         * the processor yields.
+         *
+         * @param context         the process context (writer, batch size, tag/metadata switches)
+         * @param session         the session the writer emits into
+         * @param updatedEntities entities the tracker determined to be new or updated
+         */
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+            final S3ObjectWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeObjectWriter(session);
+            } else {
+                writer = new RecordObjectWriter(session, writerFactory, getLogger());
+            }
+
+            try {
+                writer.beginListing();
+                final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+                int listCount = 0;
+                for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
+                    S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
+
+                    GetObjectTaggingResult taggingResult = getTaggingResult(context, getClient(), s3VersionSummary);
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, getClient(), s3VersionSummary);
+
+                    writer.addToListing(s3VersionSummary, taggingResult, objectMetadata);
+
+                    listCount++;
+
+                    if (listCount >= batchSize && writer.isCheckpoint()) {
+                        getLogger().info("Successfully listed {} new files from S3; routing to success", listCount);
+                        session.commitAsync();
+                        // Start a new batch; otherwise every later entity would trigger another commit
+                        // and the logged count would be cumulative rather than per batch.
+                        listCount = 0;
+                    }
+
+                    final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+                    alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
+                }
+
+                writer.finishListing();
+            } catch (final Exception e) {
+                getLogger().error("Failed to list contents of bucket due to {}", new Object[]{e}, e);
+                writer.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+            }
+        }
+    }
+
+ private GetObjectTaggingResult getTaggingResult(ProcessContext context, AmazonS3 client, S3VersionSummary versionSummary) {
+ GetObjectTaggingResult taggingResult = null;
+ if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
+ try {
+ taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+ } catch (final Exception e) {
+ getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
+ new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+ }
+ }
+ return taggingResult;
+ }
+
+ private ObjectMetadata getObjectMetadata(ProcessContext context, AmazonS3 client, S3VersionSummary versionSummary) {
+ ObjectMetadata objectMetadata = null;
+ if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+ try {
+ objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+ } catch (final Exception e) {
+ getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
+ new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+ }
+ }
+ return objectMetadata;
+ }
+
+ private S3BucketLister getS3BucketLister(ProcessContext context, AmazonS3 client) {
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
final boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
@@ -479,10 +674,10 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
final int listType = context.getProperty(LIST_TYPE).asInteger();
final S3BucketLister bucketLister = useVersions
- ? new S3VersionBucketLister(client)
- : listType == 2
- ? new S3ObjectBucketListerVersion2(client)
- : new S3ObjectBucketLister(client);
+ ? new S3VersionBucketLister(client)
+ : listType == 2
+ ? new S3ObjectBucketListerVersion2(client)
+ : new S3ObjectBucketLister(client);
bucketLister.setBucketName(bucket);
bucketLister.setRequesterPays(requesterPays);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntityWrapper.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntityWrapper.java
new file mode 100644
index 0000000..7d8de91
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntityWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processor.util.list;
+
+import java.util.function.Function;
+
+/**
+ * Adapts an arbitrary object to {@link ListableEntity} by delegating each accessor to a
+ * caller-supplied function applied to the wrapped object.
+ *
+ * @param <T> type of the wrapped object
+ */
+public class ListableEntityWrapper<T> implements ListableEntity {
+
+    private final T rawEntity;
+    private final Function<T, String> toName;
+    private final Function<T, String> toIdentifier;
+    private final Function<T, Long> toTimestamp;
+    private final Function<T, Long> toSize;
+
+    /**
+     * @param rawEntity    the object to wrap
+     * @param toName       extracts the entity name from the wrapped object
+     * @param toIdentifier extracts the entity identifier from the wrapped object
+     * @param toTimestamp  extracts the entity timestamp; its result is unboxed to {@code long}
+     * @param toSize       extracts the entity size; its result is unboxed to {@code long}
+     */
+    public ListableEntityWrapper(
+        T rawEntity,
+        Function<T, String> toName,
+        Function<T, String> toIdentifier,
+        Function<T, Long> toTimestamp,
+        Function<T, Long> toSize
+    ) {
+        this.rawEntity = rawEntity;
+        this.toName = toName;
+        this.toIdentifier = toIdentifier;
+        this.toTimestamp = toTimestamp;
+        this.toSize = toSize;
+    }
+
+    /**
+     * @return the wrapped object exactly as passed to the constructor
+     */
+    public T getRawEntity() {
+        return rawEntity;
+    }
+
+    @Override
+    public String getName() {
+        return toName.apply(rawEntity);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return toIdentifier.apply(rawEntity);
+    }
+
+    @Override
+    public long getTimestamp() {
+        return toTimestamp.apply(rawEntity);
+    }
+
+    @Override
+    public long getSize() {
+        return toSize.apply(rawEntity);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
index 1a70ecb..13f1f16 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
@@ -63,7 +63,7 @@ import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCC
public class ListedEntityTracker<T extends ListableEntity> {
private final ObjectMapper objectMapper = new ObjectMapper();
- private volatile Map<String, ListedEntity> alreadyListedEntities;
+ protected volatile Map<String, ListedEntity> alreadyListedEntities;
private static final String NOTE = "Used by 'Tracking Entities' strategy.";
public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
@@ -100,9 +100,9 @@ public class ListedEntityTracker<T extends ListableEntity> {
.defaultValue("3 hours")
.build();
- private static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available",
+ public static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available",
"Regardless of entities timestamp, all existing entities will be listed at the initial listing activity.");
- private static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window",
+ public static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window",
"Ignore entities having timestamp older than the specified 'Tracking Time Window' at the initial listing activity.");
public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
@@ -158,7 +158,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
private String nodeId;
private DistributedMapCacheClient mapCacheClient;
- ListedEntityTracker(final String componentId, final ComponentLog logger, final RecordSchema recordSchema) {
+ public ListedEntityTracker(final String componentId, final ComponentLog logger, final RecordSchema recordSchema) {
this(componentId, logger, DEFAULT_CURRENT_TIMESTAMP_SUPPLIER, recordSchema);
}
@@ -166,7 +166,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
* This constructor is used by unit test code so that it can produce the consistent result by controlling current timestamp.
* @param currentTimestampSupplier a function to return current timestamp.
*/
- ListedEntityTracker(final String componentId, final ComponentLog logger, final Supplier<Long> currentTimestampSupplier, final RecordSchema recordSchema) {
+ public ListedEntityTracker(final String componentId, final ComponentLog logger, final Supplier<Long> currentTimestampSupplier, final RecordSchema recordSchema) {
this.componentId = componentId;
this.logger = logger;
this.currentTimestampSupplier = currentTimestampSupplier;
@@ -222,7 +222,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
return listedEntities;
}
- void clearListedEntities() throws IOException {
+ public void clearListedEntities() throws IOException {
alreadyListedEntities = null;
if (mapCacheClient != null) {
final String cacheKey = getCacheKey();
@@ -327,7 +327,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
logger.error("Failed to create records for listed entities", e);
}
} else {
- createFlowFilesForEntities(session, updatedEntities, createAttributes);
+ createFlowFilesForEntities(context, session, updatedEntities, createAttributes);
}
// Commit ProcessSession before persisting listed entities.
@@ -344,7 +344,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
});
}
- private void createRecordsForEntities(final ProcessContext context, final ProcessSession session, final List<T> updatedEntities) throws IOException, SchemaNotFoundException {
+ protected void createRecordsForEntities(final ProcessContext context, final ProcessSession session, final List<T> updatedEntities) throws IOException, SchemaNotFoundException {
if (updatedEntities.isEmpty()) {
logger.debug("No entities to write records for");
return;
@@ -376,7 +376,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
session.transfer(flowFile, REL_SUCCESS);
}
- private void createFlowFilesForEntities(final ProcessSession session, final List<T> updatedEntities, final Function<T, Map<String, String>> createAttributes) {
+ protected void createFlowFilesForEntities(ProcessContext context, final ProcessSession session, final List<T> updatedEntities, final Function<T, Map<String, String>> createAttributes) {
for (T updatedEntity : updatedEntities) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, createAttributes.apply(updatedEntity));
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 890cb0b..5ae7453 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -52,6 +52,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index dd43e78..8656327 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -35,6 +35,10 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@@ -46,6 +50,9 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.ListableEntityWrapper;
+import org.apache.nifi.processor.util.list.ListedEntity;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -68,6 +75,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
@@ -159,6 +167,45 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
@WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
})
public class ListGCSBucket extends AbstractGCSProcessor {
+    /**
+     * Allowable value of {@link #LISTING_STRATEGY} that selects timestamp-based listing.
+     * It is the default value of that property.
+     */
+    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+        "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+            " Since it only tracks few timestamps, it can manage listing state efficiently." +
+            " This strategy will not pick up any newly added or modified entity if their timestamps are older than the tracked latest timestamp." +
+            " Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
+
+    /**
+     * Allowable value of {@link #LISTING_STRATEGY} that selects entity-based listing.
+     * {@link #TRACKING_STATE_CACHE} and {@link #INITIAL_LISTING_TARGET} depend on this value being selected.
+     */
+    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
+        "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+            // The other strategy's display name is 'Tracking Timestamps'; the text must match it so users can find it.
+            " This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'." +
+            " Works even when multiple subdirectories are being written at the same time while listing is running." +
+            " However an additional DistributedMapCache controller service is required and more JVM heap memory is used." +
+            " For more information on how the 'Entity Tracking Time Window' property works, see the description.");
+
+    /**
+     * Required property choosing between {@link #BY_TIMESTAMPS} (default) and {@link #BY_ENTITIES}.
+     */
+    public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
+        .name("listing-strategy")
+        .displayName("Listing Strategy")
+        .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+        .required(true)
+        .allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
+        .defaultValue(BY_TIMESTAMPS.getValue())
+        .build();
+
+    /**
+     * Copy of {@link ListedEntityTracker#TRACKING_STATE_CACHE}, made required and dependent on
+     * {@link #LISTING_STRATEGY} being {@link #BY_ENTITIES}.
+     */
+    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE)
+        .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+        .required(true)
+        .build();
+
+    /**
+     * Copy of {@link ListedEntityTracker#INITIAL_LISTING_TARGET}, dependent on
+     * {@link #LISTING_STRATEGY} being {@link #BY_ENTITIES}.
+     */
+    public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
+        .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+        .build();
+
+    /**
+     * Copy of {@link ListedEntityTracker#TRACKING_TIME_WINDOW}, made required and dependent on
+     * {@link #INITIAL_LISTING_TARGET} being {@code ListedEntityTracker.INITIAL_LISTING_TARGET_WINDOW}.
+     */
+    public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
+        .dependsOn(INITIAL_LISTING_TARGET, ListedEntityTracker.INITIAL_LISTING_TARGET_WINDOW)
+        .required(true)
+        .build();
+
+
public static final PropertyDescriptor BUCKET = new PropertyDescriptor
.Builder().name("gcs-bucket")
.displayName("Bucket")
@@ -201,6 +248,10 @@ public class ListGCSBucket extends AbstractGCSProcessor {
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
+ .add(LISTING_STRATEGY)
+ .add(TRACKING_STATE_CACHE)
+ .add(INITIAL_LISTING_TARGET)
+ .add(TRACKING_TIME_WINDOW)
.add(BUCKET)
.add(RECORD_WRITER)
.addAll(super.getSupportedPropertyDescriptors())
@@ -222,6 +273,39 @@ public class ListGCSBucket extends AbstractGCSProcessor {
private volatile long currentTimestamp = 0L;
private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>());
+ private volatile boolean justElectedPrimaryNode = false; // written by onPrimaryNodeChange; passed to the entity tracker and cleared after an entity-tracking listing
+ private volatile boolean resetEntityTrackingState = false; // when true, initListedEntityTracker clears tracked entities; NOTE(review): nothing visible in this diff sets it to true - confirm where it is raised
+ private volatile ListedEntityTracker<ListableBlob> listedEntityTracker; // assigned in initListedEntityTracker: created for the 'entities' strategy, set to null otherwise
+
+    /**
+     * Remembers whether the reported state change is this node being elected primary.
+     * The flag is later passed to the entity tracker by the entity-tracking listing.
+     *
+     * @param newState the primary-node state reported to this processor
+     */
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        final boolean electedAsPrimary = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState;
+        justElectedPrimaryNode = electedAsPrimary;
+    }
+
+    /**
+     * Prepares the entity tracker to match the configured listing strategy.
+     * <p>
+     * An existing tracker has its listed entities cleared when a reset was requested or when the
+     * strategy is no longer 'entities'. Afterwards the tracker is created on demand for the
+     * 'entities' strategy and dropped for any other strategy.
+     *
+     * @param context used to read the 'Listing Strategy' property
+     * @throws RuntimeException if clearing the previously listed entities fails with an {@link IOException}
+     */
+    @OnScheduled
+    public void initListedEntityTracker(ProcessContext context) {
+        final String configuredStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+        final boolean trackingEntities = BY_ENTITIES.getValue().equals(configuredStrategy);
+        final boolean staleStateMustGo = resetEntityTrackingState || !trackingEntities;
+
+        if (staleStateMustGo && listedEntityTracker != null) {
+            try {
+                listedEntityTracker.clearListedEntities();
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
+            }
+        }
+        // The reset request is consumed whether or not there was anything to clear.
+        resetEntityTrackingState = false;
+
+        if (!trackingEntities) {
+            listedEntityTracker = null;
+            return;
+        }
+
+        if (listedEntityTracker == null) {
+            listedEntityTracker = createListedEntityTracker();
+        }
+    }
+
+ protected ListedEntityTracker<ListableBlob> createListedEntityTracker() {
+ return new ListedBlobTracker();
+ }
private Set<String> extractKeys(final StateMap stateMap) {
return stateMap.toMap().entrySet().parallelStream()
@@ -303,7 +387,19 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
+    /**
+     * Runs one listing pass using the strategy selected by the 'Listing Strategy' property.
+     *
+     * @param context used to read the configured listing strategy
+     * @param session handed on to the selected listing implementation
+     * @throws ProcessException if the configured value matches neither known strategy
+     */
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+
+        // Compare raw property values, as initListedEntityTracker does, instead of relying on
+        // AllowableValue.equals(Object) accepting a String argument.
+        if (BY_TIMESTAMPS.getValue().equals(listingStrategy)) {
+            listByTrackingTimestamps(context, session);
+        } else if (BY_ENTITIES.getValue().equals(listingStrategy)) {
+            listByTrackingEntities(context, session);
+        } else {
+            throw new ProcessException("Unknown listing strategy: " + listingStrategy);
+        }
+    }
+
+
+ private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) {
try {
restoreState(session);
} catch (IOException e) {
@@ -331,16 +427,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
private void listBucket(final ProcessContext context, final ListingAction listingAction) throws IOException, SchemaNotFoundException {
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
- final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
- final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
-
- final List<Storage.BlobListOption> listOptions = new ArrayList<>();
- if (prefix != null) {
- listOptions.add(Storage.BlobListOption.prefix(prefix));
- }
- if (useGenerations) {
- listOptions.add(Storage.BlobListOption.versions(true));
- }
+ final List<Storage.BlobListOption> listOptions = getBlobListOptions(context);
final Storage storage = listingAction.getCloudService();
@@ -388,6 +475,51 @@ public class ListGCSBucket extends AbstractGCSProcessor {
listingAction.finishListing(listCount, maxTimestamp, keysMatchingTimestamp);
}
+    /**
+     * Builds the options passed to the bucket listing call from the processor configuration.
+     *
+     * @param context used to read the PREFIX and USE_GENERATIONS properties
+     * @return a new mutable list holding a prefix option when a prefix is configured and a
+     *         versions option when generations are enabled; empty when neither applies
+     */
+    private List<Storage.BlobListOption> getBlobListOptions(ProcessContext context) {
+        final List<Storage.BlobListOption> options = new ArrayList<>();
+
+        final String configuredPrefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+        if (configuredPrefix != null) {
+            options.add(Storage.BlobListOption.prefix(configuredPrefix));
+        }
+
+        final boolean includeGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
+        if (includeGenerations) {
+            options.add(Storage.BlobListOption.versions(true));
+        }
+
+        return options;
+    }
+
+
+    /**
+     * Lists the bucket through the entity tracker.
+     * <p>
+     * The tracker is given a function that walks every page of the bucket listing and keeps each
+     * blob whose update time is at or after the minimum timestamp the tracker supplies. Each kept
+     * blob is tagged with the zero-based number of the page it came from. The primary-node flag is
+     * cleared once the tracker call returns.
+     *
+     * @param context used to read the bucket name and the listing options
+     * @param session passed through to the tracker
+     */
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            final List<ListableBlob> candidates = new ArrayList<>();
+
+            final Storage gcs = getCloudService();
+            final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            final Storage.BlobListOption[] options = getBlobListOptions(context).toArray(new Storage.BlobListOption[0]);
+
+            Page<Blob> page = gcs.list(bucketName, options);
+            int pageIndex = 0;
+            do {
+                for (final Blob blob : page.getValues()) {
+                    if (blob.getUpdateTime() < minTimestampToList) {
+                        continue;
+                    }
+                    candidates.add(new ListableBlob(blob, pageIndex));
+                }
+                page = page.getNextPage();
+                pageIndex++;
+            } while (page != null);
+
+            return candidates;
+        }, null); // NOTE(review): meaning of the trailing null argument is defined by ListedEntityTracker.trackEntities - confirm
+
+        justElectedPrimaryNode = false;
+    }
+
+
private void commit(final ProcessSession session, final int listCount) {
if (listCount > 0) {
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
@@ -461,6 +593,87 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
}
+    /**
+     * Entity tracker that emits listed blobs through this processor's own blob writers, for both
+     * the record-based and the FlowFile-based output paths.
+     */
+    protected class ListedBlobTracker extends ListedEntityTracker<ListableBlob> {
+        public ListedBlobTracker() {
+            super(getIdentifier(), getLogger(), RecordBlobWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities, Function<ListableBlob, Map<String, String>> createAttributes) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        /**
+         * Writes the given blobs with an attribute writer, or a record writer when RECORD_WRITER is
+         * set, and records them in {@code alreadyListedEntities}.
+         * <p>
+         * Blobs are recorded as listed only after the output that carries them has been committed
+         * at a checkpoint, or after the listing finished successfully. On failure the session is
+         * rolled back, so blobs whose output was discarded must not be remembered as listed.
+         * NOTE(review): this assumes the tracker persists {@code alreadyListedEntities} after this
+         * method returns - confirm against ListedEntityTracker.trackEntities.
+         *
+         * @param context         used to look up the optional record writer and to yield on failure
+         * @param session         session the writers emit into; rolled back on failure
+         * @param updatedEntities blobs to publish, each carrying the page number it was listed on
+         */
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) {
+            final BlobWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeBlobWriter(session);
+            } else {
+                writer = new RecordBlobWriter(session, writerFactory, getLogger());
+            }
+
+            // Entries handed to the writer but not yet safe to expose as "already listed".
+            // Fully qualified so the block does not depend on an import that is not visible here.
+            final Map<String, ListedEntity> pendingEntities = new java.util.HashMap<>();
+
+            try {
+                writer.beginListing();
+
+                int listCount = 0;
+                int pageNr = -1;
+                for (ListableBlob listableBlob : updatedEntities) {
+                    final Blob blob = listableBlob.getRawEntity();
+                    final int currentPageNr = listableBlob.getPageNr();
+
+                    writer.addToListing(blob);
+                    pendingEntities.put(listableBlob.getIdentifier(), new ListedEntity(listableBlob.getTimestamp(), listableBlob.getSize()));
+
+                    listCount++;
+
+                    // Commit at page boundaries, but only for writers that support checkpoints.
+                    if (pageNr != -1 && pageNr != currentPageNr && writer.isCheckpoint()) {
+                        commit(session, listCount);
+                        listCount = 0;
+                        // Everything written so far is now committed and survives a later rollback.
+                        alreadyListedEntities.putAll(pendingEntities);
+                        pendingEntities.clear();
+                    }
+
+                    pageNr = currentPageNr;
+                }
+
+                writer.finishListing();
+                alreadyListedEntities.putAll(pendingEntities);
+            } catch (final Exception e) {
+                // Pending entries are dropped here so the rolled-back blobs are listed again later.
+                getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e);
+                writer.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+            }
+        }
+    }
+
+
+    /**
+     * Wraps a {@link Blob} for entity tracking and remembers which listing page it came from.
+     * The wrapper is given the blob's name, generated id, update time and size accessors, in that order.
+     */
+    private static class ListableBlob extends ListableEntityWrapper<Blob> {
+        private final int pageNr;
+
+        /**
+         * @param blob   the listed blob
+         * @param pageNr number of the listing page the blob was found on
+         */
+        public ListableBlob(Blob blob, int pageNr) {
+            super(blob, Blob::getName, Blob::getGeneratedId, Blob::getUpdateTime, Blob::getSize);
+            this.pageNr = pageNr;
+        }
+
+        /**
+         * @return the page number supplied at construction
+         */
+        public int getPageNr() {
+            return this.pageNr;
+        }
+    }
+
+
private class VerifyListingAction implements ListingAction<CountingBlobWriter> {
final ProcessContext context;
final CountingBlobWriter blobWriter;