You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/12/04 21:50:27 UTC
qpid-broker-j git commit: [Broker-J][AMQP 0-8..0-10] Queue declare
arguments that match known attributes should be acceptable
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 50b462d7a -> c80aceab2
[Broker-J][AMQP 0-8..0-10] Queue declare arguments that match known attributes should be acceptable
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c80aceab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c80aceab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c80aceab
Branch: refs/heads/master
Commit: c80aceab29fc8c6c835c0d83554286bbe2ed8e36
Parents: 50b462d
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Dec 4 18:04:34 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Dec 4 18:04:34 2018 +0000
----------------------------------------------------------------------
.../server/queue/QueueArgumentsConverter.java | 20 ++-
.../protocol/v0_10/ServerSessionDelegate.java | 4 +-
.../qpid/server/protocol/v0_8/AMQChannel.java | 2 +-
.../qpid/tests/protocol/v0_10/Interaction.java | 22 +++
.../tests/protocol/v0_10/QueueInteraction.java | 8 +
.../v0_10/LargeApplicationHeadersTest.java | 37 +----
.../protocol/v0_10/LargeMessageBodyTest.java | 36 +----
.../qpid/tests/protocol/v0_10/MessageTest.java | 81 +++-------
.../v0_10/extensions/queue/QueueTest.java | 152 +++++++++++++++++
.../v0_8/extension/queue/QueueTest.java | 161 +++++++++++++++++++
10 files changed, 397 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 65be871..e67e9a3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -20,16 +20,21 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectAttribute;
+import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
+import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -130,11 +135,24 @@ public class QueueArgumentsConverter
public static Map<String,Object> convertWireArgsToModel(final String queueName,
- Map<String, Object> wireArguments)
+ final Map<String, Object> wireArguments,
+ final Model model)
{
Map<String,Object> modelArguments = new HashMap<>();
if(wireArguments != null)
{
+ final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
+ final Map<String, ConfiguredObjectAttribute<?, ?>> attributeTypes =
+ new HashMap<>(typeRegistry.getAttributeTypes(Queue.class));
+ typeRegistry.getTypeSpecialisations(Queue.class)
+ .forEach(type -> typeRegistry.getTypeSpecificAttributes(type)
+ .forEach(t -> attributeTypes.put(t.getName(), t)));
+ wireArguments.entrySet()
+ .stream()
+ .filter(entry -> attributeTypes.containsKey(entry.getKey())
+ && !attributeTypes.get(entry.getKey()).isDerived())
+ .forEach(entry -> modelArguments.put(entry.getKey(), entry.getValue()));
+
for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
{
if(wireArguments.containsKey(entry.getKey()))
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 505ea89..5dda560 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -1557,7 +1557,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
try
{
final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(queueName,
- method.getArguments());
+ method.getArguments(),
+ session.getAMQPConnection()
+ .getModel());
final String alternateExchangeName = method.getAlternateExchange();
if (method.hasAlternateExchange() && !nameNullOrEmpty(alternateExchangeName))
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 4b4f9c7..e2048a0 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2985,7 +2985,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
Map<String, Object> attributes =
- QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments);
+ QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments, getModel());
attributes.put(Queue.NAME, queueNameString);
attributes.put(Queue.DURABLE, durable);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index 03deef5..8804c31 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -21,6 +21,7 @@
package org.apache.qpid.tests.protocol.v0_10;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
@@ -34,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_10.transport.Method;
import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
import org.apache.qpid.tests.protocol.AbstractFrameTransport;
import org.apache.qpid.tests.protocol.AbstractInteraction;
+import org.apache.qpid.tests.protocol.Response;
public class Interaction extends AbstractInteraction<Interaction>
{
@@ -204,4 +206,24 @@ public class Interaction extends AbstractInteraction<Interaction>
{
return _exchangeInteraction;
}
+
+ public <T extends Method> T consume(final Class<T> expected,
+ final Class<? extends Method>... ignore)
+ throws Exception
+ {
+ final Class<? extends Method>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+ expectedResponses[ignore.length] = expected;
+
+ T completed = null;
+ do
+ {
+ Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+ if (expected.isAssignableFrom(response.getBody().getClass()))
+ {
+ completed = (T) response.getBody();
+ }
+ }
+ while (completed == null);
+ return completed;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
index ca3edee..6171466 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.tests.protocol.v0_10;
+import java.util.Map;
+
import org.apache.qpid.server.protocol.v0_10.transport.QueueDeclare;
import org.apache.qpid.server.protocol.v0_10.transport.QueueDelete;
import org.apache.qpid.server.protocol.v0_10.transport.QueuePurge;
@@ -143,4 +145,10 @@ public class QueueInteraction
{
return _interaction.sendPerformative(_query);
}
+
+ public QueueInteraction declareArguments(final Map<String, Object> arguments)
+ {
+ _declare.setArguments(arguments);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
index 2872f46..51a376a 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
@@ -24,22 +24,17 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_10.transport.*;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -113,12 +108,10 @@ public class LargeApplicationHeadersTest extends BrokerAdminUsingTestBase
.consumeResponse()
.getLatestResponse(SessionCompleted.class);
-
- MessageTransfer transfer = consumeResponse(interaction,
- MessageTransfer.class,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class);
+ MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
assertThat(transfer.getBodySize(), is(0));
@@ -134,28 +127,6 @@ public class LargeApplicationHeadersTest extends BrokerAdminUsingTestBase
}
}
- private <T extends Method> T consumeResponse(final Interaction interaction,
- final Class<T> expected,
- final Class<? extends Method>... ignore)
- throws Exception
- {
- List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
- possibleResponses.add(expected);
-
- T completed = null;
- do
- {
- interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
- Response<?> response = interaction.getLatestResponse();
- if (expected.isAssignableFrom(response.getBody().getClass()))
- {
- completed = (T) response.getBody();
- }
- }
- while (completed == null);
- return completed;
- }
-
private Map<String, Object> createApplicationHeadersThatExceedSingleFrame(final int headerPropertySize, final int maxFrameSize)
{
Map<String, Object> applicationHeaders = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
index cc2ef42..6d829d5 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
@@ -26,9 +26,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.stream.IntStream;
import org.junit.Before;
@@ -41,11 +38,9 @@ import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.Method;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -119,11 +114,10 @@ public class LargeMessageBodyTest extends BrokerAdminUsingTestBase
.consumeResponse()
.getLatestResponse(SessionCompleted.class);
- MessageTransfer transfer = consumeResponse(interaction,
- MessageTransfer.class,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class);
+ MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
assertThat(transfer.getBodySize(), is(equalTo(messageContent.length)));
QpidByteBuffer receivedBody = transfer.getBody();
@@ -132,26 +126,4 @@ public class LargeMessageBodyTest extends BrokerAdminUsingTestBase
assertThat(receivedBytes, is(equalTo(messageContent)));
}
}
-
- private <T extends Method> T consumeResponse(final Interaction interaction,
- final Class<T> expected,
- final Class<? extends Method>... ignore)
- throws Exception
- {
- List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
- possibleResponses.add(expected);
-
- T completed = null;
- do
- {
- interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
- Response<?> response = interaction.getLatestResponse();
- if (expected.isAssignableFrom(response.getBody().getClass()))
- {
- completed = (T) response.getBody();
- }
- }
- while (completed == null);
- return completed;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
index 6747451..08baf22 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -27,9 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import org.junit.Before;
import org.junit.Test;
@@ -41,14 +38,12 @@ import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.Method;
import org.apache.qpid.server.protocol.v0_10.transport.Range;
import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -115,10 +110,8 @@ public class MessageTest extends BrokerAdminUsingTestBase
.flushCompleted()
.flush();
- SessionCompleted completed = consumeResponse(interaction,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class);
+ SessionCompleted completed =
+ interaction.consume(SessionCompleted.class, SessionCommandPoint.class, SessionConfirmed.class);
assertThat(completed.getCommands(), is(notNullValue()));
assertThat(completed.getCommands().includes(0), is(equalTo(true)));
@@ -160,11 +153,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
.flowValue(-1)
.flow();
- MessageTransfer transfer = consumeResponse(interaction,
- MessageTransfer.class,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class);
+ MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
try (QpidByteBuffer buffer = transfer.getBody())
{
@@ -210,11 +202,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
.flowValue(-1)
.flow();
- MessageTransfer transfer = consumeResponse(interaction,
- MessageTransfer.class,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class);
+ MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
@@ -224,11 +215,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
.flushCompleted()
.flush();
- SessionCompleted completed = consumeResponse(interaction,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class,
- SessionFlush.class);
+ SessionCompleted completed = interaction.consume(SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class,
+ SessionFlush.class);
assertThat(completed.getCommands(), is(notNullValue()));
assertThat(completed.getCommands().includes(3), is(equalTo(true)));
@@ -273,12 +263,11 @@ public class MessageTest extends BrokerAdminUsingTestBase
.flowValue(-1)
.flow();
- MessageTransfer transfer = consumeResponse(interaction,
- MessageTransfer.class,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class,
- SessionFlush.class);
+ MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class,
+ SessionFlush.class);
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
@@ -293,11 +282,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
.session().flushCompleted()
.flush();
- SessionCompleted completed = consumeResponse(interaction,
- SessionCompleted.class,
- SessionCommandPoint.class,
- SessionConfirmed.class,
- SessionFlush.class);
+ SessionCompleted completed = interaction.consume(SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class,
+ SessionFlush.class);
assertThat(completed.getCommands(), is(notNullValue()));
assertThat(completed.getCommands().includes(4), is(equalTo(true)));
@@ -305,27 +293,4 @@ public class MessageTest extends BrokerAdminUsingTestBase
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
}
}
-
- private <T extends Method> T consumeResponse(final Interaction interaction,
- final Class<T> expected,
- final Class<? extends Method>... ignore)
- throws Exception
- {
- List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
- possibleResponses.add(expected);
-
- T completed = null;
- do
- {
- interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
- Response<?> response = interaction.getLatestResponse();
- if (expected.isAssignableFrom(response.getBody().getClass()))
- {
- completed = (T) response.getBody();
- }
- }
- while (completed == null);
- return completed;
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
new file mode 100644
index 0000000..f5216c2
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extensions.queue;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.QueueQueryResult;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class QueueTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare", description = "This command creates or checks a queue.")
+ public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ SessionCompleted completed = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareArguments(Collections.singletonMap("defaultFilters",
+ "{\"selector\":{\"x-filter-jms-selector\":[\"id=2\"]}}"))
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(SessionCompleted.class);
+
+ assertThat(completed.getCommands().includes(0), is(equalTo(true)));
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+
+ MessageProperties messageProperties1 = new MessageProperties();
+ messageProperties1.setApplicationHeaders(Collections.singletonMap("id", 1));
+
+ interaction.message()
+ .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
+ .transferId(0)
+ .transferHeader(null, messageProperties1)
+ .transferBody("Test1".getBytes(StandardCharsets.UTF_8))
+ .transfer()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse();
+
+ MessageProperties messageProperties2 = new MessageProperties();
+ messageProperties2.setApplicationHeaders(Collections.singletonMap("id", 2));
+ final String body2 = "Message 2 Content";
+ interaction.message()
+ .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
+ .transferId(1)
+ .transferHeader(null, messageProperties2)
+ .transferBody(body2.getBytes(StandardCharsets.UTF_8))
+ .transfer()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse();
+
+ final String subscriberName = "Test";
+ interaction.message()
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .message()
+ .flowId(1)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.MESSAGE)
+ .flowValue(1)
+ .flow()
+ .message()
+ .flowId(2)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.BYTE)
+ .flowValue(-1)
+ .flow();
+
+ MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
+
+ try (QpidByteBuffer buffer = transfer.getBody())
+ {
+ final byte[] dst = new byte[buffer.remaining()];
+ buffer.get(dst);
+ assertThat(new String(dst, UTF_8), is(equalTo(body2)));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
new file mode 100644
index 0000000..af809e1
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.queue;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+/**
+ * Broker-J specific AMQP 0-8..0-91 protocol test covering queue declaration where the wire
+ * arguments use a queue attribute name ("defaultFilters") as the argument key.
+ */
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class QueueTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    /** Looks up the address of the anonymous AMQP port used by every test in this class. */
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    /**
+     * Declares a queue passing "defaultFilters" (a JMS selector {@code id=2}) as a declare argument,
+     * then publishes one message with header {@code id=1} and one with {@code id=2} and verifies
+     * that the consumer is given the second message only: its delivery, header and body are checked,
+     * and the queue depth is expected to be 2 before the acknowledgement and 1 after it.
+     *
+     * @throws Exception if the interaction with the broker fails
+     */
+    @Test
+    @SpecificationTest(section = "1.7.2.1", description = "declare queue, create if needed")
+    public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            QueueDeclareOkBody response = interaction.openAnonymousConnection()
+                                                     .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                     .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .declareArguments(Collections.singletonMap("defaultFilters",
+                                                                                                "{\"selector\":{\"x-filter-jms-selector\":[\"id=2\"]}}"))
+                                                     .declare()
+                                                     .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+            assertThat(response.getMessageCount(), is(equalTo(0L)));
+            assertThat(response.getConsumerCount(), is(equalTo(0L)));
+
+            // make sure that wire arguments took effect
+            // by publishing messages and consuming message matching filter
+
+            String consumerTag = "test";
+            interaction.basic().qosPrefetchCount(2)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class);
+
+            String content2 = "Test Content 2";
+            Map<String, Object> messageHeaders2 = Collections.singletonMap("id", 2);
+            String contentType = "text/plain";
+
+            // first message is not matching queue default filter
+            interaction.basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .publishMandatory(true)
+                       .contentHeaderPropertiesContentType(contentType)
+                       .contentHeaderPropertiesHeaders(Collections.singletonMap("id", 1))
+                       .content("Test1")
+                       .publishMessage()
+
+                       // second message is matching queue default filter
+                       .basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .publishMandatory(true)
+                       .contentHeaderPropertiesContentType(contentType)
+                       .contentHeaderPropertiesHeaders(messageHeaders2)
+                       .content(content2)
+                       .publishMessage();
+
+            // second message should be received
+            BasicDeliverBody delivery =
+                    interaction.consumeResponse(BasicDeliverBody.class).getLatestResponse(BasicDeliverBody.class);
+
+            // null check first so that a missing tag is reported as such rather than as a mismatch
+            assertThat(delivery.getConsumerTag(), is(notNullValue()));
+            assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag))));
+            assertThat(delivery.getRedelivered(), is(equalTo(false)));
+            assertThat(delivery.getExchange(), is(nullValue()));
+            assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+
+            ContentHeaderBody header =
+                    interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class);
+
+            assertThat(header.getBodySize(), is(equalTo((long) content2.length())));
+            BasicContentHeaderProperties properties = header.getProperties();
+            Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders()));
+            assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders2))));
+            assertThat(properties.getContentTypeAsString(), is(equalTo(contentType)));
+
+            ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
+
+            final String receivedContent;
+            // try-with-resources releases the buffer even when reading the payload fails
+            try (QpidByteBuffer payload = content.getPayload())
+            {
+                byte[] contentData = new byte[payload.remaining()];
+                payload.get(contentData);
+                receivedContent = new String(contentData, StandardCharsets.UTF_8);
+            }
+
+            // the received body must be the body of the second (filter matching) message
+            assertThat(receivedContent, is(equalTo(content2)));
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
+
+            interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
+                       .ack()
+                       .channel().close().consumeResponse(ChannelCloseOkBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org