You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/17 23:44:38 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5559

Repository: activemq
Updated Branches:
  refs/heads/master 1a0f73ed1 -> ca456c460


https://issues.apache.org/jira/browse/AMQ-5559

Fix and tests for filter handling on attach.  We only support JMS
selector and NoLocal type filters for receivers so only report those
back, all others are dropped to indicate we will not honor them.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ca456c46
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ca456c46
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ca456c46

Branch: refs/heads/master
Commit: ca456c4601c5e659f9864041af87f489a0e63e4b
Parents: 1a0f73e
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 17 18:44:24 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 17 18:44:24 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 26 +++++---
 .../activemq/transport/amqp/AmqpSupport.java    | 11 ++--
 .../transport/amqp/client/AmqpClient.java       |  2 +-
 .../amqp/client/AmqpJmsSelectorFilter.java      | 48 +++++++++++++++
 .../amqp/client/AmqpJmsSelectorType.java        | 47 ---------------
 .../amqp/client/AmqpNoLocalFilter.java          | 45 ++++++++++++++
 .../transport/amqp/client/AmqpNoLocalType.java  | 44 --------------
 .../transport/amqp/client/AmqpReceiver.java     | 36 +++++++++--
 .../transport/amqp/client/AmqpSession.java      | 35 +++++++++++
 .../amqp/client/AmqpUnknownFilterType.java      |  7 ++-
 .../amqp/client/util/UnmodifiableLink.java      |  4 +-
 .../amqp/interop/AmqpConnectionsTest.java       |  2 +-
 .../amqp/interop/AmqpReceiverTest.java          | 63 ++++++++++++++++++--
 13 files changed, 249 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 3661f3d..5a73a25 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -1417,15 +1417,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
 
         try {
+            final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
             final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
             final ConsumerContext consumerContext = new ConsumerContext(id, sender);
             sender.setContext(consumerContext);
 
+            boolean noLocal = false;
             String selector = null;
+
             if (source != null) {
-                DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
+                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
                 if (filter != null) {
-                    selector = filter.getDescribed().toString();
+                    selector = filter.getValue().getDescribed().toString();
                     // Validate the Selector.
                     try {
                         SelectorParser.parse(selector);
@@ -1436,6 +1439,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                         consumerContext.closed = true;
                         return;
                     }
+
+                    supportedFilters.put(filter.getKey(), filter.getValue());
+                }
+
+                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
+                if (filter != null) {
+                    noLocal = true;
+                    supportedFilters.put(filter.getKey(), filter.getValue());
                 }
             }
 
@@ -1449,7 +1460,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     source.setAddress(destination.getQualifiedName());
                     source.setDurable(TerminusDurability.UNSETTLED_STATE);
                     source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-                    sender.setSource(source);
                 } else {
                     consumerContext.closed = true;
                     sender.setSource(null);
@@ -1465,7 +1475,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 source = new org.apache.qpid.proton.amqp.messaging.Source();
                 source.setAddress(destination.getQualifiedName());
                 source.setDynamic(true);
-                sender.setSource(source);
                 consumerContext.addCloseAction(new Runnable() {
 
                     @Override
@@ -1477,6 +1486,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 destination = createDestination(source);
             }
 
+            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
+            sender.setSource(source);
+
             int senderCredit = sender.getRemoteCredit();
 
             subscriptionsByConsumerId.put(id, consumerContext);
@@ -1486,6 +1498,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             consumerInfo.setDestination(destination);
             consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
             consumerInfo.setDispatchAsync(true);
+            consumerInfo.setNoLocal(noLocal);
 
             if (source.getDistributionMode() == COPY && destination.isQueue()) {
                 consumerInfo.setBrowser(true);
@@ -1495,11 +1508,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 consumerInfo.setSubscriptionName(sender.getName());
             }
 
-            DescribedType filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
-            if (filter != null) {
-                consumerInfo.setNoLocal(true);
-            }
-
             consumerContext.info = consumerInfo;
             consumerContext.setDestination(destination);
             consumerContext.credit = senderCredit;

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index c0cfb94..7af4c2c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp;
 
 import java.nio.ByteBuffer;
+import java.util.AbstractMap;
 import java.util.Map;
 
 import org.apache.qpid.proton.amqp.Binary;
@@ -86,7 +87,7 @@ public class AmqpSupport {
      *
      * @return the filter if found in the mapping or null if not found.
      */
-    public static DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
+    public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
 
         if (filterIds == null || filterIds.length == 0) {
             throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds);
@@ -96,14 +97,14 @@ public class AmqpSupport {
             return null;
         }
 
-        for (Object value : filters.values()) {
-            if (value instanceof DescribedType) {
-                DescribedType describedType = ((DescribedType) value);
+        for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
+            if (filter.getValue() instanceof DescribedType) {
+                DescribedType describedType = ((DescribedType) filter.getValue());
                 Object descriptor = describedType.getDescriptor();
 
                 for (Object filterId : filterIds) {
                     if (descriptor.equals(filterId)) {
-                        return describedType;
+                        return new AbstractMap.SimpleImmutableEntry<Symbol, DescribedType>(filter.getKey(), describedType);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index e7d3eaf..2762732 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -187,7 +187,7 @@ public class AmqpClient {
      * @param stateInspector
      *        the new state inspector to use.
      */
-    public void setStateInspector(AmqpValidator stateInspector) {
+    public void setValidator(AmqpValidator stateInspector) {
         if (stateInspector == null) {
             stateInspector = new AmqpValidator();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java
new file mode 100644
index 0000000..9fad2ef
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_CODE;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+/**
+ * A Described Type wrapper for JMS selector values.
+ */
+public class AmqpJmsSelectorFilter implements DescribedType {
+
+    private final String selector;
+
+    public AmqpJmsSelectorFilter(String selector) {
+        this.selector = selector;
+    }
+
+    @Override
+    public Object getDescriptor() {
+        return JMS_SELECTOR_CODE;
+    }
+
+    @Override
+    public Object getDescribed() {
+        return this.selector;
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpJmsSelectorType{" + selector + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
deleted file mode 100644
index d93e052..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client;
-
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-
-/**
- * A Described Type wrapper for JMS selector values.
- */
-public class AmqpJmsSelectorType implements DescribedType {
-
-    private final String selector;
-
-    public AmqpJmsSelectorType(String selector) {
-        this.selector = selector;
-    }
-
-    @Override
-    public Object getDescriptor() {
-        return UnsignedLong.valueOf(0x0000468C00000004L);
-    }
-
-    @Override
-    public Object getDescribed() {
-        return this.selector;
-    }
-
-    @Override
-    public String toString() {
-        return "AmqpJmsSelectorType{" + selector + "}";
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
new file mode 100644
index 0000000..0bdd71e
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+/**
+ * A Described Type wrapper for JMS no local option for MessageConsumer.
+ */
+public class AmqpNoLocalFilter implements DescribedType {
+
+    public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter();
+
+    private final String noLocal;
+
+    public AmqpNoLocalFilter() {
+        this.noLocal = "NoLocalFilter{}";
+    }
+
+    @Override
+    public Object getDescriptor() {
+        return NO_LOCAL_CODE;
+    }
+
+    @Override
+    public Object getDescribed() {
+        return this.noLocal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
deleted file mode 100644
index 2d61b83..0000000
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client;
-
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-
-/**
- * A Described Type wrapper for JMS no local option for MessageConsumer.
- */
-public class AmqpNoLocalType implements DescribedType {
-
-    public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType();
-
-    private final String noLocal;
-
-    public AmqpNoLocalType() {
-        this.noLocal = "NoLocalFilter{}";
-    }
-
-    @Override
-    public Object getDescriptor() {
-        return UnsignedLong.valueOf(0x0000468C00000003L);
-    }
-
-    @Override
-    public Object getDescribed() {
-        return this.noLocal;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index ff530b9..1290d27 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -76,6 +76,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     private String selector;
     private boolean presettle;
     private boolean noLocal;
+    private Source userSpecifiedSource;
 
     /**
      * Create a new receiver instance.
@@ -94,6 +95,28 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
+     * Create a new receiver instance.
+     *
+     * @param session
+     *        The parent session that created the receiver.
+     * @param source
+     *        The Source instance to use instead of creating and configuring one.
+     * @param receiverId
+     *        The unique ID assigned to this receiver.
+     */
+    public AmqpReceiver(AmqpSession session, Source source, String receiverId) {
+
+        if (source == null) {
+            throw new IllegalArgumentException("User specified Source cannot be null");
+        }
+
+        this.session = session;
+        this.userSpecifiedSource = source;
+        this.address = source.getAddress();
+        this.receiverId = receiverId;
+    }
+
+    /**
      * Close the receiver, a closed receiver will throw exceptions if any further send
      * calls are made.
      *
@@ -423,11 +446,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     @Override
     protected void doOpen() {
 
-        Source source = new Source();
-        source.setAddress(address);
+        Source source = userSpecifiedSource;
         Target target = new Target();
 
-        configureSource(source);
+        if (userSpecifiedSource == null) {
+            source = new Source();
+            source.setAddress(address);
+            configureSource(source);
+        }
 
         String receiverName = receiverId + ":" + address;
 
@@ -523,11 +549,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         source.setDefaultOutcome(modified);
 
         if (isNoLocal()) {
-            filters.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL);
+            filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
         }
 
         if (getSelector() != null && !getSelector().trim().equals("")) {
-            filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector()));
+            filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector()));
         }
 
         if (!filters.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index c747dc6..8b039b6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Session;
 
@@ -152,6 +153,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     /**
+     * Create a receiver instance using the given address
+     *
+     * @param address
+     *        the address to which the receiver will subscribe for its messages.
+     * @param source
+     *        the caller created and configured Source used to create the receiver link.
+     *
+     * @return a newly created receiver that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver createReceiver(Source source) throws Exception {
+        checkClosed();
+
+        final ClientFuture request = new ClientFuture();
+        final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                receiver.setStateInspector(getStateInspector());
+                receiver.open(request);
+                pumpToProtonTransport();
+            }
+        });
+
+        request.sync();
+
+        return receiver;
+    }
+
+    /**
      * Create a receiver instance using the given address that creates a durable subscription.
      *
      * @param address

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
index c86a2c9..9f3c840 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.client;
 
 import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 
 /**
@@ -26,6 +27,10 @@ public class AmqpUnknownFilterType implements DescribedType {
 
     public static final AmqpUnknownFilterType UNKOWN_FILTER = new AmqpUnknownFilterType();
 
+    public static final UnsignedLong UNKNOWN_FILTER_CODE = UnsignedLong.valueOf(0x0000468C00000099L);
+    public static final Symbol UNKNOWN_FILTER_NAME = Symbol.valueOf("apache.org:unkown-filter:string");
+    public static final Object[] UNKNOWN_FILTER_IDS = new Object[] { UNKNOWN_FILTER_CODE, UNKNOWN_FILTER_NAME };
+
     private final String payload;
 
     public AmqpUnknownFilterType() {
@@ -34,7 +39,7 @@ public class AmqpUnknownFilterType implements DescribedType {
 
     @Override
     public Object getDescriptor() {
-        return UnsignedLong.valueOf(0x0000468C00000099L);
+        return UNKNOWN_FILTER_CODE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
index c7a99d3..fd44dcd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
@@ -147,13 +147,13 @@ public class UnmodifiableLink implements Link {
     @Override
     public Source getRemoteSource() {
         // TODO Figure out a simple way to wrap the odd Source types in Proton-J
-        return link.getSource();
+        return link.getRemoteSource();
     }
 
     @Override
     public Target getRemoteTarget() {
         // TODO Figure out a simple way to wrap the odd Target types in Proton-J
-        return link.getTarget();
+        return link.getRemoteTarget();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
index 2f9935f..dfe3a4b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
@@ -62,7 +62,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
         AmqpClient client = createAmqpClient();
         assertNotNull(client);
 
-        client.setStateInspector(new AmqpValidator() {
+        client.setValidator(new AmqpValidator() {
 
             @Override
             public void inspectOpenedResource(Connection connection) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ca456c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index cdecab0..13b5904 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -33,10 +34,14 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.activemq.util.Wait;
+import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.engine.Receiver;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -74,18 +79,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
     public void testCreateQueueReceiverWithJMSSelector() throws Exception {
         AmqpClient client = createAmqpClient();
 
-        client.setStateInspector(new AmqpValidator() {
+        client.setValidator(new AmqpValidator() {
 
             @SuppressWarnings("unchecked")
             @Override
             public void inspectOpenedResource(Receiver receiver) {
                 LOG.info("Receiver opened: {}", receiver);
 
-                if (receiver.getSource() == null) {
+                if (receiver.getRemoteSource() == null) {
                     markAsInvalid("Link opened with null source.");
                 }
 
-                Source source = (Source) receiver.getSource();
+                Source source = (Source) receiver.getRemoteSource();
                 Map<Symbol, Object> filters = source.getFilter();
 
                 if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
@@ -111,18 +116,18 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
     public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
         AmqpClient client = createAmqpClient();
 
-        client.setStateInspector(new AmqpValidator() {
+        client.setValidator(new AmqpValidator() {
 
             @SuppressWarnings("unchecked")
             @Override
             public void inspectOpenedResource(Receiver receiver) {
                 LOG.info("Receiver opened: {}", receiver);
 
-                if (receiver.getSource() == null) {
+                if (receiver.getRemoteSource() == null) {
                     markAsInvalid("Link opened with null source.");
                 }
 
-                Source source = (Source) receiver.getSource();
+                Source source = (Source) receiver.getRemoteSource();
                 Map<Symbol, Object> filters = source.getFilter();
 
                 if (findFilter(filters, NO_LOCAL_FILTER_IDS) == null) {
@@ -363,4 +368,50 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
 
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
+        AmqpClient client = createAmqpClient();
+
+        client.setValidator(new AmqpValidator() {
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void inspectOpenedResource(Receiver receiver) {
+                LOG.info("Receiver opened: {}", receiver);
+
+                if (receiver.getRemoteSource() == null) {
+                    markAsInvalid("Link opened with null source.");
+                }
+
+                Source source = (Source) receiver.getRemoteSource();
+                Map<Symbol, Object> filters = source.getFilter();
+
+                if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
+                    markAsInvalid("Broker should not return unsupported filter on attach.");
+                }
+            }
+        });
+
+        Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
+        filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER);
+
+        Source source = new Source();
+        source.setAddress("queue://" + getTestName());
+        source.setFilter(filters);
+        source.setDurable(TerminusDurability.NONE);
+        source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        session.createReceiver(source);
+
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+
+        connection.getStateInspector().assertValid();
+        connection.close();
+    }
 }