You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "tpalfy (via GitHub)" <gi...@apache.org> on 2023/03/20 14:53:10 UTC

[GitHub] [nifi] tpalfy commented on a diff in pull request #6987: NIFI-11137 Add record support to Consume/PublishJMS

tpalfy commented on code in PR #6987:
URL: https://github.com/apache/nifi/pull/6987#discussion_r1142228008


##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/RecordUtils.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordUtils {
+
+    public static Record append(final Record originalRecord, final Map<String, String> decoratorValues, final String decoratorPrefix) {

Review Comment:
   ```suggestion
       public static Record merge(final Record originalRecord, final Map<String, String> decoratorValues, final String decoratorPrefix) {
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/RecordWriter.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.jms.processors.strategy.consumer.AttributeSupplier;
+import org.apache.nifi.jms.processors.strategy.consumer.FlowFileWriter;
+import org.apache.nifi.jms.processors.strategy.consumer.MessageConsumerCallback;
+import org.apache.nifi.jms.processors.strategy.consumer.Serializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_APPENDER;
+import static org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_VALUE;
+import static org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_WRAPPER;
+
+public class RecordWriter<T> implements FlowFileWriter<T> {

Review Comment:
   I see a discrepancy in the naming between `RecordBasedFlowFileReader` vs `RecordWriter`.
   Maybe this should be `RecordBasedFlowFileWriter`...



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/OutputStrategy.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported JMS Output Strategies
+ */
+public enum OutputStrategy implements DescribedValue {

Review Comment:
   ```suggestion
   /**
    * Enumeration for strategies about how to add extra attributes to records.
    */
   public enum AttributeAdditionStrategy implements DescribedValue {
       DONT_ADD("Don't Add Attributes", "Write only the original record into the FlowFile."),
   
       WRAP("Add Attributes As Child Object", "Create a new record in which both the original record and the additional attributes are in two separate fields."),
   
       MERGE("Merge Attributes Into Record", "Write the additional attributes into the original record prefixed with \"_\".");
   
       private final String displayName;
   
       private final String description;
   
       AttributeAdditionStrategy(final String displayName, final String description) {
           this.displayName = displayName;
           this.description = description;
       }
   
       @Override
       public String getValue() {
           return name();
       }
   
       @Override
       public String getDisplayName() {
           return displayName;
       }
   
       @Override
       public String getDescription() {
           return description;
       }
   }
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java:
##########
@@ -199,36 +219,50 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS
                     }
                 }
 
