You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:33:09 UTC
svn commit: r1097189 [40/42] - in /activemq/activemq-apollo/trunk: ./
apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/
apollo-openwire/src/main/resources/
apollo-openwire/src/main/resources/META-INF/
apollo-openwire/src/main/resources/ME...
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ *
+ * @openwire:marshaller code="121"
+ */
+public class SessionId implements DataStructure {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;
+
+ protected String connectionId;
+ protected long value;
+
+ protected transient int hashCode;
+ protected transient String key;
+ protected transient ConnectionId parentId;
+
+ public SessionId() {
+ }
+
+ public SessionId(ConnectionId connectionId, long sessionId) {
+ this.connectionId = connectionId.getValue();
+ this.value = sessionId;
+ }
+
+ public SessionId(SessionId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getValue();
+ }
+
+ public SessionId(ProducerId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getSessionId();
+ }
+
+ public SessionId(ConsumerId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getSessionId();
+ }
+
+ public ConnectionId getParentId() {
+ if (parentId == null) {
+ parentId = new ConnectionId(this);
+ }
+ return parentId;
+ }
+
+ public int hashCode() {
+ if (hashCode == 0) {
+ hashCode = connectionId.hashCode() ^ (int)value;
+ }
+ return hashCode;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != SessionId.class) {
+ return false;
+ }
+ SessionId id = (SessionId)o;
+ return value == id.value && connectionId.equals(id.connectionId);
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ public void setConnectionId(String connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getValue() {
+ return value;
+ }
+
+ public void setValue(long sessionId) {
+ this.value = sessionId;
+ }
+
+ public String toString() {
+ if (key == null) {
+ key = connectionId + ":" + value;
+ }
+ return key;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Command that carries the {@link SessionId} of a session. Visiting it
+ * dispatches to {@code CommandVisitor.processAddSession}.
+ *
+ * @openwire:marshaller code="4"
+ * @version $Revision: 1.13 $
+ */
+public class SessionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO;
+
+    protected SessionId sessionId;
+
+    /**
+     * Creates a command holding a fresh, empty {@link SessionId}.
+     */
+    public SessionInfo() {
+        sessionId = new SessionId();
+    }
+
+    /**
+     * Creates a command for session number {@code sessionId} on the
+     * connection described by {@code connectionInfo}.
+     *
+     * @param connectionInfo supplies the connection id; must not be null
+     * @param sessionId the numeric session value
+     */
+    public SessionInfo(ConnectionInfo connectionInfo, long sessionId) {
+        this.sessionId = new SessionId(connectionInfo.getConnectionId(), sessionId);
+    }
+
+    /**
+     * Creates a command for an existing session id; the id is stored as
+     * given, not copied.
+     */
+    public SessionInfo(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public SessionId getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * Builds a {@link RemoveInfo} for this command's session id, copying
+     * this command's response-required flag onto it.
+     */
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo remove = new RemoveInfo(getSessionId());
+        remove.setResponseRequired(isResponseRequired());
+        return remove;
+    }
+
+    /**
+     * Hands this command to {@code visitor.processAddSession} and returns
+     * whatever the visitor returns.
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddSession(this);
+    }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Command with no payload of its own. Visiting it dispatches to
+ * {@code CommandVisitor.processShutdown}, and {@link #isShutdownInfo()}
+ * always answers true.
+ *
+ * @openwire:marshaller code="11"
+ */
+public class ShutdownInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Hands this command to {@code visitor.processShutdown} and returns
+     * whatever the visitor returns.
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processShutdown(this);
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isShutdownInfo() {
+        return true;
+    }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+
+import org.apache.activemq.apollo.util.IntrospectionSupport;
+
+/**
+ * Used to represent a durable subscription.
+ *
+ * <p>Equality and hashing consider only the client id and the subscription
+ * name; the destinations and the selector are ignored.</p>
+ *
+ * @openwire:marshaller code="55"
+ * @version $Revision: 1.6 $
+ */
+public class SubscriptionInfo implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DURABLE_SUBSCRIPTION_INFO;
+
+    protected ActiveMQDestination subscribedDestination;
+    protected ActiveMQDestination destination;
+    protected String clientId;
+    protected String subscriptionName;
+    protected String selector;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
+     * This is the resolved destination that the subscription is receiving
+     * messages from. This will never be a pattern or a composite destination.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    /**
+     * Delegates to {@code IntrospectionSupport.toString(this)}.
+     */
+    @Override
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+
+    /**
+     * XORs the hashes of the client id and the subscription name; a null
+     * field contributes -1.
+     */
+    @Override
+    public int hashCode() {
+        int clientHash = clientId == null ? -1 : clientId.hashCode();
+        int nameHash = subscriptionName == null ? -1 : subscriptionName.hashCode();
+        return clientHash ^ nameHash;
+    }
+
+    /**
+     * Equal to any other {@code SubscriptionInfo} (subclasses included)
+     * whose client id and subscription name both match, where two nulls
+     * count as a match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof SubscriptionInfo)) {
+            return false;
+        }
+        SubscriptionInfo other = (SubscriptionInfo)obj;
+        return sameOrBothNull(clientId, other.clientId)
+            && sameOrBothNull(subscriptionName, other.subscriptionName);
+    }
+
+    /** True when both strings are null, or both are non-null and equal. */
+    private static boolean sameOrBothNull(String left, String right) {
+        return left == null ? right == null : left.equals(right);
+    }
+
+    /**
+     * The destination the client originally subscribed to. This may not match
+     * {@link #getDestination()} if the subscribed destination uses patterns
+     * or composites.
+     *
+     * If the subscribed destination is not set, this just returns the
+     * destination.
+     *
+     * @openwire:property version=3
+     */
+    public ActiveMQDestination getSubscribedDestination() {
+        return subscribedDestination != null ? subscribedDestination : getDestination();
+    }
+
+    public void setSubscribedDestination(ActiveMQDestination subscribedDestination) {
+        this.subscribedDestination = subscribedDestination;
+    }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Base class for transaction identifiers. Subclasses report which kind of
+ * transaction they identify and provide a string key for it.
+ *
+ * @openwire:marshaller
+ * @version $Revision: 1.6 $
+ */
+public abstract class TransactionId implements DataStructure {
+
+    /**
+     * @return whether this id identifies an XA transaction
+     */
+    public abstract boolean isXATransaction();
+
+    /**
+     * @return whether this id identifies a local transaction
+     */
+    public abstract boolean isLocalTransaction();
+
+    /**
+     * @return a string key for this transaction; its format is defined by
+     *         the subclass
+     */
+    public abstract String getTransactionKey();
+
+    /**
+     * @return always {@code false}
+     */
+    public boolean isMarshallAware() {
+        return false;
+    }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.IOException;
+
+/**
+ * Command describing one transaction operation: the connection it belongs
+ * to, the transaction it applies to, and an operation type that is one of
+ * the byte constants declared on this class. {@link #visit} routes the
+ * command to the visitor method matching that type.
+ *
+ * @openwire:marshaller code="7"
+ * @version $Revision: 1.10 $
+ */
+public class TransactionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.TRANSACTION_INFO;
+
+    public static final byte BEGIN = 0;
+    public static final byte PREPARE = 1;
+    public static final byte COMMIT_ONE_PHASE = 2;
+    public static final byte COMMIT_TWO_PHASE = 3;
+    public static final byte ROLLBACK = 4;
+    public static final byte RECOVER = 5;
+    public static final byte FORGET = 6;
+    public static final byte END = 7;
+
+    protected byte type;
+    protected ConnectionId connectionId;
+    protected TransactionId transactionId;
+
+    /**
+     * Creates a command with no connection, no transaction and type 0
+     * (the value of {@link #BEGIN}).
+     */
+    public TransactionInfo() {
+    }
+
+    /**
+     * Creates a fully populated command.
+     *
+     * @param connectionId the connection the operation belongs to
+     * @param transactionId the transaction the operation applies to
+     * @param type one of the operation constants declared on this class
+     */
+    public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) {
+        this.connectionId = connectionId;
+        this.transactionId = transactionId;
+        this.type = type;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    /**
+     * Routes this command to the visitor method that matches its type and
+     * returns that method's result.
+     *
+     * @throws IOException if the type is not one of the declared constants
+     * @throws Exception whatever the selected visitor method throws
+     */
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        // Cases are listed in the numeric order of the constants (0..7).
+        switch (type) {
+            case BEGIN:
+                return visitor.processBeginTransaction(this);
+            case PREPARE:
+                return visitor.processPrepareTransaction(this);
+            case COMMIT_ONE_PHASE:
+                return visitor.processCommitTransactionOnePhase(this);
+            case COMMIT_TWO_PHASE:
+                return visitor.processCommitTransactionTwoPhase(this);
+            case ROLLBACK:
+                return visitor.processRollbackTransaction(this);
+            case RECOVER:
+                return visitor.processRecoverTransactions(this);
+            case FORGET:
+                return visitor.processForgetTransaction(this);
+            case END:
+                return visitor.processEndTransaction(this);
+            default:
+                throw new IOException("Transaction info type unknown: " + type);
+        }
+    }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+/**
+ * Command describing a wire format: a magic byte sequence, a version number
+ * and a map of named properties.
+ *
+ * <p>The properties exist in two forms: a decoded {@code Map} and an encoded
+ * {@link Buffer}. The map is decoded from the buffer on first access, and
+ * {@link #beforeMarshall} encodes the map again when no buffer is present.
+ * Any modification through {@link #setProperty} drops the buffer so that the
+ * next marshal always reflects the current map.</p>
+ *
+ * @openwire:marshaller code="1"
+ */
+public class WireFormatInfo implements Command, MarshallAware {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO;
+    private static final int MAX_PROPERTY_SIZE = 1024 * 4;
+    private static final byte[] MAGIC = new byte[] {'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+
+    // Each instance gets its own copy: getMagic() exposes the array, and a
+    // caller writing into it must not be able to alter the shared constant
+    // that isValid() compares against.
+    protected byte[] magic = MAGIC.clone();
+    protected int version;
+    protected Buffer marshalledProperties;
+
+    protected transient Map<String, Object> properties;
+    private transient Endpoint from;
+    private transient Endpoint to;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isWireFormatInfo() {
+        return true;
+    }
+
+    public boolean isMarshallAware() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 size=8 testSize=-1
+     */
+    public byte[] getMagic() {
+        return magic;
+    }
+
+    public void setMagic(byte[] magic) {
+        this.magic = magic;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Buffer getMarshalledProperties() {
+        return marshalledProperties;
+    }
+
+    public void setMarshalledProperties(Buffer marshalledProperties) {
+        this.marshalledProperties = marshalledProperties;
+    }
+
+    /**
+     * The endpoint within the transport where this message came from.
+     */
+    public Endpoint getFrom() {
+        return from;
+    }
+
+    public void setFrom(Endpoint from) {
+        this.from = from;
+    }
+
+    /**
+     * The endpoint within the transport where this message is going to - null
+     * means all endpoints.
+     */
+    public Endpoint getTo() {
+        return to;
+    }
+
+    public void setTo(Endpoint to) {
+        this.to = to;
+    }
+
+    // ////////////////////
+    //
+    // Implementation Methods.
+    //
+    // ////////////////////
+
+    /**
+     * Looks up one property, decoding the marshalled buffer first if the map
+     * has not been built yet.
+     *
+     * @return the property value, or null when it is absent or when there
+     *         are no properties at all
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public Object getProperty(String name) throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return null;
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return properties.get(name);
+    }
+
+    /**
+     * Returns a read-only view of the properties, decoding the marshalled
+     * buffer first if the map has not been built yet.
+     *
+     * @return an unmodifiable map; empty when there are no properties
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public Map<String, Object> getProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return Collections.<String, Object>emptyMap();
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return Collections.unmodifiableMap(properties);
+    }
+
+    /**
+     * Discards both the decoded and the marshalled form of the properties.
+     */
+    public void clearProperties() {
+        marshalledProperties = null;
+        properties = null;
+    }
+
+    /**
+     * Stores one property in the decoded map.
+     *
+     * @throws IOException if existing marshalled properties cannot be decoded
+     */
+    public void setProperty(String name, Object value) throws IOException {
+        lazyCreateProperties();
+        properties.put(name, value);
+    }
+
+    /**
+     * Makes sure the decoded map exists ahead of a modification, building it
+     * from the marshalled buffer when there is one, and drops the buffer.
+     *
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    protected void lazyCreateProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                properties = new HashMap<String, Object>();
+            } else {
+                properties = unmarsallProperties(marshalledProperties);
+            }
+        }
+        // The map is about to change, so a buffer left over from an earlier
+        // read would be stale; beforeMarshall() only re-encodes when the
+        // buffer is null, so keeping it would silently lose the change.
+        marshalledProperties = null;
+    }
+
+    private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
+        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
+    }
+
+    /**
+     * Encodes the decoded map into {@code marshalledProperties} when a map
+     * exists and no buffer is present.
+     */
+    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+        // Need to marshal the properties.
+        if (marshalledProperties == null && properties != null) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream os = new DataOutputStream(baos);
+            MarshallingSupport.marshalPrimitiveMap(properties, os);
+            os.close();
+            marshalledProperties = baos.toBuffer();
+        }
+    }
+
+    public void afterMarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public void afterUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    /**
+     * @return true when the magic bytes are present and equal to the
+     *         expected {@code "ActiveMQ"} sequence
+     */
+    public boolean isValid() {
+        return magic != null && Arrays.equals(magic, MAGIC);
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    /**
+     * @return true only when the "CacheEnabled" property is a true Boolean
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public boolean isCacheEnabled() throws IOException {
+        return isPropertyTrue("CacheEnabled");
+    }
+
+    public void setCacheEnabled(boolean cacheEnabled) throws IOException {
+        setProperty("CacheEnabled", Boolean.valueOf(cacheEnabled));
+    }
+
+    /**
+     * @return true only when the "StackTraceEnabled" property is a true Boolean
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public boolean isStackTraceEnabled() throws IOException {
+        return isPropertyTrue("StackTraceEnabled");
+    }
+
+    public void setStackTraceEnabled(boolean stackTraceEnabled) throws IOException {
+        setProperty("StackTraceEnabled", Boolean.valueOf(stackTraceEnabled));
+    }
+
+    /**
+     * @return true only when the "TcpNoDelayEnabled" property is a true Boolean
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public boolean isTcpNoDelayEnabled() throws IOException {
+        return isPropertyTrue("TcpNoDelayEnabled");
+    }
+
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) throws IOException {
+        setProperty("TcpNoDelayEnabled", Boolean.valueOf(tcpNoDelayEnabled));
+    }
+
+    /**
+     * @return true only when the "SizePrefixDisabled" property is a true Boolean
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public boolean isSizePrefixDisabled() throws IOException {
+        return isPropertyTrue("SizePrefixDisabled");
+    }
+
+    public void setSizePrefixDisabled(boolean prefixPacketSize) throws IOException {
+        setProperty("SizePrefixDisabled", Boolean.valueOf(prefixPacketSize));
+    }
+
+    /**
+     * @return true only when the "TightEncodingEnabled" property is a true Boolean
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public boolean isTightEncodingEnabled() throws IOException {
+        return isPropertyTrue("TightEncodingEnabled");
+    }
+
+    public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
+        setProperty("TightEncodingEnabled", Boolean.valueOf(tightEncodingEnabled));
+    }
+
+    /**
+     * @return the "MaxInactivityDuration" property, or 0 when it is absent
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public long getMaxInactivityDuration() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDuration");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+        setProperty("MaxInactivityDuration", Long.valueOf(maxInactivityDuration));
+    }
+
+    /**
+     * @return the "MaxInactivityDurationInitalDelay" property, or 0 when it
+     *         is absent
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public long getMaxInactivityDurationInitalDelay() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
+        setProperty("MaxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
+    }
+
+    /**
+     * @return the "CacheSize" property, or 0 when it is absent
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public int getCacheSize() throws IOException {
+        Integer i = (Integer)getProperty("CacheSize");
+        return i == null ? 0 : i.intValue();
+    }
+
+    public void setCacheSize(int cacheSize) throws IOException {
+        setProperty("CacheSize", Integer.valueOf(cacheSize));
+    }
+
+    /**
+     * Hands this command to {@code visitor.processWireFormat} and returns
+     * whatever the visitor returns.
+     */
+    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        return visitor.processWireFormat(this);
+    }
+
+    /**
+     * Describes the version, properties and magic bytes. Never throws: if
+     * the properties cannot be decoded they are shown as {@code null}.
+     */
+    @Override
+    public String toString() {
+        Map<String, Object> p = null;
+        try {
+            p = getProperties();
+        } catch (IOException ignored) {
+            // Best effort: toString() must not fail because of undecodable properties.
+        }
+        return "WireFormatInfo { version=" + version + ", properties=" + p + ", magic=" + toString(magic) + "}";
+    }
+
+    /**
+     * Renders each byte as a character, comma separated inside brackets;
+     * a null array is rendered as {@code "null"}.
+     */
+    private String toString(byte[] data) {
+        if (data == null) {
+            // setMagic(null) is permitted, so the description must cope with it.
+            return "null";
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        for (int i = 0; i < data.length; i++) {
+            if (i != 0) {
+                sb.append(',');
+            }
+            sb.append((char)data[i]);
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    /**
+     * True only when the named property holds a Boolean equal to true.
+     * Compares by value, not identity, so an equal Boolean instance other
+     * than {@code Boolean.TRUE} is still recognised.
+     */
+    private boolean isPropertyTrue(String name) throws IOException {
+        return Boolean.TRUE.equals(getProperty(name));
+    }
+
+    // /////////////////////////////////////////////////////////////
+    //
+    // These are not implemented.
+    //
+    // /////////////////////////////////////////////////////////////
+
+    public void setCommandId(int value) {
+    }
+
+    public int getCommandId() {
+        return 0;
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.HexSupport;
+
+import javax.transaction.xa.Xid;
+import java.util.Arrays;
+
+/**
+ * Identifies an XA transaction by the three components of an {@link Xid}: the
+ * format id, the global transaction id and the branch qualifier.
+ * <p>
+ * The hash code and the string key are computed lazily and cached. Every setter
+ * discards both caches so that they can never describe stale field values.
+ *
+ * @openwire:marshaller code="112"
+ * @version $Revision: 1.6 $
+ */
+public class XATransactionId extends TransactionId implements Xid, Comparable {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_XA_TRANSACTION_ID;
+
+    private int formatId;
+    private byte[] branchQualifier;
+    private byte[] globalTransactionId;
+
+    /** Cached hash code; 0 means "not computed yet". */
+    private transient int hash;
+    /** Cached result of {@link #getTransactionKey()}; null means "not computed yet". */
+    private transient String transactionKey;
+
+    /**
+     * Creates an empty id; the components are expected to be supplied through the setters.
+     */
+    public XATransactionId() {
+    }
+
+    /**
+     * Creates an id from the components of the given {@link Xid}.
+     * The byte arrays returned by the Xid are stored as-is (not copied).
+     *
+     * @param xid the source of the format id, global transaction id and branch qualifier
+     */
+    public XATransactionId(Xid xid) {
+        this.formatId = xid.getFormatId();
+        this.globalTransactionId = xid.getGlobalTransactionId();
+        this.branchQualifier = xid.getBranchQualifier();
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Returns the key {@code "XID:<formatId>:<hex global id>:<hex branch qualifier>"},
+     * building it on first use and caching it until a setter is called.
+     *
+     * @return the string key of this transaction id
+     */
+    public synchronized String getTransactionKey() {
+        if (transactionKey == null) {
+            transactionKey = "XID:" + formatId + ":" + HexSupport.toHexFromBuffer(new Buffer(globalTransactionId)) + ":"
+                             + HexSupport.toHexFromBuffer(new Buffer(branchQualifier));
+        }
+        return transactionKey;
+    }
+
+    @Override
+    public String toString() {
+        return getTransactionKey();
+    }
+
+    public boolean isXATransaction() {
+        return true;
+    }
+
+    public boolean isLocalTransaction() {
+        return false;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getFormatId() {
+        return formatId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte[] getGlobalTransactionId() {
+        return globalTransactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte[] getBranchQualifier() {
+        return branchQualifier;
+    }
+
+    public void setBranchQualifier(byte[] branchQualifier) {
+        this.branchQualifier = branchQualifier;
+        clearCachedState();
+    }
+
+    public void setFormatId(int formatId) {
+        this.formatId = formatId;
+        clearCachedState();
+    }
+
+    public void setGlobalTransactionId(byte[] globalTransactionId) {
+        this.globalTransactionId = globalTransactionId;
+        clearCachedState();
+    }
+
+    /**
+     * Drops the cached hash code and transaction key after a field change.
+     * Synchronized because {@link #getTransactionKey()} guards the key cache
+     * with the same monitor.
+     */
+    private synchronized void clearCachedState() {
+        this.hash = 0;
+        this.transactionKey = null;
+    }
+
+    /**
+     * Computes (and caches) a hash over the format id and both byte arrays.
+     * A computed value of 0 is replaced by a fixed non-zero constant so that
+     * 0 can keep meaning "not computed yet".
+     */
+    @Override
+    public int hashCode() {
+        if (hash == 0) {
+            int h = formatId;
+            h = hash(globalTransactionId, h);
+            h = hash(branchQualifier, h);
+            if (h == 0) {
+                h = 0xaceace;
+            }
+            hash = h;
+        }
+        return hash;
+    }
+
+    /**
+     * Folds the given bytes into the running hash value.
+     * A {@code null} array contributes nothing, which keeps {@link #hashCode()}
+     * usable wherever {@link #equals(Object)} is (equals accepts null arrays).
+     */
+    private static int hash(byte[] bytes, int hash) {
+        if (bytes == null) {
+            return hash;
+        }
+        int size = bytes.length;
+        for (int i = 0; i < size; i++) {
+            hash ^= bytes[i] << ((i % 4) * 8);
+        }
+        return hash;
+    }
+
+    /**
+     * Two ids are equal when the other object is exactly an {@code XATransactionId}
+     * and the format id and both byte arrays match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || o.getClass() != XATransactionId.class) {
+            return false;
+        }
+        XATransactionId xid = (XATransactionId)o;
+        return xid.formatId == formatId && Arrays.equals(xid.globalTransactionId, globalTransactionId)
+               && Arrays.equals(xid.branchQualifier, branchQualifier);
+    }
+
+    /**
+     * Orders ids by their transaction key.
+     * Note that a {@code null} argument or an object of another class yields -1
+     * instead of an exception; this existing behaviour is kept for callers.
+     */
+    public int compareTo(Object o) {
+        if (o == null || o.getClass() != XATransactionId.class) {
+            return -1;
+        }
+        XATransactionId xid = (XATransactionId)o;
+        return getTransactionKey().compareTo(xid.getTransactionKey());
+    }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/XATransactionId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html Wed Apr 27 17:32:51 2011
@@ -0,0 +1,27 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+Command objects used via the Command Pattern to communicate among nodes
+</p>
+
+</body>
+</html>
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireConnectionStatusDTO.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.dto;
+
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * <p>
+ * Connection status DTO for OpenWire connections. Extends {@link ConnectionStatusDTO}
+ * with the protocol version, the connected user and what the connection is waiting on,
+ * each mapped as an XML attribute.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="openwire_connection_status")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class OpenwireConnectionStatusDTO extends ConnectionStatusDTO {
+
+ /**
+ * The version of the OpenWire protocol being used.
+ * NOTE(review): the comment previously said "STOMP", which looks like a
+ * copy/paste slip in this OpenWire DTO - confirm.
+ */
+ @XmlAttribute
+ public int protocol_version;
+
+ /**
+ * The connected user
+ */
+ @XmlAttribute
+ public String user;
+
+ /**
+ * What the connection is currently waiting on.
+ * Serialized under the attribute name {@code waiting-on}.
+ */
+ @XmlAttribute(name="waiting-on")
+ public String waiting_on;
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/MarshallingSupport.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+
+/**
+ * The fixed version of the UTF8 encoding function. Some older JVM's UTF8
+ * encoding function breaks when handling large strings.
+ *
+ */
+public final class MarshallingSupport {
+
+ public static final byte NULL = 0;
+ public static final byte BOOLEAN_TYPE = 1;
+ public static final byte BYTE_TYPE = 2;
+ public static final byte CHAR_TYPE = 3;
+ public static final byte SHORT_TYPE = 4;
+ public static final byte INTEGER_TYPE = 5;
+ public static final byte LONG_TYPE = 6;
+ public static final byte DOUBLE_TYPE = 7;
+ public static final byte FLOAT_TYPE = 8;
+ public static final byte STRING_TYPE = 9;
+ public static final byte BYTE_ARRAY_TYPE = 10;
+ public static final byte MAP_TYPE = 11;
+ public static final byte LIST_TYPE = 12;
+ public static final byte BIG_STRING_TYPE = 13;
+
+ private MarshallingSupport() {
+ }
+
+ public static void marshalPrimitiveMap(Map map, DataOutputStream out) throws IOException {
+ if (map == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(map.size());
+ for (Iterator iter = map.keySet().iterator(); iter.hasNext();) {
+ String name = (String)iter.next();
+ out.writeUTF(name);
+ Object value = map.get(name);
+ marshalPrimitive(out, value);
+ }
+ }
+ }
+
+ public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+ return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
+ }
+
+ /**
+ * @param in
+ * @return
+ * @throws IOException
+ * @throws IOException
+ */
+ public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+ int size = in.readInt();
+ if (size > maxPropertySize) {
+ throw new IOException("Primitive map is larger than the allowed size: " + size);
+ }
+ if (size < 0) {
+ return null;
+ } else {
+ Map<String, Object> rc = new HashMap<String, Object>(size);
+ for (int i = 0; i < size; i++) {
+ String name = in.readUTF();
+ rc.put(name, unmarshalPrimitive(in));
+ }
+ return rc;
+ }
+
+ }
+
+ public static void marshalPrimitiveList(List list, DataOutputStream out) throws IOException {
+ out.writeInt(list.size());
+ for (Iterator iter = list.iterator(); iter.hasNext();) {
+ Object element = (Object)iter.next();
+ marshalPrimitive(out, element);
+ }
+ }
+
+ public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+ int size = in.readInt();
+ List<Object> answer = new ArrayList<Object>(size);
+ while (size-- > 0) {
+ answer.add(unmarshalPrimitive(in));
+ }
+ return answer;
+ }
+
+ public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+ if (value == null) {
+ marshalNull(out);
+ } else if (value.getClass() == Boolean.class) {
+ marshalBoolean(out, ((Boolean)value).booleanValue());
+ } else if (value.getClass() == Byte.class) {
+ marshalByte(out, ((Byte)value).byteValue());
+ } else if (value.getClass() == Character.class) {
+ marshalChar(out, ((Character)value).charValue());
+ } else if (value.getClass() == Short.class) {
+ marshalShort(out, ((Short)value).shortValue());
+ } else if (value.getClass() == Integer.class) {
+ marshalInt(out, ((Integer)value).intValue());
+ } else if (value.getClass() == Long.class) {
+ marshalLong(out, ((Long)value).longValue());
+ } else if (value.getClass() == Float.class) {
+ marshalFloat(out, ((Float)value).floatValue());
+ } else if (value.getClass() == Double.class) {
+ marshalDouble(out, ((Double)value).doubleValue());
+ } else if (value.getClass() == byte[].class) {
+ marshalByteArray(out, (byte[])value);
+ } else if (value.getClass() == String.class) {
+ marshalString(out, (String)value);
+ } else if (value instanceof Map) {
+ out.writeByte(MAP_TYPE);
+ marshalPrimitiveMap((Map)value, out);
+ } else if (value instanceof List) {
+ out.writeByte(LIST_TYPE);
+ marshalPrimitiveList((List)value, out);
+ } else {
+ throw new IOException("Object is not a primitive: " + value);
+ }
+ }
+
+ public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+ Object value = null;
+ byte type = in.readByte();
+ switch (type) {
+ case BYTE_TYPE:
+ value = Byte.valueOf(in.readByte());
+ break;
+ case BOOLEAN_TYPE:
+ value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+ break;
+ case CHAR_TYPE:
+ value = Character.valueOf(in.readChar());
+ break;
+ case SHORT_TYPE:
+ value = Short.valueOf(in.readShort());
+ break;
+ case INTEGER_TYPE:
+ value = Integer.valueOf(in.readInt());
+ break;
+ case LONG_TYPE:
+ value = Long.valueOf(in.readLong());
+ break;
+ case FLOAT_TYPE:
+ value = new Float(in.readFloat());
+ break;
+ case DOUBLE_TYPE:
+ value = new Double(in.readDouble());
+ break;
+ case BYTE_ARRAY_TYPE:
+ value = new byte[in.readInt()];
+ in.readFully((byte[])value);
+ break;
+ case STRING_TYPE:
+ value = in.readUTF();
+ break;
+ case BIG_STRING_TYPE:
+ value = readUTF8(in);
+ break;
+ case MAP_TYPE:
+ value = unmarshalPrimitiveMap(in);
+ break;
+ case LIST_TYPE:
+ value = unmarshalPrimitiveList(in);
+ break;
+ case NULL:
+ value = null;
+ break;
+ default:
+ throw new IOException("Unknown primitive type: " + type);
+ }
+ return value;
+ }
+
+ public static void marshalNull(DataOutputStream out) throws IOException {
+ out.writeByte(NULL);
+ }
+
+ public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+ out.writeByte(BOOLEAN_TYPE);
+ out.writeBoolean(value);
+ }
+
+ public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+ out.writeByte(BYTE_TYPE);
+ out.writeByte(value);
+ }
+
+ public static void marshalChar(DataOutputStream out, char value) throws IOException {
+ out.writeByte(CHAR_TYPE);
+ out.writeChar(value);
+ }
+
+ public static void marshalShort(DataOutputStream out, short value) throws IOException {
+ out.writeByte(SHORT_TYPE);
+ out.writeShort(value);
+ }
+
+ public static void marshalInt(DataOutputStream out, int value) throws IOException {
+ out.writeByte(INTEGER_TYPE);
+ out.writeInt(value);
+ }
+
+ public static void marshalLong(DataOutputStream out, long value) throws IOException {
+ out.writeByte(LONG_TYPE);
+ out.writeLong(value);
+ }
+
+ public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+ out.writeByte(FLOAT_TYPE);
+ out.writeFloat(value);
+ }
+
+ public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+ out.writeByte(DOUBLE_TYPE);
+ out.writeDouble(value);
+ }
+
+ public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+ marshalByteArray(out, value, 0, value.length);
+ }
+
+ public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+ out.writeByte(BYTE_ARRAY_TYPE);
+ out.writeInt(length);
+ out.write(value, offset, length);
+ }
+
+ public static void marshalString(DataOutputStream out, String s) throws IOException {
+ // If it's too big, out.writeUTF may not able able to write it out.
+ if (s.length() < Short.MAX_VALUE / 4) {
+ out.writeByte(STRING_TYPE);
+ out.writeUTF(s);
+ } else {
+ out.writeByte(BIG_STRING_TYPE);
+ writeUTF8(out, s);
+ }
+ }
+
+ public static void writeUTF8(DataOutput dataOut, String text) throws IOException {
+ if (text != null) {
+ int strlen = text.length();
+ int utflen = 0;
+ char[] charr = new char[strlen];
+ int c = 0;
+ int count = 0;
+
+ text.getChars(0, strlen, charr, 0);
+
+ for (int i = 0; i < strlen; i++) {
+ c = charr[i];
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+ // TODO diff: Sun code - removed
+ byte[] bytearr = new byte[utflen + 4]; // TODO diff: Sun code
+ bytearr[count++] = (byte)((utflen >>> 24) & 0xFF); // TODO diff:
+ // Sun code
+ bytearr[count++] = (byte)((utflen >>> 16) & 0xFF); // TODO diff:
+ // Sun code
+ bytearr[count++] = (byte)((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte)((utflen >>> 0) & 0xFF);
+ for (int i = 0; i < strlen; i++) {
+ c = charr[i];
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte)c;
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ } else {
+ bytearr[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ dataOut.write(bytearr);
+
+ } else {
+ dataOut.writeInt(-1);
+ }
+ }
+
+ public static String readUTF8(DataInput dataIn) throws IOException {
+ int utflen = dataIn.readInt(); // TODO diff: Sun code
+ if (utflen > -1) {
+ StringBuffer str = new StringBuffer(utflen);
+ byte bytearr[] = new byte[utflen];
+ int c;
+ int char2;
+ int char3;
+ int count = 0;
+
+ dataIn.readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ c = bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ str.append((char)c);
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException();
+ }
+ char2 = bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException();
+ }
+ str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException();
+ }
+ char2 = bytearr[count - 2]; // TODO diff: Sun code
+ char3 = bytearr[count - 1]; // TODO diff: Sun code
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException();
+ }
+ str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException();
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(str);
+ } else {
+ return null;
+ }
+ }
+
+ public static String propertiesToString(Properties props) throws IOException {
+ String result = "";
+ if (props != null) {
+ DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
+ props.store(dataOut, "");
+ result = new String(dataOut.getData(), 0, dataOut.size());
+ dataOut.close();
+ }
+ return result;
+ }
+
+ public static Properties stringToProperties(String str) throws IOException {
+ Properties result = new Properties();
+ if (str != null && str.length() > 0) {
+ DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes());
+ result.load(dataIn);
+ dataIn.close();
+ }
+ return result;
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/OpenwireException.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support;
+
+/**
+ * <p>
+ * Checked exception type of the OpenWire support package. It adds no state of its
+ * own; each constructor delegates to the matching {@link Exception} constructor.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class OpenwireException extends Exception {
+
+ /**
+ * Creates an exception without a detail message or cause.
+ */
+ public OpenwireException() {
+ }
+
+ /**
+ * @param s the detail message
+ */
+ public OpenwireException(String s) {
+ super(s);
+ }
+
+ /**
+ * @param s the detail message
+ * @param throwable the cause
+ */
+ public OpenwireException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ /**
+ * @param throwable the cause
+ */
+ public OpenwireException(Throwable throwable) {
+ super(throwable);
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/Settings.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support;
+
+/**
+ * <p>
+ * Feature switches for the OpenWire support code. Both switches are currently
+ * hard-coded to {@code true}.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Settings {
+
+    /**
+     * @return always {@code true}
+     */
+    public static boolean enable_compression() {
+        return true;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public static boolean enable_nested_map_and_list() {
+        return true;
+    }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support;
+
+import java.util.Date;
+import java.util.HashMap;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQDestination;
+
+/**
+ * Converts values between a fixed set of types using a lookup table keyed by
+ * (source class, target class). The table is filled once in the static
+ * initializer and only read afterwards.
+ */
+public final class TypeConversionSupport {
+
+    /**
+     * Lookup key: an exact (from, to) class pair. Classes are compared by
+     * identity, so subclasses of a registered type do not match.
+     */
+    static class ConversionKey {
+        final Class from;
+        final Class to;
+        final int hashCode;
+
+        public ConversionKey(Class from, Class to) {
+            this.from = from;
+            this.to = to;
+            this.hashCode = from.hashCode() ^ (to.hashCode() << 1);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            // Guard the cast: equals() must answer false, not throw, for
+            // null or for objects of another type.
+            if (!(o instanceof ConversionKey)) {
+                return false;
+            }
+            ConversionKey x = (ConversionKey)o;
+            return x.from == from && x.to == to;
+        }
+
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+    }
+
+    /** Converts a value of the registered source type to the registered target type. */
+    interface Converter {
+        Object convert(Object value);
+    }
+
+    private static final HashMap<ConversionKey, Converter> CONVERSION_MAP = new HashMap<ConversionKey, Converter>();
+    static {
+        // Anything -> String
+        Converter toStringConverter = new Converter() {
+            public Object convert(Object value) {
+                return value.toString();
+            }
+        };
+        register(Boolean.class, String.class, toStringConverter);
+        register(Byte.class, String.class, toStringConverter);
+        register(Short.class, String.class, toStringConverter);
+        register(Integer.class, String.class, toStringConverter);
+        register(Long.class, String.class, toStringConverter);
+        register(Float.class, String.class, toStringConverter);
+        register(Double.class, String.class, toStringConverter);
+
+        // String -> boxed primitive (parsing)
+        register(String.class, Boolean.class, new Converter() {
+            public Object convert(Object value) {
+                return Boolean.valueOf((String)value);
+            }
+        });
+        register(String.class, Byte.class, new Converter() {
+            public Object convert(Object value) {
+                return Byte.valueOf((String)value);
+            }
+        });
+        register(String.class, Short.class, new Converter() {
+            public Object convert(Object value) {
+                return Short.valueOf((String)value);
+            }
+        });
+        register(String.class, Integer.class, new Converter() {
+            public Object convert(Object value) {
+                return Integer.valueOf((String)value);
+            }
+        });
+        register(String.class, Long.class, new Converter() {
+            public Object convert(Object value) {
+                return Long.valueOf((String)value);
+            }
+        });
+        register(String.class, Float.class, new Converter() {
+            public Object convert(Object value) {
+                return Float.valueOf((String)value);
+            }
+        });
+        register(String.class, Double.class, new Converter() {
+            public Object convert(Object value) {
+                return Double.valueOf((String)value);
+            }
+        });
+
+        // Widening to Long
+        Converter longConverter = new Converter() {
+            public Object convert(Object value) {
+                return Long.valueOf(((Number)value).longValue());
+            }
+        };
+        register(Byte.class, Long.class, longConverter);
+        register(Short.class, Long.class, longConverter);
+        register(Integer.class, Long.class, longConverter);
+        register(Date.class, Long.class, new Converter() {
+            public Object convert(Object value) {
+                return Long.valueOf(((Date)value).getTime());
+            }
+        });
+
+        // Widening to Integer
+        Converter intConverter = new Converter() {
+            public Object convert(Object value) {
+                return Integer.valueOf(((Number)value).intValue());
+            }
+        };
+        register(Byte.class, Integer.class, intConverter);
+        register(Short.class, Integer.class, intConverter);
+
+        // Widening to Short
+        register(Byte.class, Short.class, new Converter() {
+            public Object convert(Object value) {
+                return Short.valueOf(((Number)value).shortValue());
+            }
+        });
+
+        // Widening to Double
+        register(Float.class, Double.class, new Converter() {
+            public Object convert(Object value) {
+                return Double.valueOf(((Number)value).doubleValue());
+            }
+        });
+
+        // String -> destination, created with the queue type
+        register(String.class, ActiveMQDestination.class, new Converter() {
+            public Object convert(Object value) {
+                return ActiveMQDestination.createDestination((String)value, ActiveMQDestination.QUEUE_TYPE);
+            }
+        });
+    }
+
+    private TypeConversionSupport() {
+    }
+
+    /** Adds one (from, to) conversion to the lookup table. */
+    private static void register(Class from, Class to, Converter converter) {
+        CONVERSION_MAP.put(new ConversionKey(from, to), converter);
+    }
+
+    /**
+     * Converts {@code value} to the requested class.
+     *
+     * @param value the value to convert; must not be {@code null}
+     * @param clazz the target class; must not be {@code null}
+     * @return {@code value} itself if it is already exactly of class
+     *         {@code clazz}, the converted value if a conversion is registered
+     *         for the pair, or {@code null} if no conversion is registered
+     */
+    public static Object convert(Object value, Class clazz) {
+
+        assert value != null && clazz != null;
+
+        if (value.getClass() == clazz) {
+            return value;
+        }
+
+        Converter c = CONVERSION_MAP.get(new ConversionKey(value.getClass(), clazz));
+        if (c == null) {
+            return null;
+        }
+        return c.convert(value);
+
+    }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.advisory;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQDestination;
+import org.apache.activemq.apollo.openwire.command.ActiveMQTopic;
+
+public final class AdvisorySupport {
+
+ public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
+ public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Connection");
+ public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
+ public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
+ public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
+ public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
+ public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
+ public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
+ public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+ public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
+ public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
+ public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
+ public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
+ public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
+ public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
+ public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
+ public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
+ public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastPorducer.";
+ public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
+ public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
+ public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
+ public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+ public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
+ public static final String AGENT_TOPIC = "ActiveMQ.Agent";
+ public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
+ public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
+ public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME="originBrokerName";
+ public static final String MSG_PROPERTY_ORIGIN_BROKER_URL="originBrokerURL";
+ public static final String MSG_PROPERTY_USAGE_NAME="usageName";
+ public static final String MSG_PROPERTY_CONSUMER_ID="consumerId";
+ public static final String MSG_PROPERTY_PRODUCER_ID="producerId";
+ public static final String MSG_PROPERTY_MESSAGE_ID="orignalMessageId";
+ public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
+ private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
+
+ private AdvisorySupport() {
+ }
+
+ public static ActiveMQTopic getConnectionAdvisoryTopic() {
+ return CONNECTION_ADVISORY_TOPIC;
+ }
+
+ public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isQueue()) {
+ return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ } else {
+ return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ }
+ }
+
+ public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isQueue()) {
+ return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ } else {
+ return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ }
+ }
+
+ public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
+ if (destination.isQueue()) {
+ return getExpiredQueueMessageAdvisoryTopic(destination);
+ }
+ return getExpiredTopicMessageAdvisoryTopic(destination);
+ }
+
+ public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
+ String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
+ String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
+ String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
+ String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ String name = SLOW_CONSUMER_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+ String name = FAST_PRODUCER_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+ String name = MESSAGE_DISCAREDED_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+ String name = MESSAGE_DELIVERED_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+ String name = MESSAGE_CONSUMED_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
+ return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+ }
+
+ public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
+ String name = FULL_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ return QUEUE_ADVISORY_TOPIC;
+ case ActiveMQDestination.TOPIC_TYPE:
+ return TOPIC_ADVISORY_TOPIC;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ return TEMP_QUEUE_ADVISORY_TOPIC;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ return TEMP_TOPIC_ADVISORY_TOPIC;
+ default:
+ throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
+ }
+ }
+
+ public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) || destination.equals(QUEUE_ADVISORY_TOPIC)
+ || destination.equals(TOPIC_ADVISORY_TOPIC);
+ }
+ }
+
+ public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.equals(CONNECTION_ADVISORY_TOPIC);
+ }
+ }
+
+ public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isProducerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isFullAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
+ }
+ }
+
+ /**
+ * Returns the agent topic which is used to send commands to the broker
+ */
+ public static ActiveMQDestination getAgentDestination() {
+ return AGENT_TOPIC_DESTINATION;
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java
------------------------------------------------------------------------------
svn:executable = *