Posted to issues@nifi.apache.org by "greyp9 (via GitHub)" <gi...@apache.org> on 2023/06/01 20:51:20 UTC

[GitHub] [nifi] greyp9 commented on a diff in pull request #7274: NIFI-11553 - additional configurability for GCP processors; PublishGC…

greyp9 commented on code in PR #7274:
URL: https://github.com/apache/nifi/pull/7274#discussion_r1213667620


##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
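+        // An empty batch or a failed publisher setup yields; otherwise dispatch on the configured input strategy.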
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", storedException.get());
+            context.yield();
+        } else if (ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
+            onTriggerRecordStrategy(context, session, stopWatch, flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
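+        // Publish each FlowFile's full content as one message; FlowFiles exceeding the size limit are recorded as failed results.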
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            throw new ProcessException(e);
+        }
+    }
 
-                    ApiFuture<String> messageIdFuture = publisher.publish(message);
+    private void onTriggerRecordStrategyInner(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, MalformedRecordException {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
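+        // Split each FlowFile into records; each record is serialized and published as its own message.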
+        for (final FlowFile flowFile : flowFileBatch) {
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+            final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> future = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(future);
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
+            messageTracker.add(new FlowFileResult(flowFile, futures));
+            getLogger().trace("Parsing of FlowFile (ID:{}) records complete, now {} messages", flowFile.getId(), messageTracker.size());
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
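+        // Reuse the shared buffer: reset it, write one record, flush, then publish those bytes.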
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
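+        // Wrap the payload in a PubsubMessage, attaching the dynamic attributes derived from the FlowFile.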
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final List<FlowFile> flowFileBatch,
+                             final StopWatch stopWatch,
+                             final MessageTracker messageTracker) {
+        try {
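+            // successfulAsList() settles once every future completes; failed publishes appear as null entries for reconcile().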
+            getLogger().trace("Submit of batch complete, size {}", messageTracker.size());
+            final List<String> messageIdsSuccess = ApiFutures.successfulAsList(messageTracker.getFutures()).get();
+            getLogger().trace("Send of batch complete, success size {}", messageIdsSuccess.size());
+            messageTracker.reconcile(messageIdsSuccess);
+            final String topicName = publisher.getTopicNameString();
+            for (final FlowFileResult flowFileResult : messageTracker.getFlowFileResults()) {
+                final Map<String, String> attributes = new HashMap<>();
+                //attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get());  // what to do if using record strategy?
+                attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+                final FlowFile flowFile = session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
+                final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, topicName);
+                session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.transfer(flowFile, flowFileResult.getRelationship());
+                if (flowFileResult.getException() != null) {
+                    getLogger().error("FlowFile send failure", flowFileResult.getException());
                 }
             }
+        } catch (final InterruptedException | ExecutionException e) {
+            session.rollback();

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org