-                switch (context.getProperty(MESSAGE_BODY).getValue()) {
-                    case TEXT_MESSAGE:
-                        try {
-                            publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend);
-                        } catch(Exception e) {
-                            publisher.setValid(false);
-                            throw e;
+                if (context.getProperty(RECORD_READER).isSet()) {
+                    final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+                    final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+                    final MessagePublisher messagePublisher = aMessagePublisher()

Review Comment:
   ```suggestion
                       RecordBasedFlowFileReader flowFileReader = new RecordBasedFlowFileReader(
                               getIdentifier(),
                               readerFactory,
                               writerFactory,
                               StandardRecordsPublishedEventReporter.of(destinationName),
                               getLogger()
                       );
   
                       MessagePublisherCallback messagePublisherCallback = new MessagePublisherCallback() {
                           @Override
                           public void onSuccess(FlowFile flowFile) {
                               processSession.transfer(flowFile, REL_SUCCESS);
                           }
   
                           @Override
                           public void onFailure(FlowFile flowFile, Exception e) {
                               handleException(context, processSession, publisher, flowFile, e);
                           }
                       };
   
                       flowFileReader.processFlowFileContent(
                               processSession,
                               flowFile,
                               content -> publisher.publish(destinationName, content, attributesToSend),
                               messagePublisherCallback
                       );
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java:
##########
@@ -305,6 +326,97 @@ public void accept(final JMSResponse response) {
         }
     }
 
+    private void processSingleMessage(ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName,
+                                      boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
+
+        consumer.consumeSingleMessage(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, response -> {
+            if (response == null) {
+                return;
+            }
+
+            try {
+                final FlowFile flowFile = createFlowFileFromMessage(processSession, destinationName, response);
+
+                processSession.getProvenanceReporter().receive(flowFile, destinationName);
+                processSession.transfer(flowFile, REL_SUCCESS);
+                processSession.commitAsync(
+                        () -> withLog(() -> acknowledge(response)),
+                        __ -> withLog(() -> response.reject()));
+            } catch (final Throwable t) {
+                response.reject();
+                throw t;
+            }
+        });
+    }
+
+    private FlowFile createFlowFileFromMessage(ProcessSession processSession, String destinationName, JMSResponse response) {
+        FlowFile flowFile = processSession.create();
+        flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
+
+        final Map<String, String> jmsHeaders = response.getMessageHeaders();
+        final Map<String, String> jmsProperties = response.getMessageProperties();
+
+        flowFile = updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
+        flowFile = updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
+        flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
+
+        flowFile = processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
+
+        return flowFile;
+    }
+
+    private void processMessageSet(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName,String errorQueueName,
+                                   boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
+
+        consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, jmsResponses -> {
+            final MessageConsumer<JMSResponse> messageConsumer = new MessageConsumer.Builder<JMSResponse>()
+                    .withFlowFileWriter((new RecordWriter.Builder<JMSResponse>()
+                            .withReaderFactory(readerFactory)
+                            .withWriterFactory(writerFactory)
+                            .withSerializer(message -> message.getMessageBody() == null ? new byte[0] : message.getMessageBody())
+                            .withOutputStrategy(outputStrategy)
+                            .withLogger(getLogger()).build()))
+                    .withAttributeSupplier(message -> mergeJmsAttributes(message.getMessageHeaders(), message.getMessageProperties()))
+                    .withEventReporter(StandardRecordReceivedEventReporter.of(destinationName))
+                    .build();
+
+            messageConsumer.consumeMessages(
+                    session,
+                    jmsResponses,
+                    new MessageConsumerCallback<>() {
+                        @Override
+                        public void onSuccess(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages) {
+                            session.transfer(flowFile, REL_SUCCESS);
+                            session.commitAsync(
+                                    () -> withLog(() -> acknowledge(processedMessages, failedMessages)),
+                                    __ -> withLog(() -> reject(processedMessages, failedMessages))
+                            );
+                            session.adjustCounter(COUNTER_RECORDS_RECEIVED, processedMessages.size() + failedMessages.size(), false);
+                            session.adjustCounter(COUNTER_RECORDS_PROCESSED, processedMessages.size(), false);
+                        }
+
+                        @Override
+                        public void onParseFailure(FlowFile flowFile, JMSResponse message, Exception e) {
+                            final FlowFile failedMessage = createFlowFileFromMessage(session, destinationName, message);
+                            session.transfer(failedMessage, REL_PARSE_FAILURE);
+                            session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+                        }
+
+                        @Override
+                        public void onFailure(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages, Exception e) {
+                            reject(processedMessages, failedMessages);
+                            // It would be nicer to call rollback and yield here, but we are rethrowing the exception to have the same error handling with processSingleMessage.
+                            throw new ProcessException(e);
+                        }
+                    }
+            );

Review Comment:
   The overall complexity could be drastically decreased and multiple classes (namely `StandardRecordReceivedEventReporter`, `EventReporter`, `FlowFileWriter`, `MessageConsumer`, `RecordSupplier`, `FlowFileReader`, `MessagePublisher`) could be removed when following a more direct approach at this level, like this:
   
   ```suggestion
               RecordWriter<JMSResponse> recordWriter = new RecordWriter<>(
                       readerFactory,
                       writerFactory,
                       message -> message.getMessageBody() == null ? new byte[0] : message.getMessageBody(),
                       attributeAdditionStrategy,
                       getLogger()
               );
   
               MessageConsumerCallback<JMSResponse> messageConsumerCallback = new MessageConsumerCallback<>() {
                   @Override
                   public void onSuccess(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages) {
                       session.getProvenanceReporter().receive(flowFile, destinationName);
                       session.transfer(flowFile, REL_SUCCESS);
                       session.commitAsync(
                               () -> withLog(() -> acknowledge(processedMessages, failedMessages)),
                               commitError -> withLog(() -> reject(processedMessages, failedMessages))
                       );
                       session.adjustCounter(COUNTER_RECORDS_RECEIVED, processedMessages.size() + failedMessages.size(), false);
                       session.adjustCounter(COUNTER_RECORDS_PROCESSED, processedMessages.size(), false);
                   }
   
                   @Override
                   public void onParseFailure(FlowFile flowFile, JMSResponse message, Exception e) {
                       final FlowFile failedMessage = createFlowFileFromMessage(session, destinationName, message);
                       session.transfer(failedMessage, REL_PARSE_FAILURE);
                       session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
                   }
   
                   @Override
                   public void onFailure(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages, Exception e) {
                       reject(processedMessages, failedMessages);
                       // It would be nicer to call rollback and yield here, but we are rethrowing the exception to have the same error handling with processSingleMessage.
                       throw new ProcessException(e);
                   }
               };
               AttributeSupplier<JMSResponse> attributeSupplier = message -> mergeJmsAttributes(message.getMessageHeaders(), message.getMessageProperties());
               
               recordWriter.write(
                       session,
                       jmsResponses,
                       attributeSupplier,
                       messageConsumerCallback
               );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org