You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/02/21 21:55:30 UTC

[nifi] branch main updated: NIFI-11187 Removed ActiveMQ from Standard Processors

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 57a1144f34 NIFI-11187 Removed ActiveMQ from Standard Processors
57a1144f34 is described below

commit 57a1144f34ab60bb9ed78a8d86861fb7bcd378b2
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Feb 15 13:41:36 2023 -0600

    NIFI-11187 Removed ActiveMQ from Standard Processors
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #6961
---
 .../nifi-standard-processors/pom.xml               |   9 -
 .../nifi/processors/standard/JmsConsumer.java      | 240 ----------
 .../standard/util/DocumentReaderCallback.java      |  57 ---
 .../nifi/processors/standard/util/JmsFactory.java  | 514 ---------------------
 .../standard/util/JmsProcessingSummary.java        |  81 ----
 .../processors/standard/util/JmsProperties.java    | 189 --------
 .../standard/util/UDPStreamConsumer.java           | 214 ---------
 .../standard/util/WrappedMessageConsumer.java      |  77 ---
 .../standard/util/WrappedMessageProducer.java      |  77 ---
 .../standard/TestWaitNotifyProtocol.java           |   2 +-
 nifi-nar-bundles/nifi-standard-bundle/pom.xml      |  11 -
 11 files changed, 1 insertion(+), 1470 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 51d63fd09f..1f28987cbe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -269,10 +269,6 @@
             <groupId>javax.jms</groupId>
             <artifactId>javax.jms-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-client</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>
             <artifactId>json-path</artifactId>
