You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/09 18:40:37 UTC
svn commit: r1566328 [2/3] - in
/qpid/branches/java-broker-amqp-1-0-management/java: ./
broker-core/src/main/java/org/apache/qpid/server/message/internal/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid...
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Sun Feb 9 17:40:35 2014
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+public class MessageConverter_v0_10_to_Internal implements MessageConverter<MessageTransferMessage, InternalMessage>
+{
+ /** Returns the message class this converter accepts as input: the AMQP 0-10 {@link MessageTransferMessage}. */
+ @Override
+ public Class<MessageTransferMessage> getInputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ /** Returns the message class this converter produces: {@link InternalMessage}. */
+ @Override
+ public Class<InternalMessage> getOutputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ /**
+  * Converts an AMQP 0-10 message into an {@link InternalMessage}.
+  * <p>
+  * The complete content is copied into a byte array, decoded according to the
+  * message's MIME type, and combined with a header wrapper that renders the
+  * 0-10 reply-to structure as a single string.
+  *
+  * @param serverMessage the 0-10 message to convert
+  * @param vhost not used by this converter
+  * @return the value returned by {@code InternalMessage.convert} for the decoded body
+  */
+ @Override
+ public InternalMessage convert(MessageTransferMessage serverMessage, VirtualHost vhost)
+ {
+     final String contentType = serverMessage.getMessageHeader().getMimeType();
+
+     // Pull the whole payload into memory before decoding it.
+     final byte[] content = new byte[(int) serverMessage.getSize()];
+     serverMessage.getContent(ByteBuffer.wrap(content), 0);
+     final Object decodedBody = convertMessageBody(contentType, content);
+
+     // Message properties may be absent; in that case there is no reply-to to expose.
+     final MessageProperties properties = serverMessage.getHeader().getMessageProperties();
+     final AMQMessageHeader delegate = serverMessage.getMessageHeader();
+     ReplyTo replyTo = null;
+     if (properties != null)
+     {
+         replyTo = properties.getReplyTo();
+     }
+
+     return InternalMessage.convert(serverMessage.getMessageNumber(),
+                                    serverMessage.isPersistent(),
+                                    new DelegatingMessageHeader(delegate, replyTo),
+                                    decodedBody);
+ }
+
+ private static class DelegatingMessageHeader implements AMQMessageHeader
+ {
+ private final AMQMessageHeader _delegate;
+ private final ReplyTo _replyTo;
+
+
+ private DelegatingMessageHeader(final AMQMessageHeader delegate, final ReplyTo replyTo)
+ {
+ _delegate = delegate;
+ _replyTo = replyTo;
+ }
+
+ @Override
+ public String getCorrelationId()
+ {
+ return _delegate.getCorrelationId();
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return _delegate.getExpiration();
+ }
+
+ @Override
+ public String getUserId()
+ {
+ return _delegate.getUserId();
+ }
+
+ @Override
+ public String getAppId()
+ {
+ return _delegate.getAppId();
+ }
+
+ @Override
+ public String getMessageId()
+ {
+ return _delegate.getMessageId();
+ }
+
+ @Override
+ public String getMimeType()
+ {
+ return _delegate.getMimeType();
+ }
+
+ @Override
+ public String getEncoding()
+ {
+ return _delegate.getEncoding();
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return _delegate.getPriority();
+ }
+
+ @Override
+ public long getTimestamp()
+ {
+ return _delegate.getTimestamp();
+ }
+
+ @Override
+ public String getType()
+ {
+ return _delegate.getType();
+ }
+
+ @Override
+ public String getReplyTo()
+ {
+ return _replyTo == null
+ ? null
+ : _replyTo.getExchange() == null || _replyTo.getExchange().equals("")
+ ? _replyTo.getRoutingKey()
+ : _replyTo.getRoutingKey() == null || _replyTo.getRoutingKey().equals("")
+ ? _replyTo.getExchange()
+ : _replyTo.getExchange() + "/" + _replyTo.getRoutingKey();
+ }
+
+ @Override
+ public Object getHeader(final String name)
+ {
+ return _delegate.getHeader(name);
+ }
+
+ @Override
+ public boolean containsHeaders(final Set<String> names)
+ {
+ return _delegate.containsHeaders(names);
+ }
+
+ @Override
+ public boolean containsHeader(final String name)
+ {
+ return _delegate.containsHeader(name);
+ }
+
+ @Override
+ public Collection<String> getHeaderNames()
+ {
+ return _delegate.getHeaderNames();
+ }
+ }
+
+ /**
+  * Decodes raw message content into a Java object chosen by MIME type.
+  * <ul>
+  * <li>{@code text/plain}, {@code text/xml}: a {@link String} decoded as UTF-8</li>
+  * <li>{@code jms/map-message}: a {@link LinkedHashMap} of name to value, in encoded order</li>
+  * <li>{@code amqp/map}: the map read by {@code BBDecoder.readMap()}</li>
+  * <li>{@code amqp/list}: the list read by {@code BBDecoder.readList()}</li>
+  * <li>{@code jms/stream-message}: a {@link List} of the values, in encoded order</li>
+  * <li>any other type, including {@code null}: the byte array itself, unmodified</li>
+  * </ul>
+  *
+  * @param mimeType content type of the message, may be {@code null}
+  * @param data the complete message content
+  * @return the decoded body
+  * @throws IllegalArgumentException if JMS map or stream content is truncated or malformed
+  */
+ private static Object convertMessageBody(String mimeType, byte[] data)
+ {
+     if ("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+     {
+         // Decode with an explicit charset: new String(byte[]) would use the platform
+         // default and give different text on differently configured hosts.
+         return new String(data, java.nio.charset.Charset.forName("UTF-8"));
+     }
+     else if ("jms/map-message".equals(mimeType))
+     {
+         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+         Map<String, Object> map = new LinkedHashMap<String, Object>();
+         // The content starts with the number of entries, followed by name/value pairs.
+         final int entries = reader.readIntImpl();
+         for (int i = 0; i < entries; i++)
+         {
+             try
+             {
+                 String propName = reader.readStringImpl();
+                 Object value = reader.readObject();
+
+                 map.put(propName, value);
+             }
+             catch (EOFException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+             catch (TypedBytesFormatException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+         }
+
+         return map;
+     }
+     else if ("amqp/map".equals(mimeType))
+     {
+         BBDecoder decoder = new BBDecoder();
+         decoder.init(ByteBuffer.wrap(data));
+         return decoder.readMap();
+     }
+     else if ("amqp/list".equals(mimeType))
+     {
+         BBDecoder decoder = new BBDecoder();
+         decoder.init(ByteBuffer.wrap(data));
+         return decoder.readList();
+     }
+     else if ("jms/stream-message".equals(mimeType))
+     {
+         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+         List<Object> list = new ArrayList<Object>();
+         while (reader.remaining() != 0)
+         {
+             try
+             {
+                 list.add(reader.readObject());
+             }
+             // Malformed stream content is reported the same way as malformed map
+             // content. IllegalArgumentException is a RuntimeException, so callers
+             // that caught the previous bare RuntimeException are unaffected.
+             catch (TypedBytesFormatException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+             catch (EOFException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+         }
+         return list;
+     }
+     else
+     {
+         // Unknown content type: hand the bytes back untouched.
+         return data;
+     }
+ }
+
+ /** Returns the descriptive name of this conversion, {@code "v0-10 to Internal"}. */
+ @Override
+ public String getType()
+ {
+ return "v0-10 to Internal";
+ }
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter Sun Feb 9 17:40:35 2014
@@ -17,3 +17,5 @@
# under the License.
#
org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10
+org.apache.qpid.server.protocol.v0_10.MessageConverter_Internal_to_v0_10
+org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10_to_Internal
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Sun Feb 9 17:40:35 2014
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBEncoder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v0_8 implements MessageConverter<InternalMessage, AMQMessage>
+{
+ private static final int BASIC_CLASS_ID = 60;
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
+ /** Returns the message class this converter accepts as input: {@link InternalMessage}. */
+ @Override
+ public Class<InternalMessage> getInputClass()
+ {
+     return InternalMessage.class;
+ }
+
+ /** Returns the message class this converter produces: the AMQP 0-8 {@link AMQMessage}. */
+ @Override
+ public Class<AMQMessage> getOutputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ /**
+  * Converts an internal message into an AMQP 0-8 {@link AMQMessage} backed by an
+  * in-memory stored message holding the converted content and metadata.
+  *
+  * @param serverMsg the internal message to convert
+  * @param vhost not used by this converter
+  * @return a new {@code AMQMessage} wrapping the converted message
+  */
+ @Override
+ public AMQMessage convert(InternalMessage serverMsg, VirtualHost vhost)
+ {
+ return new AMQMessage(convertToStoredMessage(serverMsg), null);
+ }
+
+ /**
+  * Builds a read-only, in-memory {@link StoredMessage} holding the 0-8 encoding
+  * of an internal message.
+  * <p>
+  * The body is encoded once, up front; the returned object serves reads from
+  * that byte array and rejects {@code addContent} and {@code remove}.
+  *
+  * @param serverMsg the internal message to encode
+  * @return a stored-message view over the encoded content and its 0-8 metadata
+  */
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg)
+ {
+     final Object body = serverMsg.getMessageBody();
+     final byte[] payload = convertToBody(body);
+     final String mimeType = getBodyMimeType(body);
+     final MessageMetaData metaData = convertMetaData(serverMsg, mimeType, payload.length);
+
+     return new StoredMessage<MessageMetaData>()
+     {
+         @Override
+         public MessageMetaData getMetaData()
+         {
+             return metaData;
+         }
+
+         @Override
+         public long getMessageNumber()
+         {
+             return serverMsg.getMessageNumber();
+         }
+
+         @Override
+         public void addContent(int offsetInMessage, ByteBuffer src)
+         {
+             // The content is fixed at conversion time.
+             throw new UnsupportedOperationException();
+         }
+
+         @Override
+         public int getContent(int offsetInMessage, ByteBuffer dst)
+         {
+             // Copy what is available from the offset, capped by the room left in dst.
+             final int length = Math.min(payload.length - offsetInMessage, dst.remaining());
+             dst.put(payload, offsetInMessage, length);
+             return length;
+         }
+
+         @Override
+         public ByteBuffer getContent(int offsetInMessage, int size)
+         {
+             return ByteBuffer.wrap(payload, offsetInMessage, size);
+         }
+
+         @Override
+         public StoreFuture flushToStore()
+         {
+             return StoreFuture.IMMEDIATE_FUTURE;
+         }
+
+         @Override
+         public void remove()
+         {
+             throw new UnsupportedOperationException();
+         }
+     };
+ }
+
+ /**
+  * Builds AMQP 0-8 metadata for an internal message: basic content properties
+  * copied from the internal header, plus publish information that carries no
+  * exchange or routing key and is neither immediate nor mandatory.
+  *
+  * @param serverMsg the internal message supplying header values
+  * @param bodyMimeType content type to record for the encoded body
+  * @param size length of the encoded body in bytes
+  * @return metadata combining the publish information, the content header and the arrival time
+  */
+ private MessageMetaData convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
+ {
+     // NOTE(review): 'size' is never read in this method, so the content header
+     // is built without an explicit body size - confirm whether that is intended.
+
+     final BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+     properties.setAppId(serverMsg.getMessageHeader().getAppId());
+     properties.setContentType(bodyMimeType);
+     properties.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId());
+     if (serverMsg.isPersistent())
+     {
+         properties.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT);
+     }
+     else
+     {
+         properties.setDeliveryMode(BasicContentHeaderProperties.NON_PERSISTENT);
+     }
+     properties.setExpiration(serverMsg.getExpiration());
+     properties.setMessageId(serverMsg.getMessageHeader().getMessageId());
+     properties.setPriority(serverMsg.getMessageHeader().getPriority());
+     properties.setReplyTo(serverMsg.getMessageHeader().getReplyTo());
+     properties.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+     properties.setUserId(serverMsg.getMessageHeader().getUserId());
+
+     // Copy the application headers, keeping the order in which the header reports them.
+     final Map<String, Object> applicationHeaders = new LinkedHashMap<String, Object>();
+     for (String name : serverMsg.getMessageHeader().getHeaderNames())
+     {
+         applicationHeaders.put(name, serverMsg.getMessageHeader().getHeader(name));
+     }
+     properties.setHeaders(FieldTable.convertToFieldTable(applicationHeaders));
+
+     final MessagePublishInfo unroutedPublishInfo = new MessagePublishInfo()
+     {
+         @Override
+         public AMQShortString getExchange()
+         {
+             return null;
+         }
+
+         @Override
+         public void setExchange(final AMQShortString amqShortString)
+         {
+             throw new UnsupportedOperationException();
+         }
+
+         @Override
+         public boolean isImmediate()
+         {
+             return false;
+         }
+
+         @Override
+         public boolean isMandatory()
+         {
+             return false;
+         }
+
+         @Override
+         public AMQShortString getRoutingKey()
+         {
+             return null;
+         }
+     };
+
+     final ContentHeaderBody contentHeader = new ContentHeaderBody(properties, BASIC_CLASS_ID);
+     return new MessageMetaData(unroutedPublishInfo, contentHeader, serverMsg.getArrivalTime());
+ }
+
+
+ /** Returns the descriptive name of this conversion, {@code "Internal to v0-8"}. */
+ @Override
+ public String getType()
+ {
+ return "Internal to v0-8";
+ }
+
+
+ /**
+  * Encodes a message body object as bytes.
+  * <ul>
+  * <li>{@link String}: its UTF-8 bytes</li>
+  * <li>{@code byte[]}: the array itself, not copied</li>
+  * <li>{@link Map}: the output of {@code BBEncoder.writeMap}</li>
+  * <li>{@link List}: the output of {@code BBEncoder.writeList}</li>
+  * <li>anything else: Java serialization of the object</li>
+  * </ul>
+  *
+  * @param object the body to encode
+  * @return the encoded bytes
+  * @throws RuntimeException wrapping the {@link IOException} if Java serialization fails
+  */
+ public static byte[] convertToBody(Object object)
+ {
+     if (object instanceof String)
+     {
+         return ((String) object).getBytes(UTF_8);
+     }
+     else if (object instanceof byte[])
+     {
+         return (byte[]) object;
+     }
+     else if (object instanceof Map)
+     {
+         BBEncoder encoder = new BBEncoder(1024);
+         encoder.writeMap((Map) object);
+         return toByteArray(encoder.segment());
+     }
+     else if (object instanceof List)
+     {
+         BBEncoder encoder = new BBEncoder(1024);
+         encoder.writeList((List) object);
+         return toByteArray(encoder.segment());
+     }
+     else
+     {
+         return serialize(object);
+     }
+ }
+
+ /** Copies the remaining bytes of the buffer into a new array. */
+ private static byte[] toByteArray(ByteBuffer buf)
+ {
+     byte[] data = new byte[buf.remaining()];
+     buf.get(data);
+     return data;
+ }
+
+ /** Java-serializes the object, wrapping any IOException in a RuntimeException. */
+ private static byte[] serialize(Object object)
+ {
+     ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+     try
+     {
+         ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+         try
+         {
+             os.writeObject(object);
+         }
+         finally
+         {
+             // Closing flushes anything still buffered by the object stream, so the
+             // byte array read below is guaranteed to be complete.
+             os.close();
+         }
+         return bytesOut.toByteArray();
+     }
+     catch (IOException e)
+     {
+         throw new RuntimeException(e);
+     }
+ }
+
+ public static String getBodyMimeType(Object object)
+ {
+ if(object instanceof String)
+ {
+ return "text/plain";
+ }
+ else if(object instanceof byte[])
+ {
+ return "application/octet-stream";
+ }
+ else if(object instanceof Map)
+ {
+ return "amqp/map";
+ }
+ else if(object instanceof List)
+ {
+ return "amqp/list";
+ }
+ else
+ {
+ return "application/java-object-stream";
+ }
+ }
+
+}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Sun Feb 9 17:40:35 2014
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMessage, InternalMessage>
+{
+ /** Returns the message class this converter accepts as input: the AMQP 0-8 {@link AMQMessage}. */
+ @Override
+ public Class<AMQMessage> getInputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ /** Returns the message class this converter produces: {@link InternalMessage}. */
+ @Override
+ public Class<InternalMessage> getOutputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ /**
+  * Converts an AMQP 0-8 message into an {@link InternalMessage}.
+  * <p>
+  * The complete content is copied into a byte array and decoded according to
+  * the message's MIME type; the original message header is passed through.
+  *
+  * @param serverMessage the 0-8 message to convert
+  * @param vhost not used by this converter
+  * @return the value returned by {@code InternalMessage.convert} for the decoded body
+  */
+ @Override
+ public InternalMessage convert(AMQMessage serverMessage, VirtualHost vhost)
+ {
+     final String contentType = serverMessage.getMessageHeader().getMimeType();
+
+     // Pull the whole payload into memory before decoding it.
+     final byte[] content = new byte[(int) serverMessage.getSize()];
+     final ByteBuffer target = ByteBuffer.wrap(content);
+     serverMessage.getContent(target, 0);
+
+     final Object decodedBody = convertMessageBody(contentType, content);
+
+     return InternalMessage.convert(serverMessage.getMessageNumber(),
+                                    serverMessage.isPersistent(),
+                                    serverMessage.getMessageHeader(),
+                                    decodedBody);
+ }
+
+ /**
+  * Decodes raw message content into a Java object chosen by MIME type.
+  * <ul>
+  * <li>{@code text/plain}, {@code text/xml}: a {@link String} decoded as UTF-8</li>
+  * <li>{@code jms/map-message}: a {@link LinkedHashMap} of name to value, in encoded order</li>
+  * <li>{@code amqp/map}: the map read by {@code BBDecoder.readMap()}</li>
+  * <li>{@code amqp/list}: the list read by {@code BBDecoder.readList()}</li>
+  * <li>{@code jms/stream-message}: a {@link List} of the values, in encoded order</li>
+  * <li>any other type, including {@code null}: the byte array itself, unmodified</li>
+  * </ul>
+  *
+  * @param mimeType content type of the message, may be {@code null}
+  * @param data the complete message content
+  * @return the decoded body
+  * @throws IllegalArgumentException if JMS map or stream content is truncated or malformed
+  */
+ private static Object convertMessageBody(String mimeType, byte[] data)
+ {
+     if ("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+     {
+         // Decode with an explicit charset: new String(byte[]) would use the platform
+         // default and give different text on differently configured hosts.
+         return new String(data, java.nio.charset.Charset.forName("UTF-8"));
+     }
+     else if ("jms/map-message".equals(mimeType))
+     {
+         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+         Map<String, Object> map = new LinkedHashMap<String, Object>();
+         // The content starts with the number of entries, followed by name/value pairs.
+         final int entries = reader.readIntImpl();
+         for (int i = 0; i < entries; i++)
+         {
+             try
+             {
+                 String propName = reader.readStringImpl();
+                 Object value = reader.readObject();
+
+                 map.put(propName, value);
+             }
+             catch (EOFException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+             catch (TypedBytesFormatException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+         }
+
+         return map;
+     }
+     else if ("amqp/map".equals(mimeType))
+     {
+         BBDecoder decoder = new BBDecoder();
+         decoder.init(ByteBuffer.wrap(data));
+         return decoder.readMap();
+     }
+     else if ("amqp/list".equals(mimeType))
+     {
+         BBDecoder decoder = new BBDecoder();
+         decoder.init(ByteBuffer.wrap(data));
+         return decoder.readList();
+     }
+     else if ("jms/stream-message".equals(mimeType))
+     {
+         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+         List<Object> list = new ArrayList<Object>();
+         while (reader.remaining() != 0)
+         {
+             try
+             {
+                 list.add(reader.readObject());
+             }
+             // Malformed stream content is reported the same way as malformed map
+             // content. IllegalArgumentException is a RuntimeException, so callers
+             // that caught the previous bare RuntimeException are unaffected.
+             catch (TypedBytesFormatException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+             catch (EOFException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+         }
+         return list;
+     }
+     else
+     {
+         // Unknown content type: hand the bytes back untouched.
+         return data;
+     }
+ }
+
+ /** Returns the descriptive name of this conversion, {@code "v0-8 to Internal"}. */
+ @Override
+ public String getType()
+ {
+ return "v0-8 to Internal";
+ }
+}
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (from r1564820, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&r1=1564820&r2=1566328&rev=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter Sun Feb 9 17:40:35 2014
@@ -16,4 +16,5 @@
# specific language governing permissions and limitations
# under the License.
#
-org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0
+org.apache.qpid.server.protocol.v0_8.MessageConverter_Internal_to_v0_8
+org.apache.qpid.server.protocol.v0_8.MessageConverter_v0_8_to_Internal
\ No newline at end of file
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java Sun Feb 9 17:40:35 2014
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.server.message.internal.InternalMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<InternalMessage>
+{
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
+ /**
+  * Returns the message class this converter accepts as input: {@link InternalMessage}.
+  * NOTE(review): presumably implements a method declared by the superclass or its
+  * interface, in which case it could carry {@code @Override} - confirm.
+  */
+ public Class<InternalMessage> getInputClass()
+ {
+ return InternalMessage.class;
+ }
+
+
+ /**
+  * Builds the AMQP 1.0 metadata sections for an internal message: a header
+  * section, a properties section and, when the message has any application
+  * headers, an application-properties section.
+  *
+  * @param serverMessage the internal message supplying the values
+  * @param sectionEncoder encoder handed to the resulting metadata object
+  * @return metadata wrapping the sections in the order header, properties, application properties
+  */
+ @Override
+ protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage,
+                                               final SectionEncoder sectionEncoder)
+ {
+     final List<Section> sections = new ArrayList<Section>(3);
+
+     final Header header = new Header();
+     header.setDurable(serverMessage.isPersistent());
+     header.setPriority(UnsignedByte.valueOf(serverMessage.getMessageHeader().getPriority()));
+
+     // A TTL is only set when both timestamps are present and the expiry is not
+     // earlier than the arrival time.
+     final long expiration = serverMessage.getExpiration();
+     final long arrivalTime = serverMessage.getArrivalTime();
+     final boolean bothKnown = expiration != 0L && arrivalTime != 0L;
+     if (bothKnown && expiration >= arrivalTime)
+     {
+         header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
+     }
+     sections.add(header);
+
+     final Properties properties = new Properties();
+     properties.setCorrelationId(serverMessage.getMessageHeader().getCorrelationId());
+     properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp()));
+     properties.setMessageId(serverMessage.getMessageHeader().getMessageId());
+     final String userId = serverMessage.getMessageHeader().getUserId();
+     if (userId != null)
+     {
+         // The user id is carried as the UTF-8 bytes of the string.
+         properties.setUserId(new Binary(userId.getBytes(UTF_8)));
+     }
+     properties.setReplyTo(serverMessage.getMessageHeader().getReplyTo());
+     sections.add(properties);
+
+     final boolean hasApplicationHeaders = !serverMessage.getMessageHeader().getHeaderNames().isEmpty();
+     if (hasApplicationHeaders)
+     {
+         sections.add(new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap()));
+     }
+
+     return new MessageMetaData_1_0(sections, sectionEncoder);
+ }
+
+ /**
+  * Produces the AMQP 1.0 body section for an internal message by converting its body object.
+  *
+  * @param serverMessage the internal message whose body is converted
+  * @param mimeType not used here; the section is chosen from the body object's Java type
+  * @return the body section
+  */
+ protected Section getBodySection(final InternalMessage serverMessage, final String mimeType)
+ {
+ return convertToBody(serverMessage.getMessageBody());
+ }
+
+
+ /** Returns the descriptive name of this conversion, {@code "Internal to v1-0"}. */
+ @Override
+ public String getType()
+ {
+ return "Internal to v1-0";
+ }
+
+
+ /**
+  * Converts a message body object into an AMQP 1.0 body section.
+  * <ul>
+  * <li>{@link String}: an {@code AmqpValue} holding the string</li>
+  * <li>{@code byte[]}: a {@code Data} section holding the bytes</li>
+  * <li>{@link Map}: an {@code AmqpValue} holding the result of {@code fixMapValues}</li>
+  * <li>{@link List}: an {@code AmqpValue} holding the result of {@code fixListValues}</li>
+  * <li>anything else: a {@code Data} section holding the Java serialization of the object</li>
+  * </ul>
+  * NOTE(review): {@code fixMapValues} and {@code fixListValues} appear to work on the
+  * supplied collection itself rather than on a copy - confirm whether callers rely on that.
+  *
+  * @param object the body to convert
+  * @return the body section
+  * @throws RuntimeException wrapping the {@link IOException} if Java serialization fails
+  */
+ public Section convertToBody(Object object)
+ {
+     if (object instanceof String)
+     {
+         return new AmqpValue(object);
+     }
+     else if (object instanceof byte[])
+     {
+         return new Data(new Binary((byte[]) object));
+     }
+     else if (object instanceof Map)
+     {
+         return new AmqpValue(MessageConverter_to_1_0.fixMapValues((Map) object));
+     }
+     else if (object instanceof List)
+     {
+         return new AmqpValue(MessageConverter_to_1_0.fixListValues((List) object));
+     }
+     else
+     {
+         ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+         try
+         {
+             ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+             try
+             {
+                 os.writeObject(object);
+             }
+             finally
+             {
+                 // Closing flushes anything still buffered by the object stream, so the
+                 // byte array read below is guaranteed to be complete.
+                 os.close();
+             }
+             return new Data(new Binary(bytesOut.toByteArray()));
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ }
+
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Sun Feb 9 17:40:35 2014
@@ -156,7 +156,7 @@ public abstract class MessageConverter_t
}
}
- private static Map fixMapValues(final Map<String, Object> map)
+ static Map fixMapValues(final Map<String, Object> map)
{
for(Map.Entry<String,Object> entry : map.entrySet())
{
@@ -165,7 +165,7 @@ public abstract class MessageConverter_t
return map;
}
- private static Object fixValue(final Object value)
+ static Object fixValue(final Object value)
{
if(value instanceof byte[])
{
@@ -185,7 +185,7 @@ public abstract class MessageConverter_t
}
}
- private static List fixListValues(final List list)
+ static List fixListValues(final List list)
{
ListIterator iterator = list.listIterator();
while(iterator.hasNext())
@@ -198,83 +198,88 @@ public abstract class MessageConverter_t
}
private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
- final ServerMessage serverMessage,
+ final M serverMessage,
SectionEncoder sectionEncoder)
{
- final String mimeType = serverMessage.getMessageHeader().getMimeType();
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data), 0);
-
- Section bodySection = convertMessageBody(mimeType, data);
-
- final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
-
- return new StoredMessage<MessageMetaData_1_0>()
- {
- @Override
- public MessageMetaData_1_0 getMetaData()
- {
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return serverMessage.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- int size;
- if(dst.remaining()<buf.remaining())
- {
- buf.limit(dst.remaining());
- size = dst.remaining();
- }
- else
- {
- size = buf.remaining();
- }
- dst.put(buf);
- return size;
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- if(size < buf.remaining())
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ Section bodySection = getBodySection(serverMessage, mimeType);
+
+ final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
+
+ return new StoredMessage<MessageMetaData_1_0>()
{
- buf.limit(size);
- }
- return buf;
- }
-
- @Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
+ @Override
+ public MessageMetaData_1_0 getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMessage.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ int size;
+ if(dst.remaining()<buf.remaining())
+ {
+ buf.limit(dst.remaining());
+ size = dst.remaining();
+ }
+ else
+ {
+ size = buf.remaining();
+ }
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ if(size < buf.remaining())
+ {
+ buf.limit(size);
+ }
+ return buf;
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ protected Section getBodySection(final M serverMessage, final String mimeType)
+ {
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+ return convertMessageBody(mimeType, data);
+ }
private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
{
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java Sun Feb 9 17:40:35 2014
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+public class MessageConverter_v1_0_to_Internal implements MessageConverter<Message_1_0, InternalMessage>
+{
+
+ // Described-type registry shared by all conversions; convert() passes it to the
+ // SectionDecoderImpl it creates. The static initializer registers the transport,
+ // messaging, transaction and security layers on it once, at class load.
+ static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance();
+ static
+ {
+ TYPE_REGISTRY.registerTransportLayer();
+ TYPE_REGISTRY.registerMessagingLayer();
+ TYPE_REGISTRY.registerTransactionLayer();
+ TYPE_REGISTRY.registerSecurityLayer();
+ }
+
+ /**
+  * Returns the class of message this converter takes as input.
+  *
+  * @return {@code Message_1_0.class}
+  */
+ @Override
+ public Class<Message_1_0> getInputClass()
+ {
+ return Message_1_0.class;
+ }
+
+ /**
+  * Returns the class of message this converter produces.
+  *
+  * @return {@code InternalMessage.class}
+  */
+ @Override
+ public Class<InternalMessage> getOutputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ /**
+  * Converts an AMQP 1.0 message into an {@link InternalMessage}.
+  * <p>
+  * The stored content is read in full and decoded into sections. Every section
+  * that is not an {@code AmqpValue}, {@code Data} or {@code AmqpSequence} is
+  * dropped; what remains is the message body, which is turned into a single
+  * Java object:
+  * <ul>
+  * <li>no body section: an empty {@code byte[]};</li>
+  * <li>one {@code AmqpValue}: its value, passed through {@code fixObject};</li>
+  * <li>one or more {@code Data}: the concatenation of their bytes as a {@code byte[]};</li>
+  * <li>one or more {@code AmqpSequence}: a single list holding all their
+  * elements in order, passed through {@code fixObject}.</li>
+  * </ul>
+  *
+  * @param serverMessage the AMQP 1.0 message to convert
+  * @param vhost         not used by this implementation
+  * @return the result of {@code InternalMessage.convert} for the message number,
+  *         persistence flag, header and body object of {@code serverMessage}
+  * @throws RuntimeException if the message has more than one {@code AmqpValue}
+  *         section or mixes body section types, or (wrapping the
+  *         {@code AmqpErrorException}) if the sections cannot be decoded
+  */
+ @Override
+ public InternalMessage convert(Message_1_0 serverMessage, VirtualHost vhost)
+ {
+     byte[] data = new byte[(int) serverMessage.getSize()];
+     serverMessage.getStoredMessage().getContent(0,ByteBuffer.wrap(data));
+
+     SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(TYPE_REGISTRY);
+
+     try
+     {
+         List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data));
+         ListIterator<Section> iterator = sections.listIterator();
+         Section previousSection = null;
+         while(iterator.hasNext())
+         {
+             Section section = iterator.next();
+             if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence))
+             {
+                 // not a body section - discard it so that only the body remains in the list
+                 iterator.remove();
+             }
+             else
+             {
+                 // a second body section is only acceptable if it has the same type as the
+                 // previous one and that type is not AmqpValue
+                 if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
+                 {
+                     throw new RuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
+                 }
+                 else
+                 {
+                     previousSection = section;
+                 }
+             }
+         }
+
+         Object bodyObject;
+
+         if(sections.isEmpty())
+         {
+             // should actually be illegal
+             bodyObject = new byte[0];
+         }
+         else
+         {
+             Section firstBodySection = sections.get(0);
+             if(firstBodySection instanceof AmqpValue)
+             {
+                 bodyObject = fixObject(((AmqpValue)firstBodySection).getValue());
+             }
+             else if(firstBodySection instanceof Data)
+             {
+                 // first pass sizes the result, second pass copies each Data payload in order
+                 int totalSize = 0;
+                 for(Section section : sections)
+                 {
+                     totalSize += ((Data)section).getValue().getLength();
+                 }
+                 byte[] bodyData = new byte[totalSize];
+                 ByteBuffer buf = ByteBuffer.wrap(bodyData);
+                 for(Section section : sections)
+                 {
+                     buf.put(((Data)section).getValue().asByteBuffer());
+                 }
+                 bodyObject = bodyData;
+             }
+             else
+             {
+                 // the loop above guarantees every remaining section is an AmqpSequence
+                 ArrayList totalSequence = new ArrayList();
+                 for(Section section : sections)
+                 {
+                     totalSequence.addAll(((AmqpSequence)section).getValue());
+                 }
+                 bodyObject = fixObject(totalSequence);
+             }
+         }
+         return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject);
+     }
+     catch (AmqpErrorException e)
+     {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Recursively replaces AMQP 1.0 {@code Binary} values with plain {@code byte[]}.
+  * <p>
+  * A {@code Binary} becomes a new byte array holding its bytes. A {@code List}
+  * becomes a new {@code ArrayList} whose elements have each been converted. A
+  * {@code Map} becomes a new {@code LinkedHashMap}, in the iteration order of the
+  * original, whose keys and values have each been converted. Any other value,
+  * including {@code null}, is returned unchanged.
+  *
+  * @param value the value to convert
+  * @return the converted value
+  */
+ private Object fixObject(final Object value)
+ {
+     if(value instanceof Binary)
+     {
+         final Binary binary = (Binary) value;
+         final byte[] bytes = new byte[binary.getLength()];
+         binary.asByteBuffer().get(bytes);
+         return bytes;
+     }
+
+     if(value instanceof List)
+     {
+         final List source = (List) value;
+         final List converted = new ArrayList(source.size());
+         for(Object element : source)
+         {
+             converted.add(fixObject(element));
+         }
+         return converted;
+     }
+
+     if(value instanceof Map)
+     {
+         final Map<?,?> source = (Map) value;
+         final Map converted = new LinkedHashMap(source.size());
+         for(Map.Entry<?,?> item : source.entrySet())
+         {
+             converted.put(fixObject(item.getKey()),fixObject(item.getValue()));
+         }
+         return converted;
+     }
+
+     // nothing to fix: scalars, strings, null, ...
+     return value;
+ }
+
+ /**
+  * Decodes raw message content into a Java object according to its MIME type.
+  * <ul>
+  * <li>{@code text/plain}, {@code text/xml}: a {@code String}, decoded as UTF-8;</li>
+  * <li>{@code jms/map-message}: a {@code LinkedHashMap} built by reading an entry
+  * count followed by that many name/value pairs;</li>
+  * <li>{@code amqp/map}: the map read by a {@code BBDecoder};</li>
+  * <li>{@code amqp/list}: the list read by a {@code BBDecoder};</li>
+  * <li>{@code jms/stream-message}: a list of the objects read until no content
+  * remains;</li>
+  * <li>anything else (including a {@code null} MIME type): {@code data} itself.</li>
+  * </ul>
+  * NOTE(review): nothing in this class calls this method - confirm whether it is
+  * still needed.
+  *
+  * @param mimeType the MIME type of the content, may be {@code null}
+  * @param data     the raw content
+  * @return the decoded object, or {@code data} when the MIME type is not recognised
+  * @throws IllegalArgumentException if a {@code jms/map-message} entry cannot be read
+  * @throws RuntimeException         if a {@code jms/stream-message} element cannot be read
+  */
+ private static Object convertMessageBody(String mimeType, byte[] data)
+ {
+     if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+     {
+         // decode with an explicit charset; new String(byte[]) would use the platform
+         // default and give host-dependent results
+         return new String(data, java.nio.charset.Charset.forName("UTF-8"));
+     }
+     else if("jms/map-message".equals(mimeType))
+     {
+         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+         LinkedHashMap map = new LinkedHashMap();
+         final int entries = reader.readIntImpl();
+         for (int i = 0; i < entries; i++)
+         {
+             try
+             {
+                 String propName = reader.readStringImpl();
+                 Object value = reader.readObject();
+
+                 map.put(propName, value);
+             }
+             catch (EOFException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+             catch (TypedBytesFormatException e)
+             {
+                 throw new IllegalArgumentException(e);
+             }
+         }
+
+         return map;
+     }
+     else if("amqp/map".equals(mimeType))
+     {
+         BBDecoder decoder = new BBDecoder();
+         decoder.init(ByteBuffer.wrap(data));
+         final Map<String,Object> map = decoder.readMap();
+
+         return map;
+     }
+     else if("amqp/list".equals(mimeType))
+     {
+         BBDecoder decoder = new BBDecoder();
+         decoder.init(ByteBuffer.wrap(data));
+         return decoder.readList();
+     }
+     else if("jms/stream-message".equals(mimeType))
+     {
+         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+         List list = new ArrayList();
+         while (reader.remaining() != 0)
+         {
+             try
+             {
+                 list.add(reader.readObject());
+             }
+             catch (TypedBytesFormatException e)
+             {
+                 throw new RuntimeException(e); // TODO - Implement
+             }
+             catch (EOFException e)
+             {
+                 throw new RuntimeException(e); // TODO - Implement
+             }
+         }
+         return list;
+     }
+     else
+     {
+         // unknown content type: hand back the bytes untouched
+         return data;
+     }
+ }
+
+ /**
+  * Returns the type name of this converter.
+  *
+  * @return the string {@code "v1-0 to Internal"}
+  */
+ @Override
+ public String getType()
+ {
+     // this class converts Message_1_0 to InternalMessage, so it must not call itself "v0-8"
+     return "v1-0 to Internal";
+ }
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sun Feb 9 17:40:35 2014
@@ -391,10 +391,6 @@ public class SendingLink_1_0 implements
options.add(Consumer.Option.NO_LOCAL);
}
-
- _consumer.setNoLocal(noLocal);
-
-
try
{
_consumer = _queue.addConsumer(_target,
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (from r1564820, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter&r1=1564820&r2=1566328&rev=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter Sun Feb 9 17:40:35 2014
@@ -16,4 +16,5 @@
# specific language governing permissions and limitations
# under the License.
#
-org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0
+org.apache.qpid.server.protocol.v1_0.MessageConverter_Internal_to_v1_0
+org.apache.qpid.server.protocol.v1_0.MessageConverter_v1_0_to_Internal
\ No newline at end of file
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Sun Feb 9 17:40:35 2014
@@ -61,7 +61,7 @@ public class MessageConverter_0_10_to_0_
{
if(deliveryProps.hasDeliveryMode())
{
- props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
+ props.setDeliveryMode((deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
? BasicContentHeaderProperties.PERSISTENT
: BasicContentHeaderProperties.NON_PERSISTENT));
}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/build.xml Sun Feb 9 17:40:35 2014
@@ -0,0 +1,32 @@
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -->
+<project name="Qpid Broker-Plugins AMQP Management" default="build">
+ <property name="module.depends" value="common broker-core" />
+ <property name="module.test.depends" value="qpid-test-utils broker-core/tests" />
+
+ <property name="module.genpom" value="true"/>
+ <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker-core=provided"/>
+
+ <property name="broker.plugin" value="true"/>
+ <property name="broker-plugins-management-amqp.libs" value="" />
+
+ <import file="../../module.xml" />
+
+ <target name="bundle" depends="bundle-tasks"/>
+</project>
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/pom.xml Sun Feb 9 17:40:35 2014
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>qpid-parent</artifactId>
+ <groupId>org.apache.qpid</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>qpid-broker-plugins-management-amqp</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>AMQP Management Protocol Plug-in</name>
+ <description>AMQP Management broker plug-in</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ </build>
+
+</project>
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java Sun Feb 9 17:40:35 2014
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import java.util.Arrays;
+
+ /**
+  * Immutable-reference holder describing one managed entity type: its name, its
+  * parent types, and the names of its attributes and operations.
+  * <p>
+  * The arrays given to the constructor are stored and returned as-is, without
+  * copying, so callers share the same array instances.
+  */
+ class ManagedEntityType
+ {
+     private final String _name;
+     private final ManagedEntityType[] _parents;
+     private final String[] _attributes;
+     private final String[] _operations;
+
+     /**
+      * Creates a type description.
+      *
+      * @param name       the name of the type
+      * @param parents    the parent types (stored by reference)
+      * @param attributes the attribute names (stored by reference)
+      * @param operations the operation names (stored by reference)
+      */
+     ManagedEntityType(final String name,
+                       final ManagedEntityType[] parents,
+                       final String[] attributes,
+                       final String[] operations)
+     {
+         _name = name;
+         _parents = parents;
+         _attributes = attributes;
+         _operations = operations;
+     }
+
+     /** @return the name given at construction */
+     public String getName()
+     {
+         return _name;
+     }
+
+     /** @return the parents array given at construction (not a copy) */
+     public ManagedEntityType[] getParents()
+     {
+         return _parents;
+     }
+
+     /** @return the attributes array given at construction (not a copy) */
+     public String[] getAttributes()
+     {
+         return _attributes;
+     }
+
+     /** @return the operations array given at construction (not a copy) */
+     public String[] getOperations()
+     {
+         return _operations;
+     }
+
+     /**
+      * Renders the name and the contents of the three arrays; parents are rendered
+      * through their own {@code toString()}.
+      */
+     @Override
+     public String toString()
+     {
+         final StringBuilder text = new StringBuilder("ManagedEntityType{");
+         text.append("name='").append(_name).append('\'');
+         text.append(", parents=").append(Arrays.toString(_parents));
+         text.append(", attributes=").append(Arrays.toString(_attributes));
+         text.append(", operations=").append(Arrays.toString(_operations));
+         text.append('}');
+         return text.toString();
+     }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org