Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/12/17 18:53:37 UTC

[GitHub] [nifi] turcsanyip commented on a change in pull request #5413: NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'

turcsanyip commented on a change in pull request #5413:
URL: https://github.com/apache/nifi/pull/5413#discussion_r771628390



##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -461,6 +592,84 @@ public void finishListing(final int listCount, final long maxTimestamp, final Se
         }
     }
 
+    protected class ListedBlobTracker extends ListedEntityTracker<ListableBlob> {
+        public ListedBlobTracker() {
+            super(getIdentifier(), getLogger(), RecordBlobWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities, Function<ListableBlob, Map<String, String>> createAttributes) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) {
+            final BlobWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeBlobWriter(session);
+            } else {
+                writer = new RecordBlobWriter(session, writerFactory, getLogger());
+            }
+
+            try {
+                writer.beginListing();
+
+                int listCount = 0;
+                int pageNr = -1;
+                for (ListableBlob listableBlob : updatedEntities) {
+                    Blob blob = listableBlob.getRawEntity();
+                    int currentPageNr = listableBlob.getPageNr();
+
+                    writer.addToListing(blob);
+
+                    listCount++;
+
+                    if (pageNr != -1 && pageNr != currentPageNr && writer.isCheckpoint()) {
+                        commit(session, listCount);
+                        listCount = 0;
+                    }
+
+                    pageNr = currentPageNr;
+                }
+
+                writer.finishListing();

Review comment:
       It seems `ListGCSBucket` never stores the entities it has already listed, so every run lists all the files again.
   Maybe `alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);` is missing here (see the same method in `ListS3`).
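   To illustrate the concern, here is a minimal, self-contained sketch of why the `put` matters. It is not NiFi's `ListedEntityTracker`: the class `TrackingSketch`, the method `listUpdated`, and the use of a plain timestamp as the tracked value are all assumptions made for illustration. Only the idea of recording each published entity in `alreadyListedEntities` comes from the comment above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for an entity tracker (not the NiFi API).
class TrackingSketch {
    // identifier -> last modification timestamp that was already published
    private final Map<String, Long> alreadyListedEntities = new HashMap<>();

    // Returns the identifiers that are new or changed since the previous call
    // and records them so that the next call can skip them.
    List<String> listUpdated(Map<String, Long> currentListing) {
        final List<String> updated = new ArrayList<>();
        for (Map.Entry<String, Long> entry : currentListing.entrySet()) {
            final Long previous = alreadyListedEntities.get(entry.getKey());
            if (previous == null || previous < entry.getValue()) {
                updated.add(entry.getKey());
                // Without this put, the map stays empty and every call
                // reports the full listing again.
                alreadyListedEntities.put(entry.getKey(), entry.getValue());
            }
        }
        return updated;
    }
}
```

   With the `put` in place, a second call with an unchanged listing returns nothing, and only entries with a newer timestamp are reported again. Removing that line reproduces the behavior described above: the same files are listed on every run.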



