You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:33:09 UTC
svn commit: r1097189 [41/42] - in /activemq/activemq-apollo/trunk: ./
apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/
apollo-openwire/src/main/resources/
apollo-openwire/src/main/resources/META-INF/
apollo-openwire/src/main/resources/ME...
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html Wed Apr 27 17:32:51 2011
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Support for JMS Advisory messages as well as some helper listeners to listen to the clients, producers and consumers available.
+
+</body>
+</html>
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+/**
+ * The policy for configuring how BLOBs (Binary Large OBjects) are transferred
+ * out of band between producers, brokers and consumers.
+ *
+ * @version $Revision: $
+ */
+public class BlobTransferPolicy {
+ private String defaultUploadUrl = "http://localhost:8080/uploads/";
+ private String brokerUploadUrl;
+ private String uploadUrl;
+ private int bufferSize = 128 * 1024;
+ private org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy uploadStrategy;
+
+ /**
+ * Returns a copy of this policy object
+ */
+ public BlobTransferPolicy copy() {
+ BlobTransferPolicy that = new BlobTransferPolicy();
+ that.defaultUploadUrl = this.defaultUploadUrl;
+ that.brokerUploadUrl = this.brokerUploadUrl;
+ that.uploadUrl = this.uploadUrl;
+ that.uploadStrategy = this.uploadStrategy;
+ return that;
+ }
+
+ public String getUploadUrl() {
+ if (uploadUrl == null) {
+ uploadUrl = getBrokerUploadUrl();
+ if (uploadUrl == null) {
+ uploadUrl = getDefaultUploadUrl();
+ }
+ }
+ return uploadUrl;
+ }
+
+ /**
+ * Sets the upload URL to use explicitly on the client which will
+ * overload the default or the broker's URL. This allows the client to decide
+ * where to upload files to irrespective of the brokers configuration.
+ */
+ public void setUploadUrl(String uploadUrl) {
+ this.uploadUrl = uploadUrl;
+ }
+
+ public String getBrokerUploadUrl() {
+ return brokerUploadUrl;
+ }
+
+ /**
+ * Called by the JMS client when a broker advertises its upload URL
+ */
+ public void setBrokerUploadUrl(String brokerUploadUrl) {
+ this.brokerUploadUrl = brokerUploadUrl;
+ }
+
+ public String getDefaultUploadUrl() {
+ return defaultUploadUrl;
+ }
+
+ /**
+ * Sets the default upload URL to use if the broker does not
+ * have a configured upload URL
+ */
+ public void setDefaultUploadUrl(String defaultUploadUrl) {
+ this.defaultUploadUrl = defaultUploadUrl;
+ }
+
+ public org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy getUploadStrategy() {
+ if (uploadStrategy == null) {
+ uploadStrategy = createUploadStrategy();
+ }
+ return uploadStrategy;
+ }
+
+ /**
+ * Sets the upload strategy to use for uploading BLOBs to some URL
+ */
+ public void setUploadStrategy(org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy uploadStrategy) {
+ this.uploadStrategy = uploadStrategy;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Sets the default buffer size used when uploading or downloading files
+ */
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ protected org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy createUploadStrategy() {
+ return new org.apache.activemq.apollo.openwire.support.blob.DefaultBlobUploadStrategy(this);
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+
+/**
+ * Represents a strategy of uploading a file/stream to some remote
+ *
+ * @version $Revision: $
+ */
+public interface BlobUploadStrategy {
+
+ URL uploadFile(ActiveMQBlobMessage message, File file) throws OpenwireException, IOException;
+
+ URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws OpenwireException, IOException;
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+
+/**
+ * A helper class to represent a required upload of a BLOB to some remote URL
+ *
+ * @version $Revision: $
+ */
+public class BlobUploader {
+
+ private BlobTransferPolicy blobTransferPolicy;
+ private File file;
+ private InputStream in;
+
+ public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) {
+ this.blobTransferPolicy = blobTransferPolicy;
+ this.in = in;
+ }
+
+ public BlobUploader(BlobTransferPolicy blobTransferPolicy, File file) {
+ this.blobTransferPolicy = blobTransferPolicy;
+ this.file = file;
+ }
+
+ public URL upload(ActiveMQBlobMessage message) throws OpenwireException, IOException {
+ if (file != null) {
+ return getStrategy().uploadFile(message, file);
+ } else {
+ return getStrategy().uploadStream(message, in);
+ }
+ }
+
+ public BlobTransferPolicy getBlobTransferPolicy() {
+ return blobTransferPolicy;
+ }
+
+ public BlobUploadStrategy getStrategy() {
+ return getBlobTransferPolicy().getUploadStrategy();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+
+/**
+ * A default implementation of {@link BlobUploadStrategy} which uses the URL
+ * class to upload files or streams to a remote URL
+ */
+public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
+ private BlobTransferPolicy transferPolicy;
+
+ public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) {
+ this.transferPolicy = transferPolicy;
+ }
+
+ public URL uploadFile(ActiveMQBlobMessage message, File file) throws OpenwireException, IOException {
+ return uploadStream(message, new FileInputStream(file));
+ }
+
+ public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws OpenwireException, IOException {
+ URL url = createUploadURL(message);
+
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ connection.setRequestMethod("PUT");
+ connection.setDoOutput(true);
+
+ // use chunked mode or otherwise URLConnection loads everything into
+ // memory
+ // (chunked mode not supported before JRE 1.5)
+ connection.setChunkedStreamingMode(transferPolicy.getBufferSize());
+
+ OutputStream os = connection.getOutputStream();
+
+ byte[] buf = new byte[transferPolicy.getBufferSize()];
+ for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
+ os.write(buf, 0, c);
+ os.flush();
+ }
+ os.close();
+ fis.close();
+
+ if (!isSuccessfulCode(connection.getResponseCode())) {
+ throw new IOException("PUT was not successful: " + connection.getResponseCode() + " "
+ + connection.getResponseMessage());
+ }
+
+ return url;
+ }
+
+ public void deleteFile(ActiveMQBlobMessage message) throws IOException, OpenwireException {
+ URL url = createUploadURL(message);
+
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ connection.setRequestMethod("DELETE");
+ connection.connect();
+ connection.disconnect();
+
+ if (!isSuccessfulCode(connection.getResponseCode())) {
+ throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " "
+ + connection.getResponseMessage());
+ }
+ }
+
+ private boolean isSuccessfulCode(int responseCode) {
+ return responseCode >= 200 && responseCode < 300; // 2xx => successful
+ }
+
+ protected URL createUploadURL(ActiveMQBlobMessage message) throws OpenwireException, MalformedURLException {
+ return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html Wed Apr 27 17:32:51 2011
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Helper classes for dealing with out-of-band BLOB objects
+
+</body>
+</html>
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.openwire;
+
+import java.io.IOException;
+
+import org.apache.activemq.apollo.dto.DestinationDTO;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+
+public class OpenWireMessageDelivery {
+
+ static final private AsciiBuffer ENCODING = new AsciiBuffer("openwire");
+
+ private final Message message;
+ private AsciiBuffer producerId;
+ private OpenWireFormat storeWireFormat;
+ private PersistListener persistListener = null;
+ private final int size;
+
+ public interface PersistListener {
+ public void onMessagePersisted(OpenWireMessageDelivery delivery);
+ }
+
+ public OpenWireMessageDelivery(Message message) {
+ this.message = message;
+ this.size = message.getSize();
+ }
+
+ public void setPersistListener(PersistListener listener) {
+ persistListener = listener;
+ }
+
+ public DestinationDTO[] getDestination() {
+ return message.getDestination().toDestination();
+ }
+
+ public int getMemorySize() {
+ return size;
+ }
+
+ public int getPriority() {
+ return message.getPriority();
+ }
+
+ public AsciiBuffer getMsgId() {
+ return new AsciiBuffer(message.getMessageId().toString());
+ }
+
+ public AsciiBuffer getProducerId() {
+ if (producerId == null) {
+ producerId = new AsciiBuffer(message.getProducerId().toString());
+ }
+ return producerId;
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ public <T> T asType(Class<T> type) {
+ if (type == Message.class) {
+ return type.cast(message);
+ }
+ // TODO: is this right?
+ if (message.getClass().isAssignableFrom(type)) {
+ return type.cast(message);
+ }
+ return null;
+ }
+
+ public boolean isPersistent() {
+ return message.isPersistent();
+ }
+
+ public final void onMessagePersisted() {
+ if (persistListener != null) {
+ persistListener.onMessagePersisted(this);
+ persistListener = null;
+ }
+ }
+
+ public final boolean isResponseRequired() {
+ return message.isResponseRequired();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.broker.MessageDelivery#getTTE()
+ */
+ public long getExpiration() {
+ return message.getExpiration();
+ }
+
+// public MessageEvaluationContext createMessageEvaluationContext() {
+// return new OpenwireMessageEvaluationContext(message);
+// }
+
+ public String toString() {
+ return message.getMessageId().toString();
+ }
+
+ public AsciiBuffer getStoreEncoding() {
+ return ENCODING;
+ }
+
+ public Buffer getStoreEncoded() {
+ Buffer bytes;
+ try {
+ bytes = storeWireFormat.marshal(message);
+ } catch (IOException e) {
+ return null;
+ }
+ return bytes;
+ }
+
+
+ public void setStoreWireFormat(OpenWireFormat wireFormat) {
+ this.storeWireFormat = wireFormat;
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support.broker.openwire;
+
+public class OpenwireMessageEvaluationContext {
+//
+// private Message message;
+//
+// public OpenwireMessageEvaluationContext() {
+// }
+// public OpenwireMessageEvaluationContext(Message message) {
+// this.message = message;
+// }
+//
+// public Message getMessage() {
+// return message;
+// }
+//
+// public void setMessage(Message message) {
+// this.message = message;
+// }
+//
+//
+// public Expression getPropertyExpression(final String name) {
+// Expression expression = JMS_PROPERTY_EXPRESSIONS.get(name);
+// if( expression == null ) {
+// expression = new Expression() {
+// public Object evaluate(MessageEvaluationContext mc) throws FilterException {
+// try {
+// Message message = ((OpenwireMessageEvaluationContext) mc).message;
+// return message.getProperty(name);
+// } catch (IOException e) {
+// throw new FilterException(e);
+// }
+// }
+// };
+// }
+// return expression;
+// }
+//
+// public <T> T getBodyAs(Class<T> type) throws FilterException {
+// try {
+// if( type == String.class ) {
+// if ( message instanceof ActiveMQTextMessage ) {
+// return type.cast(((ActiveMQTextMessage)message).getText());
+// }
+// }
+// if( type == Buffer.class ) {
+// if ( message instanceof ActiveMQBytesMessage ) {
+// ActiveMQBytesMessage bm = ((ActiveMQBytesMessage)message);
+// byte data[] = new byte[(int) bm.getBodyLength()];
+// bm.readBytes(data);
+// return type.cast(new Buffer(data));
+// }
+// }
+// return null;
+// } catch (JMSException e) {
+// throw new FilterException(e);
+// }
+// }
+//
+// public <T> T getDestination() {
+// return (T) destination;
+// }
+// public Object getLocalConnectionId() {
+// throw new UnsupportedOperationException();
+// }
+// public void setDestination(Object destination) {
+// this.destination = destination;
+// }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,1057 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.openwire;
+
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.LinkedList;
+//import java.util.concurrent.ConcurrentHashMap;
+//
+//import javax.jms.JMSException;
+//import javax.transaction.xa.XAException;
+//import javax.transaction.xa.Xid;
+//
+//import org.apache.activemq.apollo.WindowLimiter;
+//import org.apache.activemq.apollo.broker.Broker;
+//import org.apache.activemq.apollo.broker.BrokerConnection;
+//import org.apache.activemq.apollo.broker.BrokerMessageDelivery;
+//import org.apache.activemq.apollo.broker.BrokerSubscription;
+//import org.apache.activemq.apollo.broker.Destination;
+//import org.apache.activemq.apollo.broker.MessageDelivery;
+//import org.apache.activemq.apollo.broker.ProtocolHandler;
+//import org.apache.activemq.apollo.broker.Router;
+//import org.apache.activemq.apollo.broker.Transaction;
+//import org.apache.activemq.apollo.broker.VirtualHost;
+//import org.apache.activemq.apollo.broker.XidImpl;
+//import org.apache.activemq.apollo.broker.Transaction.TransactionListener;
+//import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
+//import org.apache.activemq.broker.store.Store.MessageRecord;
+//import org.apache.activemq.command.ActiveMQDestination;
+//import org.apache.activemq.command.BrokerId;
+//import org.apache.activemq.command.BrokerInfo;
+//import org.apache.activemq.command.Command;
+//import org.apache.activemq.command.ConnectionControl;
+//import org.apache.activemq.command.ConnectionError;
+//import org.apache.activemq.command.ConnectionId;
+//import org.apache.activemq.command.ConnectionInfo;
+//import org.apache.activemq.command.ConsumerControl;
+//import org.apache.activemq.command.ConsumerId;
+//import org.apache.activemq.command.ConsumerInfo;
+//import org.apache.activemq.command.ControlCommand;
+//import org.apache.activemq.command.DestinationInfo;
+//import org.apache.activemq.command.ExceptionResponse;
+//import org.apache.activemq.command.FlushCommand;
+//import org.apache.activemq.command.KeepAliveInfo;
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.command.MessageAck;
+//import org.apache.activemq.command.MessageDispatch;
+//import org.apache.activemq.command.MessageDispatchNotification;
+//import org.apache.activemq.command.MessageId;
+//import org.apache.activemq.command.MessagePull;
+//import org.apache.activemq.command.ProducerAck;
+//import org.apache.activemq.command.ProducerId;
+//import org.apache.activemq.command.ProducerInfo;
+//import org.apache.activemq.command.RemoveInfo;
+//import org.apache.activemq.command.RemoveSubscriptionInfo;
+//import org.apache.activemq.command.Response;
+//import org.apache.activemq.command.SessionId;
+//import org.apache.activemq.command.SessionInfo;
+//import org.apache.activemq.command.ShutdownInfo;
+//import org.apache.activemq.command.TransactionId;
+//import org.apache.activemq.command.TransactionInfo;
+//import org.apache.activemq.command.WireFormatInfo;
+//import org.apache.activemq.dispatch.DispatchPriority;
+//import org.apache.activemq.filter.BooleanExpression;
+//import org.apache.activemq.apollo.filter.FilterException;
+//import org.apache.activemq.apollo.filter.LogicExpression;
+//import org.apache.activemq.apollo.filter.NoLocalExpression;
+//import org.apache.activemq.flow.Flow;
+//import org.apache.activemq.flow.FlowController;
+//import org.apache.activemq.flow.IFlowController;
+//import org.apache.activemq.flow.IFlowLimiter;
+//import org.apache.activemq.flow.IFlowResource;
+//import org.apache.activemq.flow.ISourceController;
+//import org.apache.activemq.flow.SizeLimiter;
+//import org.apache.activemq.flow.ISinkController.FlowControllable;
+//import org.apache.activemq.openwire.OpenWireFormat;
+//import org.apache.activemq.selector.SelectorParser;
+//import org.apache.activemq.state.CommandVisitor;
+//import org.apache.activemq.transport.WireFormatNegotiator;
+//import org.fusesource.hawtbuf.Buffer;
+//import org.apache.activemq.wireformat.WireFormat;
+//import org.fusesource.hawtdispatch.Dispatch;
+
+public class OpenwireProtocolHandler { // implements ProtocolHandler, PersistListener {
+
+// protected final HashMap<ConnectionId, ClientContext> connections = new HashMap<ConnectionId, ClientContext>();
+// protected final HashMap<SessionId, ClientContext> sessions = new HashMap<SessionId, ClientContext>();
+// protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
+// protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
+//
+// protected final ConcurrentHashMap<TransactionId, Transaction> transactions = new ConcurrentHashMap<TransactionId, Transaction>();
+//
+// protected BrokerConnection connection;
+// private OpenWireFormat wireFormat;
+// private OpenWireFormat storeWireFormat;
+// private Router router;
+// private VirtualHost host;
+// private final CommandVisitor visitor;
+//
+// ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>();
+//
+// public OpenwireProtocolHandler() {
+// setStoreWireFormat(new OpenWireFormat());
+//
+// visitor = new CommandVisitor() {
+//
+// // /////////////////////////////////////////////////////////////////
+// // Methods that keep track of the client state
+// // /////////////////////////////////////////////////////////////////
+// public Response processAddConnection(final ConnectionInfo info) throws Exception {
+// if (!connections.containsKey(info.getConnectionId())) {
+//
+// ClientContext connection = new AbstractClientContext<MessageDelivery>(info.getConnectionId().toString(), null) {
+// ConnectionInfo connectionInfo = info;
+//
+// public void close() {
+// super.close();
+// connections.remove(connectionInfo.getConnectionId());
+// }
+// };
+// connections.put(info.getConnectionId(), connection);
+//
+// // Connections have an implicitly created "default" session identified by session id = -1
+// SessionId sessionId = new SessionId(info.getConnectionId(), -1);
+// addSession(sessionId, connection);
+// }
+// return ack(info);
+// }
+//
+// public Response processAddSession(final SessionInfo info) throws Exception {
+// final SessionId sessionId = info.getSessionId();
+// ClientContext connection = connections.get(sessionId.getParentId());
+// if (connection == null) {
+// throw new IllegalStateException(host.getHostName() + " Cannot add a session to a connection that had not been registered: " + sessionId.getParentId());
+// }
+//
+// if (!sessions.containsKey(sessionId)) {
+// addSession(sessionId, connection);
+// }
+//
+// return ack(info);
+// }
+//
+// private void addSession(final SessionId sessionId, ClientContext connection) {
+// ClientContext session = new AbstractClientContext<MessageDelivery>(sessionId.toString(), connection) {
+// public void close() {
+// super.close();
+// sessions.remove(sessionId);
+// }
+// };
+// sessions.put(sessionId, session);
+// }
+//
+// public Response processAddProducer(ProducerInfo info) throws Exception {
+// ClientContext session = sessions.get(info.getProducerId().getParentId());
+// if (session == null) {
+// throw new IllegalStateException(host.getHostName() + " Cannot add a producer to a session that had not been registered: " + info.getProducerId().getParentId());
+// }
+// if (!producers.containsKey(info.getProducerId())) {
+// ProducerContext producer = new ProducerContext(info, session);
+// }
+// return ack(info);
+// }
+//
+// public Response processAddConsumer(ConsumerInfo info) throws Exception {
+// ClientContext session = sessions.get(info.getConsumerId().getParentId());
+// if (session == null) {
+// throw new IllegalStateException(host.getHostName() + " Cannot add a consumer to a session that had not been registered: " + info.getConsumerId().getParentId());
+// }
+//
+// if (!consumers.containsKey(info.getConsumerId())) {
+// ConsumerContext ctx = new ConsumerContext(info, session);
+// ctx.start();
+// }
+//
+// return ack(info);
+// }
+//
+// public Response processRemoveConnection(RemoveInfo remove, ConnectionId info, long arg1) throws Exception {
+// ClientContext cc = connections.get(info);
+// if (cc != null) {
+// cc.close();
+// }
+// ack(remove);
+// return null;
+// }
+//
+// public Response processRemoveSession(RemoveInfo remove, SessionId info, long arg1) throws Exception {
+// ClientContext cc = sessions.get(info);
+// if (cc != null) {
+// cc.close();
+// }
+// ack(remove);
+// return null;
+// }
+//
+// public Response processRemoveProducer(RemoveInfo remove, ProducerId info) throws Exception {
+// ClientContext cc = producers.get(info);
+// if (cc != null) {
+// cc.close();
+// }
+// ack(remove);
+// return null;
+// }
+//
+// public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
+// ClientContext cc = consumers.get(info);
+// if (cc != null) {
+// cc.close();
+// }
+// ack(remove);
+// return null;
+// }
+//
+// // /////////////////////////////////////////////////////////////////
+// // Message Processing Methods.
+// // /////////////////////////////////////////////////////////////////
+// public Response processMessage(Message info) throws Exception {
+// if (info.getOriginalDestination() == null) {
+// info.setOriginalDestination(info.getDestination());
+// }
+//
+// ProducerId producerId = info.getProducerId();
+// ProducerContext producerContext = producers.get(producerId);
+//
+// OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+// md.setStoreWireFormat(storeWireFormat);
+// TransactionId tid = info.getTransactionId();
+// if (tid != null) {
+// Transaction t = locateTransaction(tid, true);
+// md.setTransactionId(t.getTid());
+// } else {
+// md.setPersistListener(OpenwireProtocolHandler.this);
+// }
+//
+// // Only producers that are not using a window will block,
+// // and if it blocks.
+// // yes we block the connection's read thread. yes other
+// // sessions will not get
+// // serviced while we block here. The producer is depending
+// // on TCP flow
+// // control to slow him down so we have to stop ready from
+// // the socket at this
+// // point.
+// while (!producerContext.controller.offer(md, null)) {
+// producerContext.controller.waitForFlowUnblock();
+// }
+//
+// if (tid != null) {
+// return ack(info);
+// } else {
+// return null;
+// }
+// }
+//
+// public Response processMessageAck(MessageAck info) throws Exception {
+// ConsumerContext ctx = consumers.get(info.getConsumerId());
+// ctx.ack(info);
+// return ack(info);
+// }
+//
+// // Only used when client prefetch is set to zero.
+// public Response processMessagePull(MessagePull info) throws Exception {
+// return ack(info);
+// }
+//
+// // /////////////////////////////////////////////////////////////////
+// // Control Methods
+// // /////////////////////////////////////////////////////////////////
+// public Response processWireFormat(WireFormatInfo info) throws Exception {
+//
+// // Negotiate the openwire encoding options.
+// WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
+// wfn.sendWireFormat();
+// wfn.negociate(info);
+//
+// // Now that the encoding is negotiated.. let the client know
+// // the details about this
+// // broker.
+// BrokerInfo brokerInfo = new BrokerInfo();
+// Broker broker = connection.getBroker();
+// brokerInfo.setBrokerId(new BrokerId(broker.getName()));
+// brokerInfo.setBrokerName(broker.getName());
+// if (!broker.getConnectUris().isEmpty()) {
+// brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
+// }
+// connection.write(brokerInfo);
+// return ack(info);
+// }
+//
+// public Response processShutdown(ShutdownInfo info) throws Exception {
+// connection.setStopping();
+// return ack(info);
+// }
+//
+// public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+// if (info.isResponseRequired()) {
+// info.setResponseRequired(false);
+// connection.write(info);
+// }
+// return null;
+// }
+//
+// public Response processFlush(FlushCommand info) throws Exception {
+// return ack(info);
+// }
+//
+// public Response processConnectionControl(ConnectionControl info) throws Exception {
+// if (info != null) {
+// if (info.isFaultTolerant()) {
+// throw new UnsupportedOperationException("Fault Tolerance");
+// }
+// }
+// return ack(info);
+// }
+//
+// public Response processConnectionError(ConnectionError info) throws Exception {
+// return ack(info);
+// }
+//
+// public Response processConsumerControl(ConsumerControl info) throws Exception {
+// return ack(info);
+// }
+//
+// // /////////////////////////////////////////////////////////////////
+// // Methods for server management
+// // /////////////////////////////////////////////////////////////////
+// public Response processAddDestination(DestinationInfo info) throws Exception {
+// ActiveMQDestination destination = info.getDestination();
+// if (destination.isTemporary()) {
+// // Keep track of it so that we can remove them this connection
+// // shuts down.
+// temporaryDestinations.add(destination);
+// }
+// host.createQueue(destination);
+// return ack(info);
+// }
+//
+// public Response processRemoveDestination(DestinationInfo info) throws Exception {
+// throw new UnsupportedOperationException();
+// }
+//
+// public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+// throw new UnsupportedOperationException();
+// }
+//
+// public Response processControlCommand(ControlCommand info) throws Exception {
+// throw new UnsupportedOperationException();
+// }
+//
+// // /////////////////////////////////////////////////////////////////
+// // Methods for transaction management
+// // /////////////////////////////////////////////////////////////////
+// public Response processBeginTransaction(TransactionInfo info) throws Exception {
+// TransactionId tid = info.getTransactionId();
+//
+// Transaction t = locateTransaction(tid, false);
+// if (t == null) {
+//
+// Buffer xid = null;
+// if (tid.isXATransaction()) {
+// xid = XidImpl.toBuffer((Xid) tid);
+// }
+// t = host.getTransactionManager().createTransaction(xid);
+// transactions.put(tid, t);
+// }
+//
+// return ack(info);
+// }
+//
+// public Response processCommitTransactionOnePhase(final TransactionInfo info) throws Exception {
+// final TransactionId tid = info.getTransactionId();
+// Transaction t = locateTransaction(tid, true);
+//
+// TransactionListener listener = null;
+// if (info.isResponseRequired()) {
+// listener = new TransactionListener() {
+//
+// @Override
+// public void onCommit(Transaction t) {
+// transactions.remove(tid);
+// ack(info);
+// }
+//
+// @Override
+// public void onRollback(Transaction t) {
+// transactions.remove(tid);
+// ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack"));
+// r.setCorrelationId(info.getCommandId());
+// connection.write(r);
+// }
+//
+// };
+// }
+//
+// t.commit(true, listener);
+// transactions.remove(tid);
+// return null;
+// }
+//
+// public Response processCommitTransactionTwoPhase(final TransactionInfo info) throws Exception {
+// final TransactionId tid = info.getTransactionId();
+// Transaction t = locateTransaction(tid, true);
+//
+// TransactionListener listener = null;
+// if (info.isResponseRequired()) {
+// listener = new TransactionListener() {
+//
+// @Override
+// public void onCommit(Transaction t) {
+// transactions.remove(tid);
+// ack(info);
+// }
+//
+// @Override
+// public void onRollback(Transaction t) {
+// transactions.remove(tid);
+// ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack"));
+// r.setCorrelationId(info.getCommandId());
+// connection.write(r);
+// }
+//
+// };
+// }
+//
+// t.commit(false, listener);
+// return null;
+// }
+//
+// public Response processEndTransaction(TransactionInfo info) throws Exception {
+// //Shouldn't actually do anything, send by client to ensure that it is
+// //in sync with broker transaction state.
+// //TODO need to investigate whether this should wait for prior transaction
+// //state to flush out?
+// new UnsupportedOperationException().printStackTrace();
+// return ack(info);
+// }
+//
+// public Response processForgetTransaction(TransactionInfo info) throws Exception {
+// return processRollbackTransaction(info);
+// }
+//
+// public Response processPrepareTransaction(final TransactionInfo info) throws Exception {
+// final TransactionId tid = info.getTransactionId();
+// Transaction t = locateTransaction(tid, true);
+//
+// TransactionListener listener = null;
+// if (info.isResponseRequired()) {
+// listener = new TransactionListener() {
+//
+// @Override
+// public void onPrepared(Transaction t) {
+// ack(info);
+// }
+// };
+// }
+// t.prepare(listener);
+// return null;
+// }
+//
+// public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+// //TODO
+// throw new UnsupportedOperationException();
+// }
+//
+// public Response processRollbackTransaction(final TransactionInfo info) throws Exception {
+// final TransactionId tid = info.getTransactionId();
+// Transaction t = locateTransaction(tid, true);
+//
+// TransactionListener listener = null;
+// if (info.isResponseRequired()) {
+// listener = new TransactionListener() {
+//
+// @Override
+// public void onRollback(Transaction t) {
+// ack(info);
+// }
+// };
+// }
+// t.rollback(listener);
+// transactions.remove(tid);
+// return null;
+// }
+//
+// // /////////////////////////////////////////////////////////////////
+// // Methods for cluster operations
+// // These commands are sent to the broker when it's acting like a
+// // client to another broker.
+// // /////////////////////////////////////////////////////////////////
+// public Response processBrokerInfo(BrokerInfo info) throws Exception {
+// throw new UnsupportedOperationException();
+// }
+//
+// public Response processMessageDispatch(MessageDispatch info) throws Exception {
+// throw new UnsupportedOperationException();
+// }
+//
+// public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+// throw new UnsupportedOperationException();
+// }
+//
+// public Response processProducerAck(ProducerAck info) throws Exception {
+// return ack(info);
+// }
+// };
+// }
+//
+// private Transaction locateTransaction(TransactionId tid, boolean expected) throws XAException, JMSException {
+// Transaction t;
+//
+// if (tid.isLocalTransaction()) {
+// t = transactions.get(tid);
+// } else {
+// t = host.getTransactionManager().getXATransaction(XidImpl.toBuffer((Xid) tid));
+// }
+//
+// if (t == null && expected) {
+// if (tid.isXATransaction()) {
+// XAException e = new XAException("Transaction '" + tid + "' has not been started.");
+// e.errorCode = XAException.XAER_NOTA;
+// throw e;
+// } else {
+// throw new JMSException("Transaction '" + tid + "' has not been started.");
+// }
+// }
+// return t;
+// }
+//
+// public void start() throws Exception {
+//
+// }
+//
+// public void stop() throws Exception {
+// }
+//
+// public void onCommand(Object o) {
+// boolean responseRequired = false;
+// int commandId = 0;
+// try {
+// Command command = (Command) o;
+// commandId = command.getCommandId();
+// responseRequired = command.isResponseRequired();
+// //System.out.println(o);
+// command.visit(visitor);
+// } catch (Exception e) {
+// if (responseRequired) {
+// ExceptionResponse response = new ExceptionResponse(e);
+// response.setCorrelationId(commandId);
+// connection.write(response);
+// } else {
+// connection.onException(e);
+// }
+// } catch (Throwable t) {
+// if (responseRequired) {
+// ExceptionResponse response = new ExceptionResponse(t);
+// response.setCorrelationId(commandId);
+// connection.write(response);
+// } else {
+// connection.onException(new RuntimeException(t));
+// }
+// }
+// }
+//
+// public void onException(Exception error) {
+// if (!connection.isStopping()) {
+// error.printStackTrace();
+// new Thread() {
+// @Override
+// public void run() {
+// try {
+// connection.stop();
+// } catch (Exception ignore) {
+// }
+// }
+// }.start();
+// }
+// }
+//
+// public void onMessagePersisted(OpenWireMessageDelivery delivery) {
+// // TODO This method should not block:
+// // Either add to output queue, or spin off in a separate thread.
+// ack(delivery.getMessage());
+// }
+//
+// Response ack(Command command) {
+// if (command.isResponseRequired()) {
+// Response rc = new Response();
+// rc.setCorrelationId(command.getCommandId());
+// connection.write(rc);
+// }
+// return null;
+// }
+//
+// // /////////////////////////////////////////////////////////////////
+// // Internal Support Methods
+// // /////////////////////////////////////////////////////////////////
+//
+// class ProducerContext extends AbstractClientContext<OpenWireMessageDelivery> {
+//
+// protected final Object inboundMutex = new Object();
+// private IFlowController<OpenWireMessageDelivery> controller;
+// private final ProducerInfo info;
+//
+// public ProducerContext(final ProducerInfo info, ClientContext parent) {
+// super(info.getProducerId().toString(), parent);
+// this.info = info;
+// producers.put(info.getProducerId(), this);
+// final Flow flow = new Flow("broker-" + super.getResourceName() + "-inbound", false);
+//
+// // Openwire only uses credit windows at the producer level for
+// // producers that request the feature.
+// IFlowLimiter<OpenWireMessageDelivery> limiter;
+// if (info.getWindowSize() > 0) {
+// limiter = new WindowLimiter<OpenWireMessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+// @Override
+// protected void sendCredit(int credit) {
+// ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
+// connection.write(ack);
+// }
+// };
+// } else {
+//
+// limiter = new SizeLimiter<OpenWireMessageDelivery>(1024*64, 1024*32);
+// }
+//
+// controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
+// public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
+// router.route(msg, controller, true);
+// controller.elementDispatched(msg);
+// }
+//
+// public IFlowResource getFlowResource() {
+// return ProducerContext.this;
+// }
+// }, flow, limiter, inboundMutex);
+//
+// super.onFlowOpened(controller);
+// }
+//
+// public void close() {
+// super.close();
+// producers.remove(info);
+// }
+// }
+//
+// class ConsumerContext extends AbstractClientContext<MessageDelivery> implements ProtocolHandler.ConsumerContext {
+//
+// private final ConsumerInfo info;
+// private String name;
+// private BooleanExpression selector;
+// private boolean isDurable;
+// private boolean isQueueReceiver;
+//
+// private final FlowController<MessageDelivery> controller;
+// private final WindowLimiter<MessageDelivery> limiter;
+//
+// private HashMap<MessageId, SubscriptionDelivery<MessageDelivery>> pendingMessages = new HashMap<MessageId, SubscriptionDelivery<MessageDelivery>>();
+// private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+// private BrokerSubscription brokerSubscription;
+// private int borrowedLimterCredits;
+//
+// public ConsumerContext(final ConsumerInfo info, ClientContext parent) throws Exception {
+// super(info.getConsumerId().toString(), parent);
+// this.info = info;
+// this.name = info.getConsumerId().toString();
+// consumers.put(info.getConsumerId(), this);
+//
+// Flow flow = new Flow("broker-" + name + "-outbound", false);
+// selector = parseSelector(info);
+// limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+// @Override
+// public int getElementSize(MessageDelivery m) {
+// return 1;
+// }
+// };
+//
+// isQueueReceiver = info.getDestination().isQueue();
+// if (info.getSubscriptionName() != null) {
+// isDurable = true;
+// }
+// controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
+// controller.useOverFlowQueue(false);
+// controller.setExecutor(Dispatch.getGlobalQueue());
+// super.onFlowOpened(controller);
+// }
+//
+// public void start() throws Exception {
+// brokerSubscription = host.createSubscription(this);
+// brokerSubscription.connect(this);
+// }
+//
+// public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
+// if (!controller.offer(message, source)) {
+// return false;
+// } else {
+// sendInternal(message, controller, callback);
+// return true;
+// }
+// }
+//
+// public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
+// controller.add(message, source);
+// sendInternal(message, controller, callback);
+// }
+//
+// private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
+// Message msg = message.asType(Message.class);
+// MessageDispatch md = new MessageDispatch();
+// md.setConsumerId(info.getConsumerId());
+// md.setMessage(msg);
+// md.setDestination(msg.getDestination());
+// // Add to the pending list if persistent and we are durable:
+// if (callback != null) {
+// if (callback.isRedelivery()) {
+// md.setRedeliveryCounter(1);
+// }
+// synchronized (this) {
+// Object old = pendingMessages.put(msg.getMessageId(), callback);
+// if (old != null) {
+// new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
+// }
+// pendingMessageIds.add(msg.getMessageId());
+// connection.write(md);
+// }
+// } else {
+// connection.write(md);
+// }
+// }
+//
+// public void ack(MessageAck info) throws XAException, JMSException {
+// // TODO: The pending message queue could probably be optimized to
+// // avoid having to create a new list here.
+// int flowCredit = info.getMessageCount();
+// if (info.isDeliveredAck()) {
+// // This ack is just trying to expand the flow control window size without actually
+// // acking the message. Keep track of how many limiter credits we borrow since they need
+// // to get paid back with real acks later.
+// borrowedLimterCredits += flowCredit;
+// limiter.onProtocolCredit(flowCredit);
+// } else if (info.isStandardAck()) {
+// TransactionId tid = info.getTransactionId();
+// Transaction transaction = null;
+// if (tid != null) {
+// transaction = locateTransaction(tid, true);
+// }
+//
+// LinkedList<SubscriptionDelivery<MessageDelivery>> acked = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
+// synchronized (this) {
+// MessageId id = info.getLastMessageId();
+// if (isDurable() || isQueueReceiver()) {
+// while (!pendingMessageIds.isEmpty()) {
+// MessageId pendingId = pendingMessageIds.getFirst();
+// SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
+// acked.add(callback);
+// pendingMessageIds.removeFirst();
+// if (pendingId.equals(id)) {
+// break;
+// }
+// }
+// }
+//
+// // Did we have DeliveredAcks previously sent? Then the
+// // the flow window has already been credited. We need to
+// // pay back the borrowed limiter credits before giving
+// // credits directly to the limiter.
+// if (borrowedLimterCredits > 0) {
+// if (flowCredit > borrowedLimterCredits) {
+// flowCredit -= borrowedLimterCredits;
+// borrowedLimterCredits = 0;
+// } else {
+// borrowedLimterCredits -= flowCredit;
+// flowCredit = 0;
+// }
+// }
+// limiter.onProtocolCredit(flowCredit);
+// }
+//
+// if (transaction == null) {
+// // Delete outside of synchronization on queue to avoid contention
+// // with enqueueing threads.
+// for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+// callback.acknowledge();
+// }
+// } else {
+// // Delete outside of synchronization on queue to avoid contention
+// // with enqueueing threads.
+// for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+// transaction.addAck(callback);
+// }
+// }
+// }
+// }
+//
+// public boolean hasSelector() {
+// return selector != null;
+// }
+//
+// public boolean matches(MessageDelivery message) {
+// Message msg = message.asType(Message.class);
+// if (msg == null) {
+// return false;
+// }
+//
+// OpenwireMessageEvaluationContext selectorContext = new OpenwireMessageEvaluationContext(msg);
+// selectorContext.setDestination(info.getDestination());
+// try {
+// return (selector == null || selector.matches(selectorContext));
+// } catch (FilterException e) {
+// e.printStackTrace();
+// return false;
+// }
+// }
+//
+// public boolean isDurable() {
+// return info.isDurable();
+// }
+//
+// public boolean isQueueReceiver() {
+// return isQueueReceiver;
+// }
+//
+// public boolean isExclusive() {
+// return info.isExclusive();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.queue.Subscription#isBrowser()
+// */
+// public boolean isBrowser() {
+// return info.isBrowser();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+// * .Object)
+// */
+// public boolean isRemoveOnDispatch(MessageDelivery elem) {
+// if (isQueueReceiver()) {
+// return false;
+// }
+// return !elem.isPersistent() || !isDurable;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+// * #getDestination()
+// */
+// public Destination getDestination() {
+// return info.getDestination();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+// * #getJMSSelector()
+// */
+// public String getSelectorString() {
+// return info.getSelector();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+// * #getSubscriptionName()
+// */
+// public String getSubscriptionName() {
+// return info.getSubscriptionName();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+// * #getFullSelector()
+// */
+// public BooleanExpression getSelectorExpression() {
+// return selector;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+// * #getJMSSelector()
+// */
+// public String getSelector() {
+// return info.getSelector();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+// * #getConnection()
+// */
+// public BrokerConnection getConnection() {
+// return connection;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+// * #getConsumerId()
+// */
+// public String getConsumerId() {
+// return name;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object,
+// * org.apache.activemq.flow.ISourceController)
+// */
+// public void add(MessageDelivery message, ISourceController<?> source) {
+// add(message, source, null);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object,
+// * org.apache.activemq.flow.ISourceController)
+// */
+// public boolean offer(MessageDelivery message, ISourceController<?> source) {
+// return offer(message, source, null);
+// }
+//
+// public boolean autoCreateDestination() {
+// return true;
+// }
+//
+// public String toString() {
+// return info.getConsumerId().toString();
+// }
+//
+// public void close() {
+// brokerSubscription.disconnect(this);
+//
+// if (isDurable() || isQueueReceiver()) {
+// LinkedList<SubscriptionDelivery<MessageDelivery>> unacquired = null;
+//
+// synchronized (this) {
+//
+// unacquired = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
+// while (!pendingMessageIds.isEmpty()) {
+// MessageId pendingId = pendingMessageIds.getLast();
+// SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
+// unacquired.add(callback);
+// pendingMessageIds.removeLast();
+// }
+// limiter.onProtocolCredit(unacquired.size());
+// }
+//
+// if (unacquired != null) {
+// // Delete outside of synchronization on queue to avoid contention
+// // with enqueueing threads.
+// for (SubscriptionDelivery<MessageDelivery> callback : unacquired) {
+// callback.unacquire(controller);
+// }
+// }
+// }
+//
+// super.close();
+// consumers.remove(info.getConsumerId());
+// }
+//
+// public boolean isPersistent() {
+// return true;
+// }
+// }
+//
+// private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException {
+// BooleanExpression rc = null;
+// if (info.getSelector() != null) {
+// rc = SelectorParser.parse(info.getSelector());
+// }
+// if (info.isNoLocal()) {
+// if (rc == null) {
+// rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
+// } else {
+// rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
+// }
+// }
+// if (info.getAdditionalPredicate() != null) {
+// if (rc == null) {
+// rc = info.getAdditionalPredicate();
+// } else {
+// rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
+// }
+// }
+// return rc;
+// }
+//
+// public BrokerConnection getConnection() {
+// return connection;
+// }
+//
+// public void setConnection(BrokerConnection connection) {
+// this.connection = connection;
+// this.host = connection.getBroker().getDefaultVirtualHost();
+// this.router = host.getRouter();
+// }
+//
+// public void setWireFormat(WireFormat wireFormat) {
+// this.wireFormat = (OpenWireFormat) wireFormat;
+// setStoreWireFormat(this.wireFormat.copy());
+// }
+//
+// private void setStoreWireFormat(OpenWireFormat wireFormat) {
+// this.storeWireFormat = wireFormat;
+// storeWireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
+// storeWireFormat.setCacheEnabled(false);
+// storeWireFormat.setTightEncodingEnabled(false);
+// storeWireFormat.setSizePrefixDisabled(false);
+// }
+//
+// public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
+// Buffer buf = record.getBuffer();
+// Message message = (Message) storeWireFormat.unmarshal(new Buffer(buf.data, buf.offset, buf.length));
+// OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message);
+// delivery.setStoreWireFormat(storeWireFormat);
+// return delivery;
+// }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.region;
+
+
+/**
+ * @version $Revision: 1.12 $
+ */
+public interface Destination {
+
+ int getMinimumMessageSize();
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.region;
+
+import java.io.IOException;
+
+import org.apache.activemq.apollo.openwire.command.ConsumerId;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.command.MessageId;
+
+/**
+ * Keeps track of a message that is flowing through the Broker. This
+ * object may hold a hard reference to the message or only hold the
+ * id of the message if the message has been persisted on in a MessageStore.
+ *
+ * @version $Revision: 1.15 $
+ */
+public interface MessageReference {
+
+ MessageId getMessageId();
+ Message getMessageHardRef();
+ Message getMessage() throws IOException;
+ boolean isPersistent();
+
+ int getRedeliveryCounter();
+ void incrementRedeliveryCounter();
+
+ int getReferenceCount();
+
+ int incrementReferenceCount();
+ int decrementReferenceCount();
+ ConsumerId getTargetConsumerId();
+ int getSize();
+ long getExpiration();
+ String getGroupID();
+ int getGroupSequence();
+
+ /**
+ * Returns true if this message is expired
+ */
+ boolean isExpired();
+
+ /**
+ * Returns true if this message is dropped.
+ */
+ boolean isDropped();
+
+ /**
+ * @return true if the message is an advisory
+ */
+ boolean isAdvisory();
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.state;
+
+import org.apache.activemq.apollo.openwire.command.BrokerInfo;
+import org.apache.activemq.apollo.openwire.command.ConnectionControl;
+import org.apache.activemq.apollo.openwire.command.ConnectionError;
+import org.apache.activemq.apollo.openwire.command.ConnectionId;
+import org.apache.activemq.apollo.openwire.command.ConnectionInfo;
+import org.apache.activemq.apollo.openwire.command.ConsumerControl;
+import org.apache.activemq.apollo.openwire.command.ConsumerId;
+import org.apache.activemq.apollo.openwire.command.ConsumerInfo;
+import org.apache.activemq.apollo.openwire.command.ControlCommand;
+import org.apache.activemq.apollo.openwire.command.DestinationInfo;
+import org.apache.activemq.apollo.openwire.command.FlushCommand;
+import org.apache.activemq.apollo.openwire.command.KeepAliveInfo;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.command.MessageAck;
+import org.apache.activemq.apollo.openwire.command.MessageDispatch;
+import org.apache.activemq.apollo.openwire.command.MessageDispatchNotification;
+import org.apache.activemq.apollo.openwire.command.MessagePull;
+import org.apache.activemq.apollo.openwire.command.ProducerAck;
+import org.apache.activemq.apollo.openwire.command.ProducerId;
+import org.apache.activemq.apollo.openwire.command.ProducerInfo;
+import org.apache.activemq.apollo.openwire.command.RemoveInfo;
+import org.apache.activemq.apollo.openwire.command.RemoveSubscriptionInfo;
+import org.apache.activemq.apollo.openwire.command.Response;
+import org.apache.activemq.apollo.openwire.command.SessionId;
+import org.apache.activemq.apollo.openwire.command.SessionInfo;
+import org.apache.activemq.apollo.openwire.command.ShutdownInfo;
+import org.apache.activemq.apollo.openwire.command.TransactionInfo;
+import org.apache.activemq.apollo.openwire.command.WireFormatInfo;
+
+public interface CommandVisitor {
+
+ Response processAddConnection(ConnectionInfo info) throws Exception;
+
+ Response processAddSession(SessionInfo info) throws Exception;
+
+ Response processAddProducer(ProducerInfo info) throws Exception;
+
+ Response processAddConsumer(ConsumerInfo info) throws Exception;
+
+ Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception;
+
+ Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception;
+
+ Response processRemoveProducer(RemoveInfo info,ProducerId id) throws Exception;
+
+ Response processRemoveConsumer(RemoveInfo info,ConsumerId id, long lastDeliveredSequenceId) throws Exception;
+
+ Response processAddDestination(DestinationInfo info) throws Exception;
+
+ Response processRemoveDestination(DestinationInfo info) throws Exception;
+
+ Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception;
+
+ Response processMessage(Message send) throws Exception;
+
+ Response processMessageAck(MessageAck ack) throws Exception;
+
+ Response processMessagePull(MessagePull pull) throws Exception;
+
+ Response processBeginTransaction(TransactionInfo info) throws Exception;
+
+ Response processPrepareTransaction(TransactionInfo info) throws Exception;
+
+ Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception;
+
+ Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception;
+
+ Response processRollbackTransaction(TransactionInfo info) throws Exception;
+
+ Response processWireFormat(WireFormatInfo info) throws Exception;
+
+ Response processKeepAlive(KeepAliveInfo info) throws Exception;
+
+ Response processShutdown(ShutdownInfo info) throws Exception;
+
+ Response processFlush(FlushCommand command) throws Exception;
+
+ Response processBrokerInfo(BrokerInfo info) throws Exception;
+
+ Response processRecoverTransactions(TransactionInfo info) throws Exception;
+
+ Response processForgetTransaction(TransactionInfo info) throws Exception;
+
+ Response processEndTransaction(TransactionInfo info) throws Exception;
+
+ Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
+
+ Response processProducerAck(ProducerAck ack) throws Exception;
+
+ Response processMessageDispatch(MessageDispatch dispatch) throws Exception;
+
+ Response processControlCommand(ControlCommand command) throws Exception;
+
+ Response processConnectionError(ConnectionError error) throws Exception;
+
+ Response processConnectionControl(ConnectionControl control) throws Exception;
+
+ Response processConsumerControl(ConsumerControl control) throws Exception;
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java
------------------------------------------------------------------------------
svn:executable = *