You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mo...@apache.org on 2017/05/02 14:19:46 UTC
nifi git commit: NIFI-3223 added support for expression language in
PublishAMQP - EXCHANGE - ROUTING_KEY
Repository: nifi
Updated Branches:
refs/heads/0.x ceec0ecfd -> d38a324b3
NIFI-3223 added support for expression language in PublishAMQP
- EXCHANGE
- ROUTING_KEY
Signed-off-by: Mike Moser <mo...@apache.org>
This closes #1723.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d38a324b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d38a324b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d38a324b
Branch: refs/heads/0.x
Commit: d38a324b3d514d5f1637357ae14f947c42f1dba4
Parents: ceec0ec
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Jan 27 13:41:54 2017 -0500
Committer: Mike Moser <mo...@apache.org>
Committed: Tue May 2 14:17:47 2017 +0000
----------------------------------------------------------------------
.../nifi/amqp/processors/AMQPPublisher.java | 65 ++++++++------------
.../nifi/amqp/processors/PublishAMQP.java | 18 +++---
.../nifi/amqp/processors/AMQPPublisherTest.java | 37 +++--------
.../nifi/amqp/processors/ConsumeAMQPTest.java | 6 +-
4 files changed, 49 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
index 41a08d9..b4d6951 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
@@ -18,9 +18,7 @@ package org.apache.nifi.amqp.processors;
import java.io.IOException;
-import org.apache.nifi.logging.ProcessorLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.nifi.logging.ComponentLog;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
@@ -32,50 +30,21 @@ import com.rabbitmq.client.ReturnListener;
*/
final class AMQPPublisher extends AMQPWorker {
- private final static Logger logger = LoggerFactory.getLogger(AMQPPublisher.class);
+ private final ComponentLog processLog;
- private final String exchangeName;
-
- private final String routingKey;
-
- private final ProcessorLog processLog;
+ private final String connectionString;
/**
* Creates an instance of this publisher
*
* @param connection
* instance of AMQP {@link Connection}
- * @param exchangeName
- * the name of AMQP exchange to which messages will be published.
- * If not provided 'default' exchange will be used.
- * @param routingKey
- * (required) the name of the routingKey to be used by AMQP-based
- * system to route messages to its final destination (queue).
*/
- AMQPPublisher(Connection connection, String exchangeName, String routingKey, ProcessorLog processLog) {
+ AMQPPublisher(Connection connection, ComponentLog processLog) {
super(connection);
this.processLog = processLog;
- this.validateStringProperty("routingKey", routingKey);
- this.exchangeName = exchangeName == null ? "" : exchangeName.trim();
- if (this.exchangeName.length() == 0) {
- logger.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
- }
-
- this.routingKey = routingKey;
this.channel.addReturnListener(new UndeliverableMessageLogger());
- logger.info("Successfully connected AMQPPublisher to " + connection.toString() + " and '" + this.exchangeName
- + "' exchange with '" + routingKey + "' as a routing key.");
- }
-
- /**
- * Publishes message without any AMQP properties (see
- * {@link BasicProperties}) to a pre-defined AMQP Exchange.
- *
- * @param bytes
- * bytes representing a message.
- */
- void publish(byte[] bytes) {
- this.publish(bytes, null);
+ this.connectionString = connection.toString();
}
/**
@@ -86,14 +55,28 @@ final class AMQPPublisher extends AMQPWorker {
* bytes representing a message.
* @param properties
* instance of {@link BasicProperties}
+ * @param exchange
+ * the name of AMQP exchange to which messages will be published.
+ * If not provided 'default' exchange will be used.
+ * @param routingKey
+ * (required) the name of the routingKey to be used by AMQP-based
+ * system to route messages to its final destination (queue).
*/
- void publish(byte[] bytes, BasicProperties properties) {
+ void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
+ this.validateStringProperty("routingKey", routingKey);
+ exchange = exchange == null ? "" : exchange.trim();
+ if (exchange.length() == 0) {
+ processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
+ }
+ processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
+ + "' exchange with '" + routingKey + "' as a routing key.");
+
if (this.channel.isOpen()) {
try {
- this.channel.basicPublish(this.exchangeName, this.routingKey, true, properties, bytes);
+ this.channel.basicPublish(exchange, routingKey, true, properties, bytes);
} catch (Exception e) {
throw new IllegalStateException("Failed to publish to '" +
- this.exchangeName + "' with '" + this.routingKey + "'.", e);
+ exchange + "' with '" + routingKey + "'.", e);
}
} else {
throw new IllegalStateException("This instance of AMQPPublisher is invalid since "
@@ -106,7 +89,7 @@ final class AMQPPublisher extends AMQPWorker {
*/
@Override
public String toString() {
- return super.toString() + ", EXCHANGE:" + this.exchangeName + ", ROUTING_KEY:" + this.routingKey;
+ return this.connectionString;
}
/**
@@ -127,7 +110,7 @@ final class AMQPPublisher extends AMQPWorker {
throws IOException {
String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
+ "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
- logger.warn(logMessage);
+ processLog.warn(logMessage);
AMQPPublisher.this.processLog.warn(logMessage);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index 85116c2..330346f 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -70,6 +70,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
+ "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
.required(true)
.defaultValue("")
+ .expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder()
@@ -79,6 +80,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
+ "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set "
+ "(usually by the AMQP administrator)")
.required(true)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -130,15 +132,19 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
+ String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
+ if (routingKey == null){
+ throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
+ }
+ String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
byte[] messageContent = this.extractMessage(flowFile, processSession);
try {
- this.targetResource.publish(messageContent, amqpProperties);
+ this.targetResource.publish(messageContent, amqpProperties, routingKey, exchange);
processSession.transfer(flowFile, REL_SUCCESS);
- processSession.getProvenanceReporter().send(flowFile,
- this.amqpConnection.toString() + "/E:" + context.getProperty(EXCHANGE).getValue() + "/RK:"
- + context.getProperty(ROUTING_KEY).getValue());
+ processSession.getProvenanceReporter().send(flowFile, this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey);
} catch (Exception e) {
processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
@@ -168,9 +174,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
*/
@Override
protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) {
- String exchangeName = context.getProperty(EXCHANGE).getValue();
- String routingKey = context.getProperty(ROUTING_KEY).getValue();
- return new AMQPPublisher(this.amqpConnection, exchangeName, routingKey, this.getLogger());
+ return new AMQPPublisher(this.amqpConnection, this.getLogger());
}
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
index 9f01c04..90bd919 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
@@ -17,7 +17,6 @@
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -27,7 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.util.MockProcessorLog;
+import org.apache.nifi.logging.ComponentLog;
import org.junit.Test;
import org.mockito.Mockito;
@@ -40,31 +39,24 @@ public class AMQPPublisherTest {
@SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void failOnNullConnection() {
- new AMQPPublisher(null, null, null, null);
- }
-
- @SuppressWarnings("resource")
- @Test(expected = IllegalArgumentException.class)
- public void failOnMissingRoutingKey() throws Exception {
- Connection conn = new TestConnection(null, null);
- new AMQPPublisher(conn, null, "", null);
+ new AMQPPublisher(null, null);
}
@Test(expected = IllegalStateException.class)
public void failPublishIfChannelClosed() throws Exception {
Connection conn = new TestConnection(null, null);
- try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) {
+ try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
conn.close();
- sender.publish("oleg".getBytes());
+ sender.publish("oleg".getBytes(), null, "foo", "");
}
}
@Test(expected = IllegalStateException.class)
public void failPublishIfChannelFails() throws Exception {
TestConnection conn = new TestConnection(null, null);
- try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) {
+ try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
((TestChannel) conn.createChannel()).corruptChannel();
- sender.publish("oleg".getBytes());
+ sender.publish("oleg".getBytes(), null, "foo", "");
}
}
@@ -77,8 +69,8 @@ public class AMQPPublisherTest {
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
- try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) {
- sender.publish("hello".getBytes());
+ try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), null, "key1", "myExchange");
Thread.sleep(200);
}
@@ -100,9 +92,8 @@ public class AMQPPublisherTest {
ReturnListener retListener = mock(ReturnListener.class);
connection.createChannel().addReturnListener(retListener);
- try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key2",
- new MockProcessorLog("foo", ""))) {
- sender.publish("hello".getBytes());
+ try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), null, "key1", "myExchange");
Thread.sleep(1000);
}
Thread.sleep(200);
@@ -111,12 +102,4 @@ public class AMQPPublisherTest {
connection.close();
}
- @Test
- public void validateToString() throws Exception {
- TestConnection conn = new TestConnection(null, null);
- try (AMQPPublisher sender = new AMQPPublisher(conn, "myExchange", "key1", null)) {
- String toString = sender.toString();
- assertTrue(toString.contains("EXCHANGE:myExchange, ROUTING_KEY:key1"));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 3a2754d..52b48d8 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -17,12 +17,14 @@
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
@@ -46,8 +48,8 @@ public class ConsumeAMQPTest {
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
- try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) {
- sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN);
+ try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
ConsumeAMQP pubProc = new LocalConsumeAMQP(connection);
TestRunner runner = TestRunners.newTestRunner(pubProc);