You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC
svn commit: r780773 [7/31] - in /activemq/sandbox/activemq-flow:
activemq-client/ activemq-client/src/main/java/org/
activemq-client/src/main/java/org/apache/
activemq-client/src/main/java/org/apache/activemq/
activemq-client/src/main/java/org/apache/a...
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ReplayCommand.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * A general purpose replay command for some kind of producer where ranges of
+ * messages are asked to be replayed. This command is typically used over a
+ * non-reliable transport such as UDP or multicast but could also be used on
+ * TCP/IP if a socket has been re-established.
+ * Besides the NAK (replay) range it also carries a producer id and an ACK range.
+ * @openwire:marshaller code="65"
+ * @version $Revision: 563921 $
+ */
+public class ReplayCommand extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY;
+
+ private String producerId; // identifies the producer of the sequence
+ private int firstAckNumber; // first sequence number acknowledged as delivered
+ private int lastAckNumber; // last sequence number acknowledged as delivered
+ private int firstNakNumber; // first sequence number to be replayed
+ private int lastNakNumber; // last sequence number to be replayed
+
+ public ReplayCommand() {
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String getProducerId() {
+ return producerId;
+ }
+
+ /**
+ * Sets the id that uniquely identifies the producer of the sequence.
+ *
+ * @openwire:property version=1 cache=false
+ */
+ public void setProducerId(String producerId) {
+ this.producerId = producerId;
+ }
+
+ public int getFirstAckNumber() {
+ return firstAckNumber;
+ }
+
+ /**
+ * Sets the first sequence number being acknowledged as delivered on the
+ * transport, so that it can be removed from cache.
+ *
+ * @openwire:property version=1
+ */
+ public void setFirstAckNumber(int firstSequenceNumber) {
+ this.firstAckNumber = firstSequenceNumber;
+ }
+
+ public int getLastAckNumber() {
+ return lastAckNumber;
+ }
+
+ /**
+ * Sets the last sequence number being acknowledged as delivered on the
+ * transport, so that it can be removed from cache.
+ *
+ * @openwire:property version=1
+ */
+ public void setLastAckNumber(int lastSequenceNumber) {
+ this.lastAckNumber = lastSequenceNumber;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return null; // the visitor is never called back; no response is produced here
+ }
+
+ /**
+ * Returns the first sequence number to be replayed.
+ *
+ * @openwire:property version=1
+ */
+ public int getFirstNakNumber() {
+ return firstNakNumber;
+ }
+
+ public void setFirstNakNumber(int firstNakNumber) {
+ this.firstNakNumber = firstNakNumber;
+ }
+
+ /**
+ * Returns the last sequence number to be replayed.
+ *
+ * @openwire:property version=1
+ */
+ public int getLastNakNumber() {
+ return lastNakNumber;
+ }
+
+ public void setLastNakNumber(int lastNakNumber) {
+ this.lastNakNumber = lastNakNumber;
+ }
+
+ public String toString() { // reports the command id and the NAK range only
+ return "ReplayCommand {commandId = " + getCommandId() + ", firstNakNumber = " + getFirstNakNumber() + ", lastNakNumber = " + getLastNakNumber() + "}";
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/** Command that identifies itself as a non-exception response and carries a correlation id.
+ * @openwire:marshaller code="30"
+ * @version $Revision: 1.6 $
+ */
+public class Response extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE;
+ int correlationId; // package-private; presumably the id of the command being answered - TODO confirm
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /** Returns the correlation id stored on this response.
+ * @openwire:property version=1
+ */
+ public int getCorrelationId() {
+ return correlationId;
+ }
+
+ public void setCorrelationId(int responseId) {
+ this.correlationId = responseId;
+ }
+
+ public boolean isResponse() {
+ return true; // this class always answers true
+ }
+
+ public boolean isException() {
+ return false; // this class always answers false
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return null; // the visitor is not called; nothing is produced
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Response.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * Identifies a session by the id of its owning connection plus a numeric
+ * session value. The hash code, string key and parent id are computed lazily
+ * and cached; the setters discard the cached values so they cannot go stale.
+ *
+ * @openwire:marshaller code="121"
+ * @version $Revision$
+ */
+public class SessionId implements DataStructure {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;
+
+ protected String connectionId;
+ protected long value;
+
+ // Lazily computed caches, reset by clearCachedState().
+ protected transient int hashCode;
+ protected transient String key;
+ protected transient ConnectionId parentId;
+
+ public SessionId() {
+ }
+
+ public SessionId(ConnectionId connectionId, long sessionId) {
+ this.connectionId = connectionId.getValue();
+ this.value = sessionId;
+ }
+
+ public SessionId(SessionId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getValue();
+ }
+
+ public SessionId(ProducerId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getSessionId();
+ }
+
+ public SessionId(ConsumerId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getSessionId();
+ }
+
+ /**
+ * Returns the parent ConnectionId, constructed from this id on first use and cached.
+ */
+ public ConnectionId getParentId() {
+ if (parentId == null) {
+ parentId = new ConnectionId(this);
+ }
+ return parentId;
+ }
+
+ /**
+ * Combines the connection id hash with the low 32 bits of the value. The
+ * result is cached; a computed hash of 0 is simply recomputed on each call.
+ */
+ public int hashCode() {
+ if (hashCode == 0) {
+ // connectionId is still null on an instance built by the no-arg constructor.
+ int connectionHash = connectionId == null ? 0 : connectionId.hashCode();
+ hashCode = connectionHash ^ (int)value;
+ }
+ return hashCode;
+ }
+
+ /**
+ * Two ids are equal when both are exactly SessionId instances with the same
+ * value and the same (possibly null) connection id.
+ */
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != SessionId.class) {
+ return false;
+ }
+ SessionId id = (SessionId)o;
+ if (value != id.value) {
+ return false;
+ }
+ // Null-safe comparison: either side may not have a connection id yet.
+ return connectionId == null ? id.connectionId == null : connectionId.equals(id.connectionId);
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ public void setConnectionId(String connectionId) {
+ this.connectionId = connectionId;
+ clearCachedState();
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getValue() {
+ return value;
+ }
+
+ public void setValue(long sessionId) {
+ this.value = sessionId;
+ clearCachedState();
+ }
+
+ /**
+ * Returns "connectionId:value"; the string is built once and cached.
+ */
+ public String toString() {
+ if (key == null) {
+ key = connectionId + ":" + value;
+ }
+ return key;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ /**
+ * Drops every value derived from connectionId and value so that it is
+ * recomputed on next use.
+ */
+ private void clearCachedState() {
+ hashCode = 0;
+ key = null;
+ parentId = null;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Command describing a session by its SessionId; visit() dispatches to CommandVisitor.processAddSession.
+ * @openwire:marshaller code="4"
+ * @version $Revision: 1.13 $
+ */
+public class SessionInfo extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO;
+
+ protected SessionId sessionId;
+
+ public SessionInfo() {
+ sessionId = new SessionId(); // starts with an empty (unset) SessionId
+ }
+
+ public SessionInfo(ConnectionInfo connectionInfo, long sessionId) {
+ this.sessionId = new SessionId(connectionInfo.getConnectionId(), sessionId); // id is scoped to the given connection
+ }
+
+ public SessionInfo(SessionId sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /** Returns the id of the session this command describes.
+ * @openwire:property version=1 cache=true
+ */
+ public SessionId getSessionId() {
+ return sessionId;
+ }
+
+ public void setSessionId(SessionId sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public RemoveInfo createRemoveCommand() { // builds the RemoveInfo that targets this session's id
+ RemoveInfo command = new RemoveInfo(getSessionId());
+ command.setResponseRequired(isResponseRequired()); // the remove command inherits this command's response-required flag
+ return command;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processAddSession(this);
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SessionInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Field-less command whose visit() dispatches to CommandVisitor.processShutdown.
+ * @openwire:marshaller code="11"
+ * @version $Revision$
+ */
+public class ShutdownInfo extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO;
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processShutdown(this);
+ }
+
+ public boolean isShutdownInfo() {
+ return true; // this class always answers true
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ShutdownInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * Used to represent a durable subscription.
+ * Equality and hashing use only the client id and the subscription name.
+ * @openwire:marshaller code="55"
+ * @version $Revision: 1.6 $
+ */
+public class SubscriptionInfo implements DataStructure {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DURABLE_SUBSCRIPTION_INFO;
+
+ protected ActiveMQDestination subscribedDestination;
+ protected ActiveMQDestination destination;
+ protected String clientId;
+ protected String subscriptionName;
+ protected String selector;
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ /**
+ * This is the resolved destination that the subscription is receiving
+ * messages from. This will never be a pattern or a composite destination.
+ *
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getSelector() {
+ return selector;
+ }
+
+ public void setSelector(String selector) {
+ this.selector = selector;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @deprecated misspelled name; {@link #getSubscriptionName()} returns the same value
+ */
+ public String getSubcriptionName() {
+ return subscriptionName;
+ }
+
+ /**
+ * @param subscriptionName the subscription name to store
+ * @deprecated misspelled name; {@link #setSubscriptionName(String)} stores the same value
+ */
+ public void setSubcriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public void setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ public String toString() {
+ return IntrospectionSupport.toString(this);
+ }
+
+ public int hashCode() {
+ int h1 = clientId != null ? clientId.hashCode() : -1; // -1 stands in for a null clientId
+ int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1; // -1 stands in for a null name
+ return h1 ^ h2;
+ }
+
+ public boolean equals(Object obj) {
+ boolean result = false; // anything that is not a SubscriptionInfo is unequal
+ if (obj instanceof SubscriptionInfo) {
+ SubscriptionInfo other = (SubscriptionInfo)obj; // equal when clientId and subscriptionName both match (null-safe)
+ result = (clientId == null && other.clientId == null || clientId != null
+ && other.clientId != null
+ && clientId.equals(other.clientId))
+ && (subscriptionName == null && other.subscriptionName == null || subscriptionName != null
+ && other.subscriptionName != null
+ && subscriptionName
+ .equals(other.subscriptionName));
+ }
+ return result;
+ }
+
+ /**
+ * The destination the client originally subscribed to. This may not match
+ * the result of {@link #getDestination()} if the subscribed destination uses
+ * patterns or composites.
+ *
+ * If the subscribed destination is not set, this just returns the
+ * destination.
+ *
+ * @openwire:property version=3
+ */
+ public ActiveMQDestination getSubscribedDestination() {
+ if (subscribedDestination == null) {
+ return getDestination();
+ }
+ return subscribedDestination;
+ }
+
+ public void setSubscribedDestination(ActiveMQDestination subscribedDestination) {
+ this.subscribedDestination = subscribedDestination;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/** Abstract base for transaction identifiers; subclasses say whether they are XA or local and supply a string key.
+ * @openwire:marshaller
+ * @version $Revision: 1.6 $
+ */
+public abstract class TransactionId implements DataStructure {
+
+ public abstract boolean isXATransaction(); // true when the id denotes an XA transaction
+ public abstract boolean isLocalTransaction(); // true when the id denotes a local transaction
+ public abstract String getTransactionKey(); // string form of the id; exact format is up to the subclass
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.IOException;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Command describing one transaction operation; visit() dispatches on the operation code held in type.
+ * @openwire:marshaller code="7"
+ * @version $Revision: 1.10 $
+ */
+public class TransactionInfo extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.TRANSACTION_INFO;
+
+ public static final byte BEGIN = 0; // BEGIN..END are the operation codes accepted by visit()
+ public static final byte PREPARE = 1;
+ public static final byte COMMIT_ONE_PHASE = 2;
+ public static final byte COMMIT_TWO_PHASE = 3;
+ public static final byte ROLLBACK = 4;
+ public static final byte RECOVER = 5;
+ public static final byte FORGET = 6;
+ public static final byte END = 7;
+
+ protected byte type; // one of the operation codes above
+ protected ConnectionId connectionId;
+ protected TransactionId transactionId;
+
+ public TransactionInfo() {
+ }
+
+ public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) {
+ this.connectionId = connectionId;
+ this.transactionId = transactionId;
+ this.type = type;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public void setConnectionId(ConnectionId connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public TransactionId getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(TransactionId transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ /** Returns the operation code (BEGIN, PREPARE, ... END) of this command.
+ * @openwire:property version=1
+ */
+ public byte getType() {
+ return type;
+ }
+
+ public void setType(byte type) {
+ this.type = type;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ switch (type) { // each operation code maps to exactly one visitor callback
+ case TransactionInfo.BEGIN:
+ return visitor.processBeginTransaction(this);
+ case TransactionInfo.END:
+ return visitor.processEndTransaction(this);
+ case TransactionInfo.PREPARE:
+ return visitor.processPrepareTransaction(this);
+ case TransactionInfo.COMMIT_ONE_PHASE:
+ return visitor.processCommitTransactionOnePhase(this);
+ case TransactionInfo.COMMIT_TWO_PHASE:
+ return visitor.processCommitTransactionTwoPhase(this);
+ case TransactionInfo.ROLLBACK:
+ return visitor.processRollbackTransaction(this);
+ case TransactionInfo.RECOVER:
+ return visitor.processRecoverTransactions(this);
+ case TransactionInfo.FORGET:
+ return visitor.processForgetTransaction(this);
+ default: // type is none of the constants above
+ throw new IOException("Transaction info type unknown: " + type);
+ }
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/TransactionInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Command describing a wire format: the magic bytes, the protocol version and
+ * a set of named properties. The properties are held either as a live map
+ * ({@link #properties}) or in marshalled form ({@link #marshalledProperties});
+ * each form is produced from the other on demand.
+ *
+ * @openwire:marshaller code="1"
+ * @version $Revision$
+ */
+public class WireFormatInfo implements Command, MarshallAware {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO;
+ private static final int MAX_PROPERTY_SIZE = 1024 * 4;
+ private static final byte MAGIC[] = new byte[] {'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'};
+
+ protected byte magic[] = MAGIC;
+ protected int version;
+ protected ByteSequence marshalledProperties;
+
+ protected transient Map<String, Object> properties;
+ private transient Endpoint from;
+ private transient Endpoint to;
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isWireFormatInfo() {
+ return true;
+ }
+
+ public boolean isMarshallAware() {
+ return true;
+ }
+
+ /**
+ * @openwire:property version=1 size=8 testSize=-1
+ */
+ public byte[] getMagic() {
+ return magic;
+ }
+
+ public void setMagic(byte[] magic) {
+ this.magic = magic;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public ByteSequence getMarshalledProperties() {
+ return marshalledProperties;
+ }
+
+ public void setMarshalledProperties(ByteSequence marshalledProperties) {
+ this.marshalledProperties = marshalledProperties;
+ }
+
+ /**
+ * The endpoint within the transport where this message came from.
+ */
+ public Endpoint getFrom() {
+ return from;
+ }
+
+ public void setFrom(Endpoint from) {
+ this.from = from;
+ }
+
+ /**
+ * The endpoint within the transport where this message is going to - null
+ * means all endpoints.
+ */
+ public Endpoint getTo() {
+ return to;
+ }
+
+ public void setTo(Endpoint to) {
+ this.to = to;
+ }
+
+ // ////////////////////
+ //
+ // Implementation Methods.
+ //
+ // ////////////////////
+
+ /**
+ * Looks up a single property, unmarshalling the property map first if only
+ * the marshalled form is present.
+ *
+ * @param name the property name
+ * @return the stored value, or null if there is no such property
+ * @throws IOException if the marshalled properties cannot be decoded
+ */
+ public Object getProperty(String name) throws IOException {
+ if (properties == null) {
+ if (marshalledProperties == null) {
+ return null;
+ }
+ properties = unmarshalProperties(marshalledProperties);
+ }
+ return properties.get(name);
+ }
+
+ /**
+ * Returns a read-only view of the properties.
+ *
+ * @return an unmodifiable map; empty when no properties exist in either form
+ * @throws IOException if the marshalled properties cannot be decoded
+ */
+ public Map<String, Object> getProperties() throws IOException {
+ if (properties == null) {
+ if (marshalledProperties == null) {
+ return Collections.<String, Object>emptyMap();
+ }
+ properties = unmarshalProperties(marshalledProperties);
+ }
+ return Collections.unmodifiableMap(properties);
+ }
+
+ /** Drops both the live map and the marshalled form of the properties. */
+ public void clearProperties() {
+ marshalledProperties = null;
+ properties = null;
+ }
+
+ /**
+ * Sets a property and discards any marshalled form, so that the next
+ * {@link #beforeMarshall(WireFormat)} encodes the updated map.
+ *
+ * @param name the property name
+ * @param value the value to store
+ * @throws IOException if existing marshalled properties cannot be decoded
+ */
+ public void setProperty(String name, Object value) throws IOException {
+ lazyCreateProperties();
+ properties.put(name, value);
+ // The map may have been populated by a getter (which keeps the marshalled
+ // bytes) or already marshalled once; either way those bytes are now stale
+ // and beforeMarshall() would otherwise skip re-encoding.
+ marshalledProperties = null;
+ }
+
+ /**
+ * Makes sure the live property map exists, decoding the marshalled form if
+ * that is all there is. The marshalled form is discarded once decoded.
+ *
+ * @throws IOException if the marshalled properties cannot be decoded
+ */
+ protected void lazyCreateProperties() throws IOException {
+ if (properties == null) {
+ if (marshalledProperties == null) {
+ properties = new HashMap<String, Object>();
+ } else {
+ properties = unmarshalProperties(marshalledProperties);
+ marshalledProperties = null;
+ }
+ }
+ }
+
+ /** Decodes a marshalled primitive map, passing MAX_PROPERTY_SIZE as the limit. */
+ private Map<String, Object> unmarshalProperties(ByteSequence marshalledProperties) throws IOException {
+ return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
+ }
+
+ /**
+ * Encodes the live property map into {@link #marshalledProperties} when no
+ * marshalled form is currently held.
+ */
+ public void beforeMarshall(WireFormat wireFormat) throws IOException {
+ // Need to marshal the properties.
+ if (marshalledProperties == null && properties != null) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ MarshallingSupport.marshalPrimitiveMap(properties, os);
+ os.close();
+ marshalledProperties = baos.toByteSequence();
+ }
+ }
+
+ public void afterMarshall(WireFormat wireFormat) throws IOException {
+ }
+
+ public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
+ }
+
+ public void afterUnmarshall(WireFormat wireFormat) throws IOException {
+ }
+
+ /**
+ * @return true if the magic bytes are present and equal to the expected
+ * "ActiveMQ" sequence
+ */
+ public boolean isValid() {
+ return magic != null && Arrays.equals(magic, MAGIC);
+ }
+
+ public void setResponseRequired(boolean responseRequired) {
+ }
+
+ /**
+ * Reads a boolean-valued property. A missing property, or one that is not a
+ * Boolean holding true, yields false.
+ */
+ private boolean isEnabled(String name) throws IOException {
+ // equals() instead of ==, so any Boolean instance holding true counts,
+ // not only the Boolean.TRUE singleton.
+ return Boolean.TRUE.equals(getProperty(name));
+ }
+
+ /**
+ * @throws IOException
+ */
+ public boolean isCacheEnabled() throws IOException {
+ return isEnabled("CacheEnabled");
+ }
+
+ public void setCacheEnabled(boolean cacheEnabled) throws IOException {
+ setProperty("CacheEnabled", Boolean.valueOf(cacheEnabled));
+ }
+
+ /**
+ * @throws IOException
+ */
+ public boolean isStackTraceEnabled() throws IOException {
+ return isEnabled("StackTraceEnabled");
+ }
+
+ public void setStackTraceEnabled(boolean stackTraceEnabled) throws IOException {
+ setProperty("StackTraceEnabled", Boolean.valueOf(stackTraceEnabled));
+ }
+
+ /**
+ * @throws IOException
+ */
+ public boolean isTcpNoDelayEnabled() throws IOException {
+ return isEnabled("TcpNoDelayEnabled");
+ }
+
+ public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) throws IOException {
+ setProperty("TcpNoDelayEnabled", Boolean.valueOf(tcpNoDelayEnabled));
+ }
+
+ /**
+ * @throws IOException
+ */
+ public boolean isSizePrefixDisabled() throws IOException {
+ return isEnabled("SizePrefixDisabled");
+ }
+
+ public void setSizePrefixDisabled(boolean prefixPacketSize) throws IOException {
+ // NOTE(review): the parameter name reads as the opposite of the property it
+ // sets; true stores SizePrefixDisabled=true.
+ setProperty("SizePrefixDisabled", Boolean.valueOf(prefixPacketSize));
+ }
+
+ /**
+ * @throws IOException
+ */
+ public boolean isTightEncodingEnabled() throws IOException {
+ return isEnabled("TightEncodingEnabled");
+ }
+
+ public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
+ setProperty("TightEncodingEnabled", Boolean.valueOf(tightEncodingEnabled));
+ }
+
+ /**
+ * @return the "MaxInactivityDuration" property, or 0 when it is not set
+ * @throws IOException
+ */
+ public long getMaxInactivityDuration() throws IOException {
+ Long l = (Long)getProperty("MaxInactivityDuration");
+ return l == null ? 0 : l.longValue();
+ }
+
+ public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+ setProperty("MaxInactivityDuration", Long.valueOf(maxInactivityDuration));
+ }
+
+ /**
+ * @return the "MaxInactivityDurationInitalDelay" property, or 0 when it is
+ * not set
+ * @throws IOException
+ */
+ public long getMaxInactivityDurationInitalDelay() throws IOException {
+ Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
+ return l == null ? 0 : l.longValue();
+ }
+
+ public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
+ setProperty("MaxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
+ }
+
+ /**
+ * @return the "CacheSize" property, or 0 when it is not set
+ * @throws IOException
+ */
+ public int getCacheSize() throws IOException {
+ Integer i = (Integer)getProperty("CacheSize");
+ return i == null ? 0 : i.intValue();
+ }
+
+ public void setCacheSize(int cacheSize) throws IOException {
+ setProperty("CacheSize", Integer.valueOf(cacheSize));
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processWireFormat(this);
+ }
+
+ public String toString() {
+ Map<String, Object> p = null;
+ try {
+ p = getProperties();
+ } catch (IOException ignored) {
+ // Best effort: toString() must not fail, so undecodable properties are
+ // simply printed as null.
+ }
+ return "WireFormatInfo { version=" + version + ", properties=" + p + ", magic=" + toString(magic) + "}";
+ }
+
+ /**
+ * Renders the bytes as a comma separated list of characters in brackets.
+ * A null array is rendered as "null" instead of failing.
+ */
+ private String toString(byte[] data) {
+ if (data == null) {
+ return "null";
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (int i = 0; i < data.length; i++) {
+ if (i != 0) {
+ sb.append(',');
+ }
+ sb.append((char)data[i]);
+ }
+ sb.append(']');
+ return sb.toString();
+ }
+
+ // /////////////////////////////////////////////////////////////
+ //
+ // These are not implemented.
+ //
+ // /////////////////////////////////////////////////////////////
+
+ public void setCommandId(int value) {
+ }
+
+ public int getCommandId() {
+ return 0;
+ }
+
+ public boolean isResponseRequired() {
+ return false;
+ }
+
+ public boolean isResponse() {
+ return false;
+ }
+
+ public boolean isBrokerInfo() {
+ return false;
+ }
+
+ public boolean isMessageDispatch() {
+ return false;
+ }
+
+ public boolean isMessage() {
+ return false;
+ }
+
+ public boolean isMessageAck() {
+ return false;
+ }
+
+ public boolean isMessageDispatchNotification() {
+ return false;
+ }
+
+ public boolean isShutdownInfo() {
+ return false;
+ }
+
+ public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
+ }
+
+ public ByteSequence getCachedMarshalledForm(WireFormat wireFormat) {
+ return null;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/WireFormatInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.util.Arrays;
+import javax.transaction.xa.Xid;
+import org.apache.activemq.util.HexSupport;
+
+/**
+ * Transaction id made of the three Xid components: format id, global
+ * transaction id and branch qualifier. The hash code and the string key are
+ * computed on first use and cached until one of the components is changed.
+ *
+ * @openwire:marshaller code="112"
+ * @version $Revision: 1.6 $
+ */
+public class XATransactionId extends TransactionId implements Xid, Comparable {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_XA_TRANSACTION_ID;
+
+ private int formatId;
+ private byte[] branchQualifier;
+ private byte[] globalTransactionId;
+
+ // Cached derived values; 0 / null mean "not computed yet".
+ private transient int hash;
+ private transient String transactionKey;
+
+ public XATransactionId() {
+ }
+
+ /**
+ * Copies the three components out of the given Xid. The byte arrays
+ * returned by the Xid are stored as-is, not cloned.
+ */
+ public XATransactionId(Xid xid) {
+ this.formatId = xid.getFormatId();
+ this.globalTransactionId = xid.getGlobalTransactionId();
+ this.branchQualifier = xid.getBranchQualifier();
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @return "XID:" followed by the format id and the hex form of both byte
+ * arrays, separated by colons; built once and then cached
+ */
+ public synchronized String getTransactionKey() {
+ if (transactionKey == null) {
+ transactionKey = "XID:" + formatId + ":" + HexSupport.toHexFromBytes(globalTransactionId) + ":"
+ + HexSupport.toHexFromBytes(branchQualifier);
+ }
+ return transactionKey;
+ }
+
+ public String toString() {
+ return getTransactionKey();
+ }
+
+ public boolean isXATransaction() {
+ return true;
+ }
+
+ public boolean isLocalTransaction() {
+ return false;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public int getFormatId() {
+ return formatId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public byte[] getGlobalTransactionId() {
+ return globalTransactionId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public byte[] getBranchQualifier() {
+ return branchQualifier;
+ }
+
+ public void setBranchQualifier(byte[] branchQualifier) {
+ this.branchQualifier = branchQualifier;
+ invalidateCaches();
+ }
+
+ public void setFormatId(int formatId) {
+ this.formatId = formatId;
+ invalidateCaches();
+ }
+
+ public void setGlobalTransactionId(byte[] globalTransactionId) {
+ this.globalTransactionId = globalTransactionId;
+ invalidateCaches();
+ }
+
+ /**
+ * Forgets the cached hash code and string key. Both are derived from the
+ * three components, so both must be recomputed after any setter runs.
+ */
+ private void invalidateCaches() {
+ this.hash = 0;
+ this.transactionKey = null;
+ }
+
+ public int hashCode() {
+ if (hash == 0) {
+ hash = formatId;
+ hash = hash(globalTransactionId, hash);
+ hash = hash(branchQualifier, hash);
+ if (hash == 0) {
+ // 0 is reserved as the "not computed" marker, so substitute a constant.
+ hash = 0xaceace;
+ }
+ }
+ return hash;
+ }
+
+ /**
+ * Folds the bytes into the running hash by XOR, shifting each byte by 0, 8,
+ * 16 or 24 bits depending on its index. A null array leaves the hash
+ * unchanged, matching equals(), which treats two null arrays as equal.
+ */
+ private static int hash(byte[] bytes, int hash) {
+ if (bytes == null) {
+ return hash;
+ }
+ int size = bytes.length;
+ for (int i = 0; i < size; i++) {
+ hash ^= bytes[i] << ((i % 4) * 8);
+ }
+ return hash;
+ }
+
+ /**
+ * Equal only to another instance of exactly this class whose format id and
+ * both byte arrays match.
+ */
+ public boolean equals(Object o) {
+ if (o == null || o.getClass() != XATransactionId.class) {
+ return false;
+ }
+ XATransactionId xid = (XATransactionId)o;
+ return xid.formatId == formatId && Arrays.equals(xid.globalTransactionId, globalTransactionId)
+ && Arrays.equals(xid.branchQualifier, branchQualifier);
+ }
+
+ /**
+ * Orders ids by their transaction key string. Null and objects of any other
+ * class yield -1 rather than an exception.
+ */
+ public int compareTo(Object o) {
+ if (o == null || o.getClass() != XATransactionId.class) {
+ return -1;
+ }
+ XATransactionId xid = (XATransactionId)o;
+ return getTransactionKey().compareTo(xid.getTransactionKey());
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/XATransactionId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html Mon Jun 1 18:37:41 2009
@@ -0,0 +1,27 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+Command objects used via the Command Pattern to communicate among nodes
+</p>
+
+</body>
+</html>
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.filter;
+
+import org.apache.activemq.broker.openwire.OpenwireMessageEvaluationContext;
+import org.apache.activemq.command.Message;
+
+/**
+ * Boolean expression that rejects messages whose producer belongs to the
+ * connection id given at construction time.
+ */
+public class NoLocalExpression implements BooleanExpression {
+
+ private final String connectionId;
+
+ /**
+ * @param connectionId the connection id whose own messages must not match
+ */
+ public NoLocalExpression(String connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ /**
+ * @param mec evaluation context; cast to OpenwireMessageEvaluationContext
+ * @return false for a dropped message or one produced on this connection
+ * id, true otherwise
+ */
+ public boolean matches(MessageEvaluationContext mec) {
+ final OpenwireMessageEvaluationContext context = (OpenwireMessageEvaluationContext)mec;
+ final Message candidate = context.getMessage();
+ if (!candidate.isDropped()) {
+ final Object producerConnection = candidate.getMessageId().getProducerId().getConnectionId();
+ return !connectionId.equals(producerConnection);
+ }
+ return false;
+ }
+
+ /**
+ * @return the result of {@link #matches(MessageEvaluationContext)} as
+ * Boolean.TRUE or Boolean.FALSE
+ */
+ public Object evaluate(MessageEvaluationContext message) {
+ return Boolean.valueOf(matches(message));
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/filter/NoLocalExpression.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Growable bit buffer. Booleans are packed eight to a byte, lowest bit first,
+ * and the used bytes can be written to, or read back from, a form that is
+ * prefixed with their count:
+ * <ul>
+ * <li>count below 64: one byte holding the count</li>
+ * <li>count below 256: the byte 0xC0 followed by one byte holding the count</li>
+ * <li>otherwise: the byte 0x80 followed by the count as a short</li>
+ * </ul>
+ */
+public final class BooleanStream {
+
+ byte data[] = new byte[48];
+ // Number of bytes of data that are in use.
+ short arrayLimit;
+ // Index of the byte, and of the bit inside it, that is read or written next.
+ short arrayPos;
+ byte bytePos;
+
+ /**
+ * Reads the bit at the current position and moves to the next one.
+ *
+ * @return the value of that bit
+ */
+ public boolean readBoolean() throws IOException {
+ assert arrayPos <= arrayLimit;
+ byte b = data[arrayPos];
+ boolean rc = ((b >> bytePos) & 0x01) != 0;
+ advance();
+ return rc;
+ }
+
+ /**
+ * Stores one bit at the current position and moves to the next one. The
+ * backing array is doubled when it runs out of room. A false value leaves
+ * the bit untouched, so it only reads back as false if it was not set before.
+ *
+ * @param value the bit to store
+ */
+ public void writeBoolean(boolean value) throws IOException {
+ if (bytePos == 0) {
+ // Starting a new byte: account for it and make sure it exists.
+ arrayLimit++;
+ if (arrayLimit >= data.length) {
+ // re-grow the array.
+ byte d[] = new byte[data.length * 2];
+ System.arraycopy(data, 0, d, 0, data.length);
+ data = d;
+ }
+ }
+ if (value) {
+ data[arrayPos] |= 0x01 << bytePos;
+ }
+ advance();
+ }
+
+ /** Steps to the next bit, rolling over to the next byte after eight bits. */
+ private void advance() {
+ bytePos++;
+ if (bytePos >= 8) {
+ bytePos = 0;
+ arrayPos++;
+ }
+ }
+
+ /**
+ * Writes the count prefix and the used bytes, then rewinds the position to
+ * the first bit so the same bits can be read back.
+ *
+ * @param dataOut destination
+ */
+ public void marshal(DataOutput dataOut) throws IOException {
+ if (arrayLimit < 64) {
+ dataOut.writeByte(arrayLimit);
+ } else if (arrayLimit < 256) { // max value of unsigned byte
+ dataOut.writeByte(0xC0);
+ dataOut.writeByte(arrayLimit);
+ } else {
+ dataOut.writeByte(0x80);
+ dataOut.writeShort(arrayLimit);
+ }
+
+ dataOut.write(data, 0, arrayLimit);
+ clear();
+ }
+
+ /**
+ * Writes the count prefix and the used bytes into a buffer.
+ * NOTE(review): unlike the DataOutput variant this does not rewind the
+ * position afterwards - confirm whether that difference is intended.
+ *
+ * @param dataOut destination buffer
+ */
+ public void marshal(ByteBuffer dataOut) {
+ if (arrayLimit < 64) {
+ dataOut.put((byte)arrayLimit);
+ } else if (arrayLimit < 256) { // max value of unsigned byte
+ dataOut.put((byte)0xC0);
+ dataOut.put((byte)arrayLimit);
+ } else {
+ dataOut.put((byte)0x80);
+ dataOut.putShort(arrayLimit);
+ }
+
+ dataOut.put(data, 0, arrayLimit);
+ }
+
+ /**
+ * Reads the count prefix and that many bytes, replacing the current
+ * content, and rewinds the position to the first bit.
+ *
+ * @param dataIn source
+ * @throws IOException if the source fails, ends early, or announces a
+ * negative count
+ */
+ public void unmarshal(DataInput dataIn) throws IOException {
+
+ int limit = dataIn.readByte() & 0xFF;
+ if (limit == 0xC0) {
+ limit = dataIn.readByte() & 0xFF;
+ } else if (limit == 0x80) {
+ limit = dataIn.readShort();
+ if (limit < 0) {
+ // readShort() is signed; without this check readFully() below would
+ // fail with an unchecked IndexOutOfBoundsException on corrupt input.
+ throw new IOException("Invalid boolean stream length: " + limit);
+ }
+ }
+ arrayLimit = (short)limit;
+ if (data.length < arrayLimit) {
+ data = new byte[arrayLimit];
+ }
+ dataIn.readFully(data, 0, arrayLimit);
+ clear();
+ }
+
+ /**
+ * Rewinds the read/write position to the first bit. The stored bytes and
+ * their count are left as they are.
+ */
+ public void clear() {
+ arrayPos = 0;
+ bytePos = 0;
+ }
+
+ /**
+ * @return the number of bytes the marshal methods write: the used bytes
+ * plus a prefix of one, two or three bytes
+ */
+ public int marshalledSize() {
+ if (arrayLimit < 64) {
+ return 1 + arrayLimit;
+ } else if (arrayLimit < 256) {
+ return 2 + arrayLimit;
+ } else {
+ return 3 + arrayLimit;
+ }
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/BooleanStream.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.util.Comparator;
+
+import org.apache.activemq.command.Command;
+
+/**
+ * A @{link Comparator} of commands using their {@link Command#getCommandId()}
+ *
+ * @version $Revision: 564814 $
+ */
+public class CommandIdComparator implements Comparator<Command> {
+
+ public int compare(Command c1, Command c2) {
+ return c1.getCommandId() - c2.getCommandId();
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.command.DataStructure;
+
+/**
+ * Contract for an object that marshals one kind of {@link DataStructure} in
+ * two variants, "tight" (which additionally uses a {@link BooleanStream}) and
+ * "loose" (which does not).
+ */
+public interface DataStreamMarshaller {
+
+ /**
+ * @return the type code of the data structure this marshaller handles
+ */
+ byte getDataStructureType();
+
+ /**
+ * @return a data structure instance; presumably a new, empty one of the
+ * handled type - confirm against the implementations
+ */
+ DataStructure createObject();
+
+ /**
+ * First tight-marshalling step; takes the boolean stream but no data output.
+ *
+ * @return an int; presumably the number of bytes the second step will
+ * write - confirm against the implementations
+ */
+ int tightMarshal1(OpenWireFormat format, Object c, BooleanStream bs) throws IOException;
+
+ /**
+ * Second tight-marshalling step; takes the data output as well as the
+ * boolean stream.
+ */
+ void tightMarshal2(OpenWireFormat format, Object c, DataOutput ds, BooleanStream bs) throws IOException;
+
+ /**
+ * Tight unmarshalling; takes the object to work on, the data input and the
+ * boolean stream.
+ */
+ void tightUnmarshal(OpenWireFormat format, Object data, DataInput dis, BooleanStream bs) throws IOException;
+
+ /**
+ * Loose marshalling of the given object to the data output.
+ */
+ void looseMarshal(OpenWireFormat format, Object c, DataOutput ds) throws IOException;
+
+ /**
+ * Loose unmarshalling; takes the object to work on and the data input.
+ */
+ void looseUnmarshal(OpenWireFormat format, Object data, DataInput dis) throws IOException;
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/DataStreamMarshaller.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,659 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ *
+ * @version $Revision$
+ */
+public final class OpenWireFormat implements WireFormat {
+
+ // Protocol version used by the no-argument constructor.
+ public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
+
+ // Type byte written in place of a command when the command is null.
+ static final byte NULL_TYPE = CommandTypes.NULL;
+ // Initial length of the marshall/unmarshall cache arrays; also the length
+ // used when a negotiated cache size of 0 is encountered.
+ private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
+ // The eviction sweep keeps at least this many cache slots free.
+ private static final int MARSHAL_CACHE_FREE_SPACE = 100;
+
+ // Marshaller table indexed by (data structure type & 0xFF); loaded by setVersion.
+ private DataStreamMarshaller dataMarshallers[];
+ private int version;
+ private boolean stackTraceEnabled;
+ private boolean tcpNoDelayEnabled;
+ private boolean cacheEnabled;
+ private boolean tightEncodingEnabled;
+ private boolean sizePrefixDisabled;
+
+ // The following fields are used for value caching
+ private short nextMarshallCacheIndex;
+ private short nextMarshallCacheEvictionIndex;
+ private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
+ private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+ private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+ // Scratch buffers reused by the synchronized marshal/unmarshal methods.
+ private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
+ private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
+ // Local preferences consulted by renegotiateWireFormat; null until set.
+ private WireFormatInfo preferedWireFormatInfo;
+
+ // Set to true by doUnmarshal while it decodes an object; read by inReceive().
+ private AtomicBoolean receivingMessage = new AtomicBoolean(false);
+
+ /** Creates a wire format that uses {@link #DEFAULT_VERSION}. */
+ public OpenWireFormat() {
+ this(DEFAULT_VERSION);
+ }
+
+ /**
+ * Creates a wire format for the given protocol version.
+ *
+ * @param i the protocol version, passed straight to setVersion
+ */
+ public OpenWireFormat(int i) {
+ setVersion(i);
+ }
+
+ /**
+  * Hashes the same five settings that equals compares: the version plus
+  * one distinct bit pattern per boolean option.
+  */
+ public int hashCode() {
+     int hash = version;
+     hash ^= cacheEnabled ? 0x10000000 : 0x20000000;
+     hash ^= stackTraceEnabled ? 0x01000000 : 0x02000000;
+     hash ^= tightEncodingEnabled ? 0x00100000 : 0x00200000;
+     hash ^= sizePrefixDisabled ? 0x00010000 : 0x00020000;
+     return hash;
+ }
+
+ /**
+  * Creates a new wire format with the same version, option flags and
+  * preferred WireFormatInfo as this one. Cache contents are not copied;
+  * the copy starts with fresh caches.
+  *
+  * @return the configured copy
+  */
+ public OpenWireFormat copy() {
+     // Construct with this format's version so that setVersion loads the
+     // matching marshaller table. Building a default-version instance and
+     // then overwriting the version field would leave the copy reporting
+     // one version while marshalling with another.
+     OpenWireFormat answer = new OpenWireFormat(version);
+     answer.stackTraceEnabled = stackTraceEnabled;
+     answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
+     answer.cacheEnabled = cacheEnabled;
+     answer.tightEncodingEnabled = tightEncodingEnabled;
+     answer.sizePrefixDisabled = sizePrefixDisabled;
+     answer.preferedWireFormatInfo = preferedWireFormatInfo;
+     return answer;
+ }
+
+ /**
+  * Two wire formats are equal when their version, stackTraceEnabled,
+  * cacheEnabled, tightEncodingEnabled and sizePrefixDisabled settings
+  * match (the same settings that hashCode uses).
+  *
+  * @param object the object to compare with; may be null or of any type
+  * @return true only if object is an OpenWireFormat with matching settings
+  */
+ public boolean equals(Object object) {
+     if (this == object) {
+         return true;
+     }
+     // instanceof rejects null as well as foreign types, so the cast below
+     // can no longer throw ClassCastException.
+     if (!(object instanceof OpenWireFormat)) {
+         return false;
+     }
+     OpenWireFormat o = (OpenWireFormat)object;
+     return o.stackTraceEnabled == stackTraceEnabled
+         && o.cacheEnabled == cacheEnabled
+         && o.version == version
+         && o.tightEncodingEnabled == tightEncodingEnabled
+         && o.sizePrefixDisabled == sizePrefixDisabled;
+ }
+
+
+ /** Renders the version and the four option flags that define equality. */
+ public String toString() {
+     StringBuilder text = new StringBuilder("OpenWireFormat{");
+     text.append("version=").append(version);
+     text.append(", cacheEnabled=").append(cacheEnabled);
+     text.append(", stackTraceEnabled=").append(stackTraceEnabled);
+     text.append(", tightEncodingEnabled=").append(tightEncodingEnabled);
+     text.append(", sizePrefixDisabled=").append(sizePrefixDisabled);
+     text.append("}");
+     return text.toString();
+ }
+
+ /** @return the protocol version last applied through setVersion */
+ public int getVersion() {
+ return version;
+ }
+
+ /**
+  * Marshals a command into a byte sequence backed by this format's
+  * reusable output buffer.
+  * <p>
+  * The frame is an optional 4-byte size prefix (omitted when
+  * sizePrefixDisabled is set), the type byte, and the tight or loose
+  * encoding of the command. A null command is encoded as the NULL type
+  * byte alone.
+  *
+  * @param command a DataStructure, or null
+  * @return the marshalled frame
+  * @throws IOException if no marshaller is registered for the command's
+  *         type or the marshaller fails
+  */
+ public synchronized ByteSequence marshal(Object command) throws IOException {
+
+     if (cacheEnabled) {
+         runMarshallCacheEvictionSweep();
+     }
+
+     if (command == null) {
+         bytesOut.restart(5);
+         // Honour sizePrefixDisabled here too, as marshal(Object, DataOutput)
+         // does: neither unmarshal method reads a size prefix when it is
+         // disabled, so writing one would leave four stray bytes in the frame.
+         if (!sizePrefixDisabled) {
+             bytesOut.writeInt(1);
+         }
+         bytesOut.writeByte(NULL_TYPE);
+         return bytesOut.toByteSequence();
+     }
+
+     DataStructure c = (DataStructure)command;
+     byte type = c.getDataStructureType();
+     DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
+     if (dsm == null) {
+         throw new IOException("Unknown data type: " + type);
+     }
+
+     if (tightEncodingEnabled) {
+         // Pass 1 computes the size and fills the boolean stream; pass 2
+         // writes the payload.
+         BooleanStream bs = new BooleanStream();
+         int size = 1;
+         size += dsm.tightMarshal1(this, c, bs);
+         size += bs.marshalledSize();
+
+         bytesOut.restart(size);
+         if (!sizePrefixDisabled) {
+             bytesOut.writeInt(size);
+         }
+         bytesOut.writeByte(type);
+         bs.marshal(bytesOut);
+         dsm.tightMarshal2(this, c, bytesOut, bs);
+         return bytesOut.toByteSequence();
+     }
+
+     bytesOut.restart();
+     if (!sizePrefixDisabled) {
+         // The final size is not known yet; write a placeholder and patch
+         // it once the payload has been written.
+         bytesOut.writeInt(0);
+     }
+     bytesOut.writeByte(type);
+     dsm.looseMarshal(this, c, bytesOut);
+     ByteSequence sequence = bytesOut.toByteSequence();
+
+     if (!sizePrefixDisabled) {
+         int size = sequence.getLength() - 4;
+         // Save and restore the offset around the in-place size patch.
+         int pos = sequence.offset;
+         ByteSequenceData.writeIntBig(sequence, size);
+         sequence.offset = pos;
+     }
+     return sequence;
+ }
+
+ /**
+  * Unmarshals one command from a byte sequence using the reusable input
+  * buffer.
+  *
+  * @param sequence the marshalled frame
+  * @return the decoded object, or null for the NULL type
+  * @throws IOException if decoding fails
+  */
+ public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
+     bytesIn.restart(sequence);
+
+     if (!sizePrefixDisabled) {
+         int declaredSize = bytesIn.readInt();
+         if (sequence.getLength() - 4 != declaredSize) {
+             // A mismatch between the declared and the actual size is
+             // deliberately tolerated; the strict check is disabled.
+         }
+     }
+
+     return doUnmarshal(bytesIn);
+ }
+
+ /**
+  * Marshals a command straight onto a stream: optional 4-byte size prefix,
+  * type byte, then the tight or loose encoding. In the loose, size-prefixed
+  * case the frame is first built in the reusable buffer so that its length
+  * is known before anything is written to dataOut.
+  *
+  * @param o a DataStructure, or null
+  * @param dataOut the stream to write to
+  * @throws IOException if no marshaller is registered for the type or
+  *         writing fails
+  */
+ public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
+
+     if (cacheEnabled) {
+         runMarshallCacheEvictionSweep();
+     }
+
+     if (o == null) {
+         if (!sizePrefixDisabled) {
+             dataOut.writeInt(1);
+         }
+         dataOut.writeByte(NULL_TYPE);
+         return;
+     }
+
+     DataStructure command = (DataStructure)o;
+     byte typeCode = command.getDataStructureType();
+     DataStreamMarshaller marshaller = dataMarshallers[typeCode & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + typeCode);
+     }
+
+     if (tightEncodingEnabled) {
+         BooleanStream flags = new BooleanStream();
+         int frameSize = 1 + marshaller.tightMarshal1(this, command, flags);
+         frameSize += flags.marshalledSize();
+
+         if (!sizePrefixDisabled) {
+             dataOut.writeInt(frameSize);
+         }
+         dataOut.writeByte(typeCode);
+         flags.marshal(dataOut);
+         marshaller.tightMarshal2(this, command, dataOut, flags);
+         return;
+     }
+
+     if (sizePrefixDisabled) {
+         // No prefix needed, so the loose encoding can be streamed directly.
+         dataOut.writeByte(typeCode);
+         marshaller.looseMarshal(this, command, dataOut);
+         return;
+     }
+
+     bytesOut.restart();
+     bytesOut.writeByte(typeCode);
+     marshaller.looseMarshal(this, command, bytesOut);
+     ByteSequence buffered = bytesOut.toByteSequence();
+     dataOut.writeInt(buffered.getLength());
+     dataOut.write(buffered.getData(), buffered.getOffset(), buffered.getLength());
+ }
+
+ /**
+  * Unmarshals one command from a stream. When size prefixes are in use the
+  * 4-byte prefix is read and discarded; decoding then continues on the
+  * same stream.
+  *
+  * @param dis the stream to read from
+  * @return the decoded object, or null for the NULL type
+  * @throws IOException if reading or decoding fails
+  */
+ public Object unmarshal(DataInput dis) throws IOException {
+     if (!sizePrefixDisabled) {
+         dis.readInt();
+     }
+     return doUnmarshal(dis);
+ }
+
+ /**
+  * Used by NIO or AIO transports. First tight-encoding pass: computes the
+  * frame size (type byte, marshaller payload and boolean stream) without
+  * writing anything.
+  *
+  * @param o a DataStructure, or null
+  * @param bs the boolean stream filled in by the marshaller
+  * @return the computed size; 1 when o is null
+  * @throws IOException if no marshaller is registered for the type
+  */
+ public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
+     if (o == null) {
+         return 1;
+     }
+
+     DataStructure command = (DataStructure)o;
+     byte typeCode = command.getDataStructureType();
+     DataStreamMarshaller marshaller = dataMarshallers[typeCode & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + typeCode);
+     }
+
+     int total = 1 + marshaller.tightMarshal1(this, command, bs);
+     total += bs.marshalledSize();
+     return total;
+ }
+
+ /**
+  * Used by NIO or AIO transports; note that the size is not written as part
+  * of this method. Second tight-encoding pass: writes the type byte, the
+  * boolean stream and the payload. Nothing is written for a null object.
+  *
+  * @param o a DataStructure, or null
+  * @param ds the stream to write to
+  * @param bs the boolean stream produced by the first pass
+  * @throws IOException if no marshaller is registered for the type or
+  *         writing fails
+  */
+ public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
+     if (cacheEnabled) {
+         runMarshallCacheEvictionSweep();
+     }
+
+     if (o == null) {
+         return;
+     }
+
+     DataStructure command = (DataStructure)o;
+     byte typeCode = command.getDataStructureType();
+     DataStreamMarshaller marshaller = dataMarshallers[typeCode & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + typeCode);
+     }
+     ds.writeByte(typeCode);
+     bs.marshal(ds);
+     marshaller.tightMarshal2(this, command, ds, bs);
+ }
+
+ /**
+  * Allows you to dynamically switch the version of the openwire protocol
+  * being used. The marshaller table is obtained reflectively from the
+  * static createMarshallerMap(OpenWireFormat) method of
+  * org.apache.activemq.openwire.v&lt;version&gt;.MarshallerFactory.
+  *
+  * @param version the protocol version to switch to
+  * @throws IllegalArgumentException if the factory class cannot be loaded
+  *         or its createMarshallerMap method cannot be invoked
+  */
+ public void setVersion(int version) {
+     String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
+
+     Class<?> factoryClass;
+     try {
+         factoryClass = Class.forName(mfName, false, getClass().getClassLoader());
+     } catch (ClassNotFoundException e) {
+         throw new IllegalArgumentException(
+             "Invalid version: " + version + ", could not load " + mfName, e);
+     }
+
+     try {
+         Method factoryMethod = factoryClass.getMethod("createMarshallerMap", OpenWireFormat.class);
+         dataMarshallers = (DataStreamMarshaller[])factoryMethod.invoke(null, new Object[] {this});
+     } catch (Throwable e) {
+         throw new IllegalArgumentException(
+             "Invalid version: " + version + ", " + mfName
+                 + " does not properly implement the createMarshallerMap method.", e);
+     }
+
+     // Only record the version once the marshaller table has been replaced.
+     this.version = version;
+ }
+
+ /**
+  * Reads the type byte from the stream and decodes the object that
+  * follows using the tight or loose encoding. While decoding is in
+  * progress inReceive() reports true.
+  *
+  * @param dis the stream positioned at the type byte
+  * @return the decoded object, or null when the type byte is NULL_TYPE
+  * @throws IOException if the type is unknown or decoding fails
+  */
+ public Object doUnmarshal(DataInput dis) throws IOException {
+     byte dataType = dis.readByte();
+     receivingMessage.set(true);
+     try {
+         if (dataType == NULL_TYPE) {
+             return null;
+         }
+         DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
+         if (dsm == null) {
+             throw new IOException("Unknown data type: " + dataType);
+         }
+         Object data = dsm.createObject();
+         if (this.tightEncodingEnabled) {
+             BooleanStream bs = new BooleanStream();
+             bs.unmarshal(dis);
+             dsm.tightUnmarshal(this, data, dis, bs);
+         } else {
+             dsm.looseUnmarshal(this, data, dis);
+         }
+         return data;
+     } finally {
+         // Always clear the flag. Without the finally, an exception from
+         // the marshaller (or the unknown-type error above) would leave
+         // inReceive() stuck at true.
+         receivingMessage.set(false);
+     }
+ }
+
+ // public void debug(String msg) {
+ // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
+ // System.out.println(t+": "+msg);
+ // }
+ /**
+  * First tight-encoding pass for a nested object. Records in the boolean
+  * stream whether the object is present and, for marshall-aware objects,
+  * that no cached marshalled form is used (always false).
+  *
+  * @param o the nested object, or null
+  * @param bs the boolean stream being filled
+  * @return 0 for null, otherwise 1 (type byte) plus the marshaller's size
+  * @throws IOException if no marshaller is registered for the type
+  */
+ public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
+     boolean present = o != null;
+     bs.writeBoolean(present);
+     if (!present) {
+         return 0;
+     }
+
+     if (o.isMarshallAware()) {
+         // Cached marshalled forms are not supported, so the flag is
+         // always written as false.
+         bs.writeBoolean(false);
+     }
+
+     byte typeCode = o.getDataStructureType();
+     DataStreamMarshaller marshaller = dataMarshallers[typeCode & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + typeCode);
+     }
+     return 1 + marshaller.tightMarshal1(this, o, bs);
+ }
+
+ /**
+  * Second tight-encoding pass for a nested object. Replays the flags
+  * written by the first pass and, if the object is present, writes its
+  * type byte followed by the marshaller's payload.
+  *
+  * @param o the nested object; only dereferenced when the presence flag is set
+  * @param ds the stream to write to
+  * @param bs the boolean stream produced by the first pass
+  * @throws IOException if the stream signals a cached form ("Corrupted
+  *         stream"), the type is unknown, or writing fails
+  */
+ public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
+     throws IOException {
+     boolean present = bs.readBoolean();
+     if (!present) {
+         return;
+     }
+
+     byte typeCode = o.getDataStructureType();
+     ds.writeByte(typeCode);
+
+     // The first pass never marks a cached form, so a set flag here means
+     // the boolean stream is out of step.
+     if (o.isMarshallAware() && bs.readBoolean()) {
+         throw new IOException("Corrupted stream");
+     }
+
+     DataStreamMarshaller marshaller = dataMarshallers[typeCode & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + typeCode);
+     }
+     marshaller.tightMarshal2(this, o, ds, bs);
+ }
+
+ /**
+  * Reads a tightly encoded nested object.
+  *
+  * @param dis the stream to read from
+  * @param bs the enclosing object's boolean stream
+  * @return the decoded object, or null when the presence flag is false
+  * @throws IOException if the type is unknown or reading fails
+  */
+ public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
+     if (!bs.readBoolean()) {
+         return null;
+     }
+
+     byte dataType = dis.readByte();
+     DataStreamMarshaller marshaller = dataMarshallers[dataType & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + dataType);
+     }
+     DataStructure data = marshaller.createObject();
+
+     BooleanStream flags = bs;
+     if (data.isMarshallAware() && bs.readBoolean()) {
+         // Embedded pre-marshalled form: skip an int and a byte
+         // (presumably its own size prefix and type byte; TODO confirm),
+         // then decode it with its own boolean stream.
+         dis.readInt();
+         dis.readByte();
+
+         flags = new BooleanStream();
+         flags.unmarshal(dis);
+     }
+     marshaller.tightUnmarshal(this, data, dis, flags);
+     return data;
+ }
+
+ /**
+  * Reads a loosely encoded nested object: a presence boolean, then the
+  * type byte and the marshaller's payload.
+  *
+  * @param dis the stream to read from
+  * @return the decoded object, or null when the presence boolean is false
+  * @throws IOException if the type is unknown or reading fails
+  */
+ public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
+     if (!dis.readBoolean()) {
+         return null;
+     }
+
+     byte dataType = dis.readByte();
+     DataStreamMarshaller marshaller = dataMarshallers[dataType & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + dataType);
+     }
+     DataStructure result = marshaller.createObject();
+     marshaller.looseUnmarshal(this, result, dis);
+     return result;
+ }
+
+ /**
+  * Writes a nested object in the loose encoding: a presence boolean and,
+  * if the object is not null, its type byte and the marshaller's payload.
+  *
+  * @param o the nested object, or null
+  * @param dataOut the stream to write to
+  * @throws IOException if the type is unknown (raised after the type byte
+  *         has already been written) or writing fails
+  */
+ public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
+     boolean present = o != null;
+     dataOut.writeBoolean(present);
+     if (!present) {
+         return;
+     }
+
+     byte typeCode = o.getDataStructureType();
+     dataOut.writeByte(typeCode);
+     DataStreamMarshaller marshaller = dataMarshallers[typeCode & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + typeCode);
+     }
+     marshaller.looseMarshal(this, o, dataOut);
+ }
+
+ /**
+  * Evicts entries from the marshall cache, oldest slot first, until at
+  * least MARSHAL_CACHE_FREE_SPACE slots are free. The eviction index wraps
+  * around at the end of the cache array.
+  */
+ public void runMarshallCacheEvictionSweep() {
+     for (;;) {
+         boolean needsEviction = marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE;
+         if (!needsEviction) {
+             return;
+         }
+
+         DataStructure evicted = marshallCache[nextMarshallCacheEvictionIndex];
+         marshallCacheMap.remove(evicted);
+         marshallCache[nextMarshallCacheEvictionIndex] = null;
+
+         nextMarshallCacheEvictionIndex++;
+         if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
+             nextMarshallCacheEvictionIndex = 0;
+         }
+     }
+ }
+
+ /**
+ * @return the cache index recorded for o in the marshall cache map, or
+ * null if the map holds no entry for it
+ */
+ public Short getMarshallCacheIndex(DataStructure o) {
+ return marshallCacheMap.get(o);
+ }
+
+ /**
+  * Assigns the next cache slot to the given object and records it in the
+  * marshall cache. The slot counter advances (and wraps at the end of the
+  * cache array) even when the object cannot be cached.
+  *
+  * @param o the object to cache
+  * @return the slot index, or -1 when the cache map is already full
+  */
+ public Short addToMarshallCache(DataStructure o) {
+     short i = nextMarshallCacheIndex++;
+     if (nextMarshallCacheIndex >= marshallCache.length) {
+         nextMarshallCacheIndex = 0;
+     }
+
+     // We can only cache that item if there is space left.
+     if (marshallCacheMap.size() < marshallCache.length) {
+         marshallCache[i] = o;
+         // Short.valueOf avoids the explicit boxing constructor.
+         Short index = Short.valueOf(i);
+         marshallCacheMap.put(o, index);
+         return index;
+     }
+
+     // Use -1 to indicate that the value was not cached due to cache
+     // being full.
+     return Short.valueOf((short)-1);
+ }
+
+ /**
+ * Stores o in the unmarshall cache at the given slot.
+ *
+ * @param index the slot to store into; -1 is ignored (addToMarshallCache
+ * returns -1 when the value was not cached)
+ * @param o the object to store
+ */
+ public void setInUnmarshallCache(short index, DataStructure o) {
+
+ // There was no space left in the cache, so we can't
+ // put this in the cache.
+ if (index == -1) {
+ return;
+ }
+
+ unmarshallCache[index] = o;
+ }
+
+ /**
+ * @param index the slot to read; used as an array index without any
+ * range check
+ * @return the object stored in that unmarshall cache slot
+ */
+ public DataStructure getFromUnmarshallCache(short index) {
+ return unmarshallCache[index];
+ }
+
+ /** Sets the stackTraceEnabled flag. */
+ public void setStackTraceEnabled(boolean b) {
+ stackTraceEnabled = b;
+ }
+
+ /** @return the stackTraceEnabled flag */
+ public boolean isStackTraceEnabled() {
+ return stackTraceEnabled;
+ }
+
+ /** @return the tcpNoDelayEnabled flag */
+ public boolean isTcpNoDelayEnabled() {
+ return tcpNoDelayEnabled;
+ }
+
+ /** Sets the tcpNoDelayEnabled flag. */
+ public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+ this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+ }
+
+ /** @return the cacheEnabled flag, which gates the eviction sweep run before marshalling */
+ public boolean isCacheEnabled() {
+ return cacheEnabled;
+ }
+
+ /**
+ * Sets the cacheEnabled flag. Only the flag changes; the cache arrays and
+ * map are not (re)allocated here.
+ */
+ public void setCacheEnabled(boolean cacheEnabled) {
+ this.cacheEnabled = cacheEnabled;
+ }
+
+ /** @return true if the tight encoding is used instead of the loose one */
+ public boolean isTightEncodingEnabled() {
+ return tightEncodingEnabled;
+ }
+
+ /** Selects the tight (true) or the loose (false) encoding. */
+ public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
+ this.tightEncodingEnabled = tightEncodingEnabled;
+ }
+
+ /** @return true if frames are written and read without the 4-byte size prefix */
+ public boolean isSizePrefixDisabled() {
+ return sizePrefixDisabled;
+ }
+
+ /**
+ * Sets the sizePrefixDisabled flag.
+ *
+ * @param prefixPacketSize despite its name, true DISABLES the size prefix;
+ * the value is assigned to sizePrefixDisabled as is
+ */
+ public void setSizePrefixDisabled(boolean prefixPacketSize) {
+ this.sizePrefixDisabled = prefixPacketSize;
+ }
+
+ /** Sets the local preferences that renegotiateWireFormat combines with the peer's info. */
+ public void setPreferedWireFormatInfo(WireFormatInfo info) {
+ this.preferedWireFormatInfo = info;
+ }
+
+ /** @return the preferred WireFormatInfo, or null if none has been set */
+ public WireFormatInfo getPreferedWireFormatInfo() {
+ return preferedWireFormatInfo;
+ }
+
+ /**
+ * @return the current value of receivingMessage, which doUnmarshal sets
+ * to true while it is decoding an object
+ */
+ public boolean inReceive() {
+ return receivingMessage.get();
+ }
+
+ /**
+  * Renegotiates this wire format against the peer's WireFormatInfo.
+  * <p>
+  * The version becomes the lower of the two positive versions (see min);
+  * each boolean option is enabled only if both sides enable it. The
+  * negotiated values are written back into info. When caching stays
+  * enabled the caches are re-created with the smaller of the two cache
+  * sizes (MARSHAL_CACHE_SIZE if that is 0); otherwise the caches are
+  * dropped.
+  *
+  * @param info the peer's wire format info; updated with the negotiated values
+  * @throws IllegalStateException if no preferred WireFormatInfo has been set
+  * @throws IOException declared for callers; presumably raised by the
+  *         WireFormatInfo setters (not visible here)
+  */
+ public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
+
+     if (preferedWireFormatInfo == null) {
+         // The message used to read "cannot not be renegotiated", a double
+         // negative that stated the opposite of the actual condition.
+         throw new IllegalStateException("Wireformat cannot be renegotiated.");
+     }
+
+     this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
+     info.setVersion(this.getVersion());
+
+     this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
+     info.setStackTraceEnabled(this.stackTraceEnabled);
+
+     this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
+     info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
+
+     this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
+     info.setCacheEnabled(this.cacheEnabled);
+
+     this.tightEncodingEnabled = info.isTightEncodingEnabled()
+         && preferedWireFormatInfo.isTightEncodingEnabled();
+     info.setTightEncodingEnabled(this.tightEncodingEnabled);
+
+     this.sizePrefixDisabled = info.isSizePrefixDisabled()
+         && preferedWireFormatInfo.isSizePrefixDisabled();
+     info.setSizePrefixDisabled(this.sizePrefixDisabled);
+
+     if (cacheEnabled) {
+
+         int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
+         info.setCacheSize(size);
+
+         // A negotiated size of 0 falls back to the default cache size.
+         if (size == 0) {
+             size = MARSHAL_CACHE_SIZE;
+         }
+
+         marshallCache = new DataStructure[size];
+         unmarshallCache = new DataStructure[size];
+         nextMarshallCacheIndex = 0;
+         nextMarshallCacheEvictionIndex = 0;
+         marshallCacheMap = new HashMap<DataStructure, Short>();
+     } else {
+         // Caching is off: release the caches. They stay null until a
+         // later renegotiation enables caching again.
+         marshallCache = null;
+         unmarshallCache = null;
+         nextMarshallCacheIndex = 0;
+         nextMarshallCacheEvictionIndex = 0;
+         marshallCacheMap = null;
+     }
+
+ }
+
+ /**
+  * Picks the protocol version to negotiate: version1 when version2 is not
+  * positive, or when version1 is positive and lower than version2;
+  * version2 in every other case.
+  *
+  * @param version1 the first candidate version
+  * @param version2 the second candidate version
+  * @return the selected version
+  */
+ protected int min(int version1, int version2) {
+     if (version2 <= 0) {
+         return version1;
+     }
+     boolean firstIsLowerAndValid = version1 > 0 && version1 < version2;
+     return firstIsLowerAndValid ? version1 : version2;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Factory that creates OpenWireFormat instances. Each created format is a
+ * default-version OpenWireFormat that carries, as its preferred
+ * WireFormatInfo, the settings configured on this factory.
+ *
+ * @version $Revision$
+ */
+public class OpenWireFormatFactory implements WireFormatFactory {
+
+    //
+    // The default values here are what the wire format changes to after a
+    // default negotiation.
+    //
+
+    private int version = OpenWireFormat.DEFAULT_VERSION;
+    private boolean stackTraceEnabled = true;
+    private boolean tcpNoDelayEnabled = true;
+    private boolean cacheEnabled = true;
+    private boolean tightEncodingEnabled = true;
+    private boolean sizePrefixDisabled;
+    private long maxInactivityDuration = 30*1000;
+    private long maxInactivityDurationInitalDelay = 10*1000;
+    private int cacheSize = 1024;
+
+    /**
+     * Builds a WireFormatInfo from this factory's properties and attaches
+     * it to a new OpenWireFormat as its preferred info.
+     *
+     * @return the new wire format
+     * @throws IllegalStateException if any WireFormatInfo setter other than
+     *         setVersion fails; the failure is kept as the cause
+     */
+    public WireFormat createWireFormat() {
+        WireFormatInfo preferred = new WireFormatInfo();
+        preferred.setVersion(version);
+
+        try {
+            preferred.setStackTraceEnabled(stackTraceEnabled);
+            preferred.setCacheEnabled(cacheEnabled);
+            preferred.setTcpNoDelayEnabled(tcpNoDelayEnabled);
+            preferred.setTightEncodingEnabled(tightEncodingEnabled);
+            preferred.setSizePrefixDisabled(sizePrefixDisabled);
+            preferred.setMaxInactivityDuration(maxInactivityDuration);
+            preferred.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
+            preferred.setCacheSize(cacheSize);
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not configure WireFormatInfo", e);
+        }
+
+        OpenWireFormat format = new OpenWireFormat();
+        format.setPreferedWireFormatInfo(preferred);
+        return format;
+    }
+
+    /** @return whether created formats prefer stack traces to be enabled */
+    public boolean isStackTraceEnabled() {
+        return this.stackTraceEnabled;
+    }
+
+    /** Sets the preferred stackTraceEnabled value. */
+    public void setStackTraceEnabled(boolean stackTraceEnabled) {
+        this.stackTraceEnabled = stackTraceEnabled;
+    }
+
+    /** @return the preferred tcpNoDelayEnabled value */
+    public boolean isTcpNoDelayEnabled() {
+        return this.tcpNoDelayEnabled;
+    }
+
+    /** Sets the preferred tcpNoDelayEnabled value. */
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+    }
+
+    /** @return the preferred protocol version */
+    public int getVersion() {
+        return this.version;
+    }
+
+    /** Sets the preferred protocol version placed in the WireFormatInfo. */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /** @return the preferred cacheEnabled value */
+    public boolean isCacheEnabled() {
+        return this.cacheEnabled;
+    }
+
+    /** Sets the preferred cacheEnabled value. */
+    public void setCacheEnabled(boolean cacheEnabled) {
+        this.cacheEnabled = cacheEnabled;
+    }
+
+    /** @return the preferred tightEncodingEnabled value */
+    public boolean isTightEncodingEnabled() {
+        return this.tightEncodingEnabled;
+    }
+
+    /** Sets the preferred tightEncodingEnabled value. */
+    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
+        this.tightEncodingEnabled = tightEncodingEnabled;
+    }
+
+    /** @return the preferred sizePrefixDisabled value */
+    public boolean isSizePrefixDisabled() {
+        return this.sizePrefixDisabled;
+    }
+
+    /** Sets the preferred sizePrefixDisabled value. */
+    public void setSizePrefixDisabled(boolean sizePrefixDisabled) {
+        this.sizePrefixDisabled = sizePrefixDisabled;
+    }
+
+    /** @return the max inactivity duration placed in the WireFormatInfo */
+    public long getMaxInactivityDuration() {
+        return this.maxInactivityDuration;
+    }
+
+    /** Sets the max inactivity duration placed in the WireFormatInfo. */
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
+    }
+
+    /** @return the preferred cache size */
+    public int getCacheSize() {
+        return this.cacheSize;
+    }
+
+    /** Sets the preferred cache size. */
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    /** @return the initial delay of the max inactivity duration */
+    public long getMaxInactivityDurationInitalDelay() {
+        return this.maxInactivityDurationInitalDelay;
+    }
+
+    /** Sets the initial delay of the max inactivity duration. */
+    public void setMaxInactivityDurationInitalDelay(
+        long maxInactivityDurationInitalDelay) {
+        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
------------------------------------------------------------------------------
svn:executable = *