You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/06/23 19:45:36 UTC
svn commit: r1687110 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/filter/
broker-core/src/main/java/org/apache/qpid/server/queue/
systests/src/test/java/org/apache/qpid/test/client/message/ test-profiles/
Author: kwall
Date: Tue Jun 23 17:45:36 2015
New Revision: 1687110
URL: http://svn.apache.org/r1687110
Log:
QPID-6607: [Java Broker] Prevent runtime errors occurring during selector evaluation from stopping the Broker
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/message/SelectorTest.java
qpid/java/trunk/test-profiles/JavaExcludes
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?rev=1687110&r1=1687109&r2=1687110&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Tue Jun 23 17:45:36 2015
@@ -39,7 +39,7 @@ public interface Filterable extends Filt
long getArrivalTime();
- public class Factory
+ class Factory
{
public static Filterable newInstance(final ServerMessage message, final InstanceProperties properties)
@@ -131,6 +131,23 @@ public interface Filterable extends Filt
return message.getMessageHeader().getHeader(name);
}
+ /**
+  * Describes this filterable for diagnostic output: the runtime class name,
+  * followed by the message number and, when the message id is non-null, the id.
+  *
+  * @return text of the form {@code <class> [messageNumber=N, id=ID]}, with the
+  *         {@code , id=ID} part omitted when the message id is null
+  */
+ @Override
+ public String toString()
+ {
+     final StringBuilder description = new StringBuilder(getClass().getName())
+             .append(" [messageNumber=")
+             .append(getMessageNumber());
+     if (getMessageId() != null)
+     {
+         // The id is optional, so it is only reported when present.
+         description.append(", id=").append(getMessageId());
+     }
+     return description.append("]").toString();
+ }
};
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1687110&r1=1687109&r2=1687110&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Jun 23 17:45:36 2015
@@ -32,8 +32,13 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
@@ -57,7 +62,7 @@ class QueueConsumerImpl
extends AbstractConfiguredObject<QueueConsumerImpl>
implements QueueConsumer<QueueConsumerImpl>, LogSubject
{
-
+ private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
@@ -409,7 +414,26 @@ class QueueConsumerImpl
return false;
}
}
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
+
+ if (_filters == null)
+ {
+ return true;
+ }
+ else
+ {
+ Filterable msg = entry.asFilterable();
+ try
+ {
+ return _filters.allAllow(msg);
+ }
+ catch (SelectorParsingException e)
+ {
+ LOGGER.info(this + " could not evaluate filter [" + _filters
+ + "] against message " + msg
+ + ". Error was : " + e.getMessage());
+ return false;
+ }
+ }
}
protected String getFilterLogString()
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/message/SelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/message/SelectorTest.java?rev=1687110&r1=1687109&r2=1687110&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/message/SelectorTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/message/SelectorTest.java Tue Jun 23 17:45:36 2015
@@ -55,12 +55,10 @@ public class SelectorTest extends QpidBr
private static final String BAD_MATHS_SELECTOR = " 1 % 5";
- private static final long RECIEVE_TIMEOUT = 1000;
-
protected void setUp() throws Exception
{
super.setUp();
- init((AMQConnection) getConnection("guest", "guest"));
+ init((AMQConnection) getConnection());
}
private void init(AMQConnection connection) throws JMSException
@@ -85,10 +83,7 @@ public class SelectorTest extends QpidBr
public void testUsingOnMessage() throws Exception
{
String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
- // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
-
- Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- // _session.createConsumer(destination).setMessageListener(this);
+ Session session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
session.createConsumer(_destination, selector).setMessageListener(this);
try
@@ -206,7 +201,6 @@ public class SelectorTest extends QpidBr
caught = true;
}
assertTrue("No exception thrown!", caught);
- caught = false;
}
@@ -219,20 +213,14 @@ public class SelectorTest extends QpidBr
sentMsg.setIntProperty("testproperty", 1); // 1 % 5
producer.send(sentMsg);
- Message recvd = consumer.receive(RECIEVE_TIMEOUT);
- assertNotNull(recvd);
+ Message recvd = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message matching selector should be received", recvd);
sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense
producer.send(sentMsg);
- try
- {
- recvd = consumer.receive(RECIEVE_TIMEOUT);
- assertNull(recvd);
- }
- catch (Exception e)
- {
+ recvd = consumer.receive(RECEIVE_TIMEOUT);
+ assertNull("Message causing runtime selector error should not be received", recvd);
- }
assertFalse("Connection should not be closed", _connection.isClosed());
}
Modified: qpid/java/trunk/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaExcludes?rev=1687110&r1=1687109&r2=1687110&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaExcludes (original)
+++ qpid/java/trunk/test-profiles/JavaExcludes Tue Jun 23 17:45:36 2015
@@ -24,10 +24,6 @@ org.apache.qpid.client.SessionCreateTest
org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
-//QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar)
-org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
-
-
org.apache.qpid.server.protocol.v0_8.AckTest#*
org.apache.qpid.server.protocol.v0_8.AcknowledgeTest#*
org.apache.qpid.server.protocol.v0_8.AMQChannelTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org