@@ -345,11 +341,6 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-broker</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
deleted file mode 100644
index 2d8f96f5f4..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.standard.util.JmsFactory;
-import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
-import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
-import org.apache.nifi.util.StopWatch;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE;
-import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CLIENT;
-import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
-import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
-import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
-import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
-import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
-import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
-import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
-import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
-import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
-import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
-import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
-
-public abstract class JmsConsumer extends AbstractProcessor {
-
-    public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles are routed to success")
-            .build();
-
-    private final Set<Relationship> relationships;
-    private final List<PropertyDescriptor> propertyDescriptors;
-
-    public JmsConsumer() {
-        final Set<Relationship> rels = new HashSet<>();
-        rels.add(REL_SUCCESS);
-        this.relationships = Collections.unmodifiableSet(rels);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(JMS_PROVIDER);
-        descriptors.add(URL);
-        descriptors.add(DESTINATION_NAME);
-        descriptors.add(TIMEOUT);
-        descriptors.add(BATCH_SIZE);
-        descriptors.add(USERNAME);
-        descriptors.add(PASSWORD);
-        descriptors.add(SSL_CONTEXT_SERVICE);
-        descriptors.add(ACKNOWLEDGEMENT_MODE);
-        descriptors.add(MESSAGE_SELECTOR);
-        descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
-        descriptors.add(CLIENT_ID_PREFIX);
-        this.propertyDescriptors = Collections.unmodifiableList(descriptors);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
-    }
-
-    public void consume(final ProcessContext context, final ProcessSession session, final WrappedMessageConsumer wrappedConsumer) throws ProcessException {
-        final ComponentLog logger = getLogger();
-
-        final MessageConsumer consumer = wrappedConsumer.getConsumer();
-        final boolean clientAcknowledge = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue().equalsIgnoreCase(ACK_MODE_CLIENT);
-        final long timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
-        final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
-        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-
-        final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
-
-        final StopWatch stopWatch = new StopWatch(true);
-        for (int i = 0; i < batchSize; i++) {
-
-            final Message message;
-            try {
-                // If we haven't received a message, wait until one is available. If we have already received at least one
-                // message, then we are not willing to wait for more to become available, but we are willing to keep receiving
-                // all messages that are immediately available.
-                if (processingSummary.getMessagesReceived() == 0) {
-                    message = consumer.receive(timeout);
-                } else {
-                    message = consumer.receiveNoWait();
-                }
-            } catch (final JMSException e) {
-                logger.error("Failed to receive JMS Message due to {}", e);
-                wrappedConsumer.close(logger);
-                break;
-            }
-
-            if (message == null) { // if no messages, we're done
-                break;
-            }
-
-            try {
-                processingSummary.add(map2FlowFile(context, session, message, addAttributes, logger));
-            } catch (Exception e) {
-                logger.error("Failed to receive JMS Message due to {}", e);
-                wrappedConsumer.close(logger);
-                break;
-            }
-        }
-
-        if (processingSummary.getFlowFilesCreated() == 0) {
-            context.yield();
-            return;
-        }
-
-        session.commitAsync(() -> {
-            // if we need to acknowledge the messages, do so now.
-            final Message lastMessage = processingSummary.getLastMessageReceived();
-            if (clientAcknowledge && lastMessage != null) {
-                try {
-                    lastMessage.acknowledge();  // acknowledge all received messages by acknowledging only the last.
-                } catch (final JMSException e) {
-                    logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}",
-                        new Object[]{processingSummary.getMessagesReceived(), e});
-                }
-            }
-        });
-
-        stopWatch.stop();
-        if (processingSummary.getFlowFilesCreated() > 0) {
-            final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
-            float messagesPerSec = (processingSummary.getMessagesReceived()) / secs;
-            final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
-            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
-                    new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
-        }
-    }
-
-    public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ComponentLog logger)
-            throws Exception {
-
-        // Currently not very useful, because always one Message == one FlowFile
-        final AtomicInteger msgsThisFlowFile = new AtomicInteger(1);
-
-        FlowFile flowFile = session.create();
-        try {
-            // MapMessage is exception, add only name-value pairs to FlowFile attributes
-            if (message instanceof MapMessage) {
-                MapMessage mapMessage = (MapMessage) message;
-                flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
-            } else { // all other message types, write Message body to FlowFile content
-                flowFile = session.write(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream rawOut) throws IOException {
-                        try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
-                            final byte[] messageBody = JmsFactory.createByteArray(message);
-                            out.write(messageBody);
-                        } catch (final JMSException e) {
-                            throw new ProcessException("Failed to receive JMS Message due to " + e.getMessage(), e);
-                        }
-                    }
-                });
-            }
-
-            if (addAttributes) {
-                flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
-            }
-
-            session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
-            session.transfer(flowFile, REL_SUCCESS);
-            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'",
-                    new Object[]{flowFile, msgsThisFlowFile.get()});
-
-            return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
-
-        } catch (Exception e) {
-            session.remove(flowFile);
-            throw e;
-        }
-    }
-
-    public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
-        final Map<String, String> valueMap = new HashMap<>();
-
-        final Enumeration<?> enumeration = mapMessage.getMapNames();
-        while (enumeration.hasMoreElements()) {
-            final String name = (String) enumeration.nextElement();
-
-            final Object value = mapMessage.getObject(name);
-            if (value == null) {
-                valueMap.put(MAP_MESSAGE_PREFIX + name, "");
-            } else {
-                valueMap.put(MAP_MESSAGE_PREFIX + name, value.toString());
-            }
-        }
-
-        return valueMap;
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java
deleted file mode 100644
index f2230e2c3f..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.xml.processing.ProcessingException;
-import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
-import org.w3c.dom.Document;
-
-public class DocumentReaderCallback implements InputStreamCallback {
-
-    private final boolean isNamespaceAware;
-    private Document document;
-
-    /**
-     * Creates a new DocumentReaderCallback.
-     *
-     * @param isNamespaceAware Whether or not the parse should consider namespaces
-     */
-    public DocumentReaderCallback(boolean isNamespaceAware) {
-        this.isNamespaceAware = isNamespaceAware;
-    }
-
-    @Override
-    public void process(final InputStream stream) throws IOException {
-        try {
-            final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
-            documentProvider.setNamespaceAware(isNamespaceAware);
-            document = documentProvider.parse(stream);
-        } catch (final ProcessingException e) {
-            throw new IOException(e.getLocalizedMessage(), e);
-        }
-    }
-
-    /**
-     * @return the document
-     */
-    public Document getDocument() {
-        return document;
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
deleted file mode 100644
index 26f08af1e6..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE;
-import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_AUTO;
-import static org.apache.nifi.processors.standard.util.JmsProperties.ACTIVEMQ_PROVIDER;
-import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
-import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
-import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
-import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE_QUEUE;
-import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE_TOPIC;
-import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUBSCRIPTION;
-import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
-import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
-import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
-import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
-import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
-import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
-import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSslConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.URISupport;
-import org.apache.activemq.util.URISupport.CompositeData;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.ssl.SSLContextService;
-
-public class JmsFactory {
-
-    public static final boolean DEFAULT_IS_TRANSACTED = false;
-    public static final String ATTRIBUTE_PREFIX = "jms.";
-    public static final String ATTRIBUTE_TYPE_SUFFIX = ".type";
-    public static final String CLIENT_ID_FIXED_PREFIX = "NiFi-";
-
-    // JMS Metadata Fields
-    public static final String JMS_MESSAGE_ID = "JMSMessageID";
-    public static final String JMS_DESTINATION = "JMSDestination";
-    public static final String JMS_REPLY_TO = "JMSReplyTo";
-    public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
-    public static final String JMS_REDELIVERED = "JMSRedelivered";
-    public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
-    public static final String JMS_TYPE = "JMSType";
-    public static final String JMS_TIMESTAMP = "JMSTimestamp";
-    public static final String JMS_EXPIRATION = "JMSExpiration";
-    public static final String JMS_PRIORITY = "JMSPriority";
-
-    // JMS Property Types.
-    public static final String PROP_TYPE_STRING = "string";
-    public static final String PROP_TYPE_INTEGER = "integer";
-    public static final String PROP_TYPE_OBJECT = "object";
-    public static final String PROP_TYPE_BYTE = "byte";
-    public static final String PROP_TYPE_DOUBLE = "double";
-    public static final String PROP_TYPE_FLOAT = "float";
-    public static final String PROP_TYPE_LONG = "long";
-    public static final String PROP_TYPE_SHORT = "short";
-    public static final String PROP_TYPE_BOOLEAN = "boolean";
-
-    public static Connection createConnection(final ProcessContext context) throws JMSException {
-        return createConnection(context, createClientId(context));
-    }
-
-    public static Connection createConnection(final ProcessContext context, final String clientId) throws JMSException {
-        Objects.requireNonNull(context);
-        Objects.requireNonNull(clientId);
-
-        final ConnectionFactory connectionFactory = createConnectionFactory(context);
-
-        final String username = context.getProperty(USERNAME).getValue();
-        final String password = context.getProperty(PASSWORD).getValue();
-        final Connection connection = (username == null && password == null) ? connectionFactory.createConnection() : connectionFactory.createConnection(username, password);
-
-        connection.setClientID(clientId);
-        connection.start();
-        return connection;
-    }
-
-    public static Connection createConnection(final String url, final String jmsProvider, final String username, final String password, final int timeoutMillis) throws JMSException {
-        final ConnectionFactory connectionFactory = createConnectionFactory(url, timeoutMillis, jmsProvider);
-        return (username == null && password == null) ? connectionFactory.createConnection() : connectionFactory.createConnection(username, password);
-    }
-
-    public static String createClientId(final ProcessContext context) {
-        final String clientIdPrefix = context.getProperty(CLIENT_ID_PREFIX).getValue();
-        return CLIENT_ID_FIXED_PREFIX + (clientIdPrefix == null ? "" : clientIdPrefix) + "-" + UUID.randomUUID().toString();
-    }
-
-    public static boolean clientIdPrefixEquals(final String one, final String two) {
-        if (one == null) {
-            return two == null;
-        } else if (two == null) {
-            return false;
-        }
-        int uuidLen = UUID.randomUUID().toString().length();
-        if (one.length() <= uuidLen || two.length() <= uuidLen) {
-            return false;
-        }
-        return one.substring(0, one.length() - uuidLen).equals(two.substring(0, two.length() - uuidLen));
-    }
-
-    public static byte[] createByteArray(final Message message) throws JMSException {
-        if (message instanceof TextMessage) {
-            return getMessageBytes((TextMessage) message);
-        } else if (message instanceof BytesMessage) {
-            return getMessageBytes((BytesMessage) message);
-        } else if (message instanceof StreamMessage) {
-            return getMessageBytes((StreamMessage) message);
-        } else if (message instanceof MapMessage) {
-            return getMessageBytes((MapMessage) message);
-        } else if (message instanceof ObjectMessage) {
-            return getMessageBytes((ObjectMessage) message);
-        }
-        return new byte[0];
-    }
-
-    private static byte[] getMessageBytes(TextMessage message) throws JMSException {
-        return (message.getText() == null) ? new byte[0] : message.getText().getBytes();
-    }
-
-    private static byte[] getMessageBytes(BytesMessage message) throws JMSException {
-        final long byteCount = message.getBodyLength();
-        if (byteCount > Integer.MAX_VALUE) {
-            throw new JMSException("Incoming message cannot be written to a FlowFile because its size is "
-                    + byteCount
-                    + " bytes, and the maximum size that this processor can handle is "
-                    + Integer.MAX_VALUE);
-        }
-
-        byte[] bytes = new byte[(int) byteCount];
-        message.readBytes(bytes);
-
-        return bytes;
-    }
-
-    private static byte[] getMessageBytes(StreamMessage message) throws JMSException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        byte[] byteBuffer = new byte[4096];
-        int byteCount;
-        while ((byteCount = message.readBytes(byteBuffer)) != -1) {
-            baos.write(byteBuffer, 0, byteCount);
-        }
-
-        try {
-            baos.close();
-        } catch (final IOException ioe) {
-        }
-
-        return baos.toByteArray();
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static byte[] getMessageBytes(MapMessage message) throws JMSException {
-        Map<String, String> map = new HashMap<>();
-        Enumeration elements = message.getMapNames();
-        while (elements.hasMoreElements()) {
-            String key = (String) elements.nextElement();
-            map.put(key, message.getString(key));
-        }
-        return map.toString().getBytes();
-    }
-
-    private static byte[] getMessageBytes(ObjectMessage message) throws JMSException {
-        try {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            // will fail if Object is not Serializable
-            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
-                // will fail if Object is not Serializable
-                oos.writeObject(message.getObject());
-                oos.flush();
-            }
-            return baos.toByteArray();
-        } catch (IOException e) {
-            return new byte[0];
-        }
-    }
-
-    public static Session createSession(final ProcessContext context, final Connection connection, final boolean transacted) throws JMSException {
-        final String configuredAckMode = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue();
-        return createSession(connection, configuredAckMode, transacted);
-    }
-
-    public static Session createSession(final Connection connection, final String configuredAckMode, final boolean transacted) throws JMSException {
-        final int ackMode;
-        if (configuredAckMode == null) {
-            ackMode = Session.AUTO_ACKNOWLEDGE;
-        } else {
-            ackMode = configuredAckMode.equalsIgnoreCase(ACK_MODE_AUTO) ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE;
-        }
-
-        final Session session = connection.createSession(transacted, ackMode);
-        return session;
-    }
-
-    public static WrappedMessageConsumer createQueueMessageConsumer(final ProcessContext context) throws JMSException {
-        Connection connection = null;
-        Session jmsSession = null;
-        try {
-            connection = JmsFactory.createConnection(context);
-            jmsSession = JmsFactory.createSession(context, connection, DEFAULT_IS_TRANSACTED);
-
-            final String messageSelector = context.getProperty(MESSAGE_SELECTOR).getValue();
-            final Destination destination = createQueue(context);
-            final MessageConsumer messageConsumer = jmsSession.createConsumer(destination, messageSelector, false);
-
-            return new WrappedMessageConsumer(connection, jmsSession, messageConsumer);
-        } catch (JMSException e) {
-            if (jmsSession != null) {
-                jmsSession.close();
-            }
-            if (connection != null) {
-                connection.close();
-            }
-            throw e;
-        }
-    }
-
-    public static WrappedMessageConsumer createTopicMessageConsumer(final ProcessContext context) throws JMSException {
-        return createTopicMessageConsumer(context, createClientId(context));
-    }
-
-    public static WrappedMessageConsumer createTopicMessageConsumer(final ProcessContext context, final String clientId) throws JMSException {
-        Objects.requireNonNull(context);
-        Objects.requireNonNull(clientId);
-
-        Connection connection = null;
-        Session jmsSession = null;
-        try {
-            connection = JmsFactory.createConnection(context, clientId);
-            jmsSession = JmsFactory.createSession(context, connection, DEFAULT_IS_TRANSACTED);
-
-            final String messageSelector = context.getProperty(MESSAGE_SELECTOR).getValue();
-            final Topic topic = createTopic(context);
-            final MessageConsumer messageConsumer;
-            if (context.getProperty(DURABLE_SUBSCRIPTION).asBoolean()) {
-                messageConsumer = jmsSession.createDurableSubscriber(topic, clientId, messageSelector, false);
-            } else {
-                messageConsumer = jmsSession.createConsumer(topic, messageSelector, false);
-            }
-
-            return new WrappedMessageConsumer(connection, jmsSession, messageConsumer);
-        } catch (JMSException e) {
-            if (jmsSession != null) {
-                jmsSession.close();
-            }
-            if (connection != null) {
-                connection.close();
-            }
-            throw e;
-        }
-    }
-
-    private static Destination getDestination(final ProcessContext context) throws JMSException {
-        final String destinationType = context.getProperty(DESTINATION_TYPE).getValue();
-        switch (destinationType) {
-            case DESTINATION_TYPE_TOPIC:
-                return createTopic(context);
-            case DESTINATION_TYPE_QUEUE:
-            default:
-                return createQueue(context);
-        }
-    }
-
-    public static WrappedMessageProducer createMessageProducer(final ProcessContext context) throws JMSException {
-        return createMessageProducer(context, false);
-    }
-
-    public static WrappedMessageProducer createMessageProducer(final ProcessContext context, final boolean transacted) throws JMSException {
-        Connection connection = null;
-        Session jmsSession = null;
-
-        try {
-            connection = JmsFactory.createConnection(context);
-            jmsSession = JmsFactory.createSession(context, connection, transacted);
-
-            final Destination destination = getDestination(context);
-            final MessageProducer messageProducer = jmsSession.createProducer(destination);
-
-            return new WrappedMessageProducer(connection, jmsSession, messageProducer);
-        } catch (JMSException e) {
-            if (connection != null) {
-                connection.close();
-            }
-            if (jmsSession != null) {
-                jmsSession.close();
-            }
-            throw e;
-        }
-    }
-
-    public static Destination createQueue(final ProcessContext context) {
-        return createQueue(context, context.getProperty(DESTINATION_NAME).getValue());
-    }
-
-    public static Queue createQueue(final ProcessContext context, final String queueName) {
-        return createQueue(context.getProperty(JMS_PROVIDER).getValue(), queueName);
-    }
-
-    public static Queue createQueue(final String jmsProvider, final String queueName) {
-        switch (jmsProvider) {
-            case ACTIVEMQ_PROVIDER:
-            default:
-                return new ActiveMQQueue(queueName);
-        }
-    }
-
-    private static Topic createTopic(final ProcessContext context) {
-        final String topicName = context.getProperty(DESTINATION_NAME).getValue();
-        switch (context.getProperty(JMS_PROVIDER).getValue()) {
-            case ACTIVEMQ_PROVIDER:
-            default:
-                return new ActiveMQTopic(topicName);
-        }
-    }
-
-    private static ConnectionFactory createConnectionFactory(final ProcessContext context) throws JMSException {
-        final URI uri;
-        try {
-            uri = new URI(context.getProperty(URL).getValue());
-        } catch (URISyntaxException e) {
-            // Should not happen - URI was validated
-            throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e);
-        }
-        final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final String provider = context.getProperty(JMS_PROVIDER).getValue();
-        if (isSSL(uri)) {
-            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-            if (sslContextService == null) {
-                throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set.");
-            }
-            return createSslConnectionFactory(uri, timeoutMillis, provider, sslContextService.getKeyStoreFile(),
-                    sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword());
-        } else {
-            return createConnectionFactory(uri, timeoutMillis, provider);
-        }
-    }
-
-    private static boolean isSSL(URI uri) {
-        try {
-            CompositeData compositeData = URISupport.parseComposite(uri);
-            if ("ssl".equals(compositeData.getScheme())) {
-                return true;
-            }
-            for(URI component : compositeData.getComponents()){
-                if ("ssl".equals(component.getScheme())) {
-                    return true;
-                }
-            }
-        } catch (URISyntaxException e) {
-            throw new IllegalArgumentException("Attempting to initiate JMS with invalid composite URI [" + uri + "]", e);
-        }
-        return false;
-    }
-
-    public static ConnectionFactory createConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider) throws JMSException {
-        return createConnectionFactory(uri.toString(), timeoutMillis, jmsProvider);
-    }
-
-    public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException {
-        switch (jmsProvider) {
-            case ACTIVEMQ_PROVIDER: {
-                final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
-                factory.setSendTimeout(timeoutMillis);
-                return factory;
-            }
-            default:
-                throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
-        }
-    }
-
-    public static ConnectionFactory createSslConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider,
-            final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
-        return createSslConnectionFactory(uri.toString(), timeoutMillis, jmsProvider, keystore, keystorePassword, truststore, truststorePassword);
-    }
-
-    public static ConnectionFactory createSslConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider,
-                            final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
-        switch (jmsProvider) {
-            case ACTIVEMQ_PROVIDER: {
-                final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(url);
-                try {
-                    factory.setKeyStore(keystore);
-                } catch (Exception e) {
-                    throw new JMSException("Problem Setting the KeyStore: " + e.getMessage());
-                }
-                factory.setKeyStorePassword(keystorePassword);
-                try {
-                    factory.setTrustStore(truststore);
-                } catch (Exception e) {
-                    throw new JMSException("Problem Setting the TrustStore: " + e.getMessage());
-                }
-                factory.setTrustStorePassword(truststorePassword);
-                factory.setSendTimeout(timeoutMillis);
-                return factory;
-            }
-            default:
-                throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
-        }
-    }
-
-    public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
-        final Map<String, String> attributes = new HashMap<>();
-
-        final Enumeration<?> enumeration = message.getPropertyNames();
-        while (enumeration.hasMoreElements()) {
-            final String propName = (String) enumeration.nextElement();
-
-            final Object value = message.getObjectProperty(propName);
-
-            if (value == null) {
-                attributes.put(ATTRIBUTE_PREFIX + propName, "");
-                attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, "Unknown");
-                continue;
-            }
-
-            final String valueString = value.toString();
-            attributes.put(ATTRIBUTE_PREFIX + propName, valueString);
-
-            final String propType;
-            if (value instanceof String) {
-                propType = PROP_TYPE_STRING;
-            } else if (value instanceof Double) {
-                propType = PROP_TYPE_DOUBLE;
-            } else if (value instanceof Float) {
-                propType = PROP_TYPE_FLOAT;
-            } else if (value instanceof Long) {
-                propType = PROP_TYPE_LONG;
-            } else if (value instanceof Integer) {
-                propType = PROP_TYPE_INTEGER;
-            } else if (value instanceof Short) {
-                propType = PROP_TYPE_SHORT;
-            } else if (value instanceof Byte) {
-                propType = PROP_TYPE_BYTE;
-            } else if (value instanceof Boolean) {
-                propType = PROP_TYPE_BOOLEAN;
-            } else {
-                propType = PROP_TYPE_OBJECT;
-            }
-
-            attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, propType);
-        }
-
-        if (message.getJMSCorrelationID() != null) {
-            attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
-        }
-        if (message.getJMSDestination() != null) {
-            String destinationName;
-            if (message.getJMSDestination() instanceof Queue) {
-                destinationName = ((Queue) message.getJMSDestination()).getQueueName();
-            } else {
-                destinationName = ((Topic) message.getJMSDestination()).getTopicName();
-            }
-            attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
-        }
-        if (message.getJMSMessageID() != null) {
-            attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
-        }
-        if (message.getJMSReplyTo() != null) {
-            attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, message.getJMSReplyTo().toString());
-        }
-        if (message.getJMSType() != null) {
-            attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, message.getJMSType());
-        }
-
-        attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
-        attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
-        attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
-        attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
-        attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
-        return attributes;
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
deleted file mode 100644
index 5da67fefe4..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import javax.jms.Message;
-
-import org.apache.nifi.flowfile.FlowFile;
-
-/**
- * Data structure which allows to collect processing summary data.
- *
- */
-public class JmsProcessingSummary {
-
-    private int messagesReceived;
-    private long bytesReceived;
-    private Message lastMessageReceived;
-    private int flowFilesCreated;
-    private FlowFile lastFlowFile; // helps testing
-
-    public JmsProcessingSummary() {
-        super();
-        this.messagesReceived = 0;
-        this.bytesReceived = 0;
-        this.lastMessageReceived = null;
-        this.flowFilesCreated = 0;
-        this.lastFlowFile = null;
-    }
-
-    public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) {
-        super();
-        this.messagesReceived = 1;
-        this.bytesReceived = bytesReceived;
-        this.lastMessageReceived = lastMessageReceived;
-        this.flowFilesCreated = 1;
-        this.lastFlowFile = lastFlowFile;
-    }
-
-    public void add(JmsProcessingSummary jmsProcessingSummary) {
-        this.messagesReceived += jmsProcessingSummary.messagesReceived;
-        this.bytesReceived += jmsProcessingSummary.bytesReceived;
-        this.lastMessageReceived = jmsProcessingSummary.lastMessageReceived;
-        this.flowFilesCreated += jmsProcessingSummary.flowFilesCreated;
-        this.lastFlowFile = jmsProcessingSummary.lastFlowFile;
-    }
-
-    public int getMessagesReceived() {
-        return messagesReceived;
-    }
-
-    public long getBytesReceived() {
-        return bytesReceived;
-    }
-
-    public Message getLastMessageReceived() {
-        return lastMessageReceived;
-    }
-
-    public int getFlowFilesCreated() {
-        return flowFilesCreated;
-    }
-
-    public FlowFile getLastFlowFile() {
-        return lastFlowFile;
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
deleted file mode 100644
index 7a3ee0add2..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-
-public class JmsProperties {
-
-    public static final String ACTIVEMQ_PROVIDER = "ActiveMQ";
-
-    public static final String ACK_MODE_CLIENT = "Client Acknowledge";
-    public static final String ACK_MODE_AUTO = "Auto Acknowledge";
-
-    public static final String DESTINATION_TYPE_QUEUE = "Queue";
-    public static final String DESTINATION_TYPE_TOPIC = "Topic";
-
-    public static final String MSG_TYPE_BYTE = "byte";
-    public static final String MSG_TYPE_TEXT = "text";
-    public static final String MSG_TYPE_STREAM = "stream";
-    public static final String MSG_TYPE_MAP = "map";
-    public static final String MSG_TYPE_EMPTY = "empty";
-
-    // Standard JMS Properties
-    public static final PropertyDescriptor JMS_PROVIDER = new PropertyDescriptor.Builder()
-            .name("JMS Provider")
-            .description("The Provider used for the JMS Server")
-            .required(true)
-            .allowableValues(ACTIVEMQ_PROVIDER)
-            .defaultValue(ACTIVEMQ_PROVIDER)
-            .build();
-    public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
-            .name("URL")
-            .description("The URL of the JMS Server")
-            .addValidator(StandardValidators.URI_VALIDATOR)
-            .required(true)
-            .build();
-    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Communications Timeout")
-            .description("The amount of time to wait when attempting to receive a message before giving up and assuming failure")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("30 sec")
-            .build();
-    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
-            .name("Username")
-            .description("Username used for authentication and authorization")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
-            .name("Password")
-            .description("Password used for authentication and authorization")
-            .required(false)
-            .addValidator(Validator.VALID)
-            .sensitive(true)
-            .build();
-    public static final PropertyDescriptor CLIENT_ID_PREFIX = new PropertyDescriptor.Builder()
-            .name("Client ID Prefix")
-            .description("A human-readable ID that can be used to associate connections with yourself so that the maintainers of the JMS Server know who to contact if problems arise")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    // Topic/Queue determination Properties
-    public static final PropertyDescriptor DESTINATION_NAME = new PropertyDescriptor.Builder()
-            .name("Destination Name")
-            .description("The name of the JMS Topic or queue to use")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder()
-            .name("Destination Type")
-            .description("The type of the JMS Destination to use")
-            .required(true)
-            .allowableValues(DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC)
-            .defaultValue(DESTINATION_TYPE_QUEUE)
-            .build();
-
-    public static final PropertyDescriptor DURABLE_SUBSCRIPTION = new PropertyDescriptor.Builder()
-            .name("Use Durable Subscription")
-            .description("If true, connections to the specified topic will use Durable Subscription so that messages are queued when we are not pulling them")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
-
-    // JMS Publisher Properties
-    public static final PropertyDescriptor ATTRIBUTES_TO_JMS_PROPS = new PropertyDescriptor.Builder()
-            .name("Copy Attributes to JMS Properties")
-            .description("Whether or not FlowFile Attributes should be translated into JMS Message Properties. If true, all "
-                    + "attributes starting with 'jms.' will be set as Properties on the JMS Message (without the 'jms.' prefix). "
-                    + "If an attribute exists that starts with the same value but ends in '.type', that attribute will be used "
-                    + "to determine the JMS Message Property type.")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
-
-    // JMS Listener Properties
-    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Message Batch Size")
-            .description("The number of messages to pull/push in a single iteration of the processor")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("10")
-            .build();
-    public static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder()
-            .name("Acknowledgement Mode")
-            .description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge.")
-            .required(true)
-            .allowableValues(ACK_MODE_CLIENT, ACK_MODE_AUTO)
-            .defaultValue(ACK_MODE_CLIENT)
-            .build();
-    public static final PropertyDescriptor JMS_PROPS_TO_ATTRIBUTES = new PropertyDescriptor.Builder()
-            .name("Copy JMS Properties to Attributes")
-            .description("Whether or not the JMS Message Properties should be copied to the FlowFile Attributes; if so, the attribute name will be jms.XXX, where XXX is the JMS Property name")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
-    public static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder()
-            .name("Message Selector")
-            .description("The JMS Message Selector to use in order to narrow the messages that are pulled")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    // JMS Producer Properties
-    public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder()
-            .name("Message Type")
-            .description("The Type of JMS Message to Construct")
-            .required(true)
-            .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY)
-            .defaultValue(MSG_TYPE_BYTE)
-            .build();
-    public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()
-            .name("Message Priority")
-            .description("The Priority of the Message")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-    public static final PropertyDescriptor REPLY_TO_QUEUE = new PropertyDescriptor.Builder()
-            .name("Reply-To Queue")
-            .description("The name of the queue to which a reply to should be added")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-    public static final PropertyDescriptor MESSAGE_TTL = new PropertyDescriptor.Builder()
-            .name("Message Time to Live")
-            .description("The amount of time that the message should live on the destination before being removed; if not specified, the message will never expire.")
-            .required(false)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
-            .name("Max Buffer Size")
-            .description("The maximum amount of data that can be buffered for a JMS Message. If a FlowFile's size exceeds this value, the FlowFile will be routed to failure.")
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .build();
-
-    // JMS SSL Properties
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("The Controller Service to use in order to obtain an SSL Context.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java
deleted file mode 100644
index 80806df8c9..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.nifi.io.nio.consumer.StreamConsumer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-
-/**
- *
- */
-public class UDPStreamConsumer implements StreamConsumer {
-
-    private final ComponentLog logger;
-    final List<FlowFile> newFlowFileQueue;
-    private final String uniqueId;
-    private BufferPool bufferPool = null;
-    private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
-    private final AtomicBoolean streamEnded = new AtomicBoolean(false);
-    private final AtomicBoolean consumerDone = new AtomicBoolean(false);
-    private ProcessSession session;
-    private final UDPConsumerCallback udpCallback;
-
-    public UDPStreamConsumer(final String streamId, final List<FlowFile> newFlowFiles, final long fileSizeTrigger, final ComponentLog logger,
-            final boolean flowFilePerDatagram) {
-        this.uniqueId = streamId;
-        this.newFlowFileQueue = newFlowFiles;
-        this.logger = logger;
-        this.udpCallback = new UDPConsumerCallback(filledBuffers, fileSizeTrigger, flowFilePerDatagram);
-    }
-
-    @Override
-    public void setReturnBufferQueue(final BufferPool pool) {
-        this.bufferPool = pool;
-        this.udpCallback.setBufferPool(pool);
-    }
-
-    @Override
-    public void addFilledBuffer(final ByteBuffer buffer) {
-        if (isConsumerFinished()) {
-            bufferPool.returnBuffer(buffer, 0);
-        } else {
-            filledBuffers.add(buffer);
-        }
-    }
-
-    private void close() {
-        if (isConsumerFinished()) {
-            return;
-        }
-        consumerDone.set(true);
-        ByteBuffer buf = null;
-        while ((buf = filledBuffers.poll()) != null) {
-            bufferPool.returnBuffer(buf, 0);
-        }
-    }
-
-    public void setSession(ProcessSession session) {
-        this.session = session;
-    }
-
-    @Override
-    public void process() throws IOException {
-        if (isConsumerFinished()) {
-            return;
-        }
-
-        FlowFile newFlowFile = null;
-        try {
-            if (streamEnded.get() && filledBuffers.isEmpty()) {
-                close();
-                return;
-            }
-            if (filledBuffers.isEmpty()) {
-                return;
-            }
-            // time to make a new flow file
-            newFlowFile = session.create();
-            newFlowFile = session.putAttribute(newFlowFile, "source.stream.identifier", uniqueId);
-            newFlowFile = session.write(newFlowFile, udpCallback);
-            if (newFlowFile.getSize() == 0) {
-                session.remove(newFlowFile);
-                return;
-            }
-            newFlowFileQueue.add(newFlowFile);
-        } catch (final Exception ex) {
-            close();
-            if (newFlowFile != null) {
-                try {
-                    session.remove(newFlowFile);
-                } catch (final Exception ex2) {
-                    logger.warn("Unable to delete partial flow file due to: ", ex2);
-                }
-            }
-            throw new IOException("Problem while processing data stream", ex);
-        }
-    }
-
-    @Override
-    public void signalEndOfStream() {
-        streamEnded.set(true);
-    }
-
-    @Override
-    public boolean isConsumerFinished() {
-        return consumerDone.get();
-    }
-
-    @Override
-    public String getId() {
-        return uniqueId;
-    }
-
-    @Override
-    public final boolean equals(final Object obj) {
-        if (obj == null) {
-            return false;
-        }
-        if (obj == this) {
-            return true;
-        }
-        if (obj.getClass() != getClass()) {
-            return false;
-        }
-        UDPStreamConsumer rhs = (UDPStreamConsumer) obj;
-        return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
-    }
-
-    @Override
-    public final int hashCode() {
-        return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode();
-    }
-
-    @Override
-    public final String toString() {
-        return new ToStringBuilder(this).append(uniqueId).toString();
-    }
-
-    public static final class UDPConsumerCallback implements OutputStreamCallback {
-
-        BufferPool bufferPool;
-        final BlockingQueue<ByteBuffer> filledBuffers;
-        final long fileSizeTrigger;
-        final boolean flowFilePerDatagram;
-
-        public UDPConsumerCallback(final BlockingQueue<ByteBuffer> filledBuffers, final long fileSizeTrigger, final boolean flowFilePerDatagram) {
-            this.filledBuffers = filledBuffers;
-            this.fileSizeTrigger = fileSizeTrigger;
-            this.flowFilePerDatagram = flowFilePerDatagram;
-        }
-
-        public void setBufferPool(BufferPool pool) {
-            this.bufferPool = pool;
-        }
-
-        @Override
-        public void process(final OutputStream out) throws IOException {
-            try {
-                long totalBytes = 0L;
-                try (WritableByteChannel wbc = Channels.newChannel(new BufferedOutputStream(out))) {
-                    ByteBuffer buffer = null;
-                    while ((buffer = filledBuffers.poll(50, TimeUnit.MILLISECONDS)) != null) {
-                        int bytesWrittenThisPass = 0;
-                        try {
-                            while (buffer.hasRemaining()) {
-                                bytesWrittenThisPass += wbc.write(buffer);
-                            }
-                            totalBytes += bytesWrittenThisPass;
-                            if (totalBytes > fileSizeTrigger || flowFilePerDatagram) {
-                                break;// this is enough data
-                            }
-                        } finally {
-                            bufferPool.returnBuffer(buffer, bytesWrittenThisPass);
-                        }
-                    }
-                }
-            } catch (final InterruptedException ie) {
-                // irrelevant
-            }
-        }
-
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java
deleted file mode 100644
index cc33184807..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.nifi.logging.ComponentLog;
-
-public class WrappedMessageConsumer {
-
-    private final Connection connection;
-    private final Session session;
-    private final MessageConsumer consumer;
-
-    private boolean closed = false;
-
-    public WrappedMessageConsumer(final Connection connection, final Session jmsSession, final MessageConsumer messageConsumer) {
-        this.connection = connection;
-        this.session = jmsSession;
-        this.consumer = messageConsumer;
-    }
-
-    public Connection getConnection() {
-        return connection;
-    }
-
-    public Session getSession() {
-        return session;
-    }
-
-    public MessageConsumer getConsumer() {
-        return consumer;
-    }
-
-    public void close(final ComponentLog logger) {
-        closed = true;
-
-        try {
-            connection.close();
-        } catch (final JMSException e) {
-            logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
-        }
-
-        try {
-            session.close();
-        } catch (final JMSException e) {
-            logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
-        }
-
-        try {
-            consumer.close();
-        } catch (final JMSException e) {
-            logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
-        }
-    }
-
-    public boolean isClosed() {
-        return closed;
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java
deleted file mode 100644
index 49e586da85..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.nifi.logging.ComponentLog;
-
-public class WrappedMessageProducer {
-
-    private final Connection connection;
-    private final Session session;
-    private final MessageProducer producer;
-
-    private boolean closed = false;
-
-    public WrappedMessageProducer(final Connection connection, final Session jmsSession, final MessageProducer messageProducer) {
-        this.connection = connection;
-        this.session = jmsSession;
-        this.producer = messageProducer;
-    }
-
-    public Connection getConnection() {
-        return connection;
-    }
-
-    public Session getSession() {
-        return session;
-    }
-
-    public MessageProducer getProducer() {
-        return producer;
-    }
-
-    public void close(final ComponentLog logger) {
-        closed = true;
-
-        try {
-            connection.close();
-        } catch (final JMSException e) {
-            logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
-        }
-
-        try {
-            session.close();
-        } catch (final JMSException e) {
-            logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
-        }
-
-        try {
-            producer.close();
-        } catch (final JMSException e) {
-            logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e);
-        }
-    }
-
-    public boolean isClosed() {
-        return closed;
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
index 9b4a6ef58f..5429bf3f43 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
 import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
@@ -26,6 +25,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index db389fba2e..4de0a512c0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -176,17 +176,6 @@
                 <artifactId>javax.jms-api</artifactId>
                 <version>2.0.1</version>
             </dependency>
-            <dependency>
-                <groupId>org.apache.activemq</groupId>
-                <artifactId>activemq-client</artifactId>
-                <version>5.15.15</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.activemq</groupId>
-                <artifactId>activemq-broker</artifactId>
-                <version>5.15.14</version>
-                <scope>test</scope>
-            </dependency>
             <dependency>
                 <groupId>org.apache.derby</groupId>
                 <artifactId>derby</artifactId>