You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:33:09 UTC

svn commit: r1097189 [41/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME...

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html Wed Apr 27 17:32:51 2011
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Support for JMS Advisory messages as well as some helper listeners to listen to the clients, producers and consumers available.
+
+</body>
+</html>

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+/**
+ * The policy for configuring how BLOBs (Binary Large OBjects) are transferred
+ * out of band between producers, brokers and consumers.
+ *
+ * @version $Revision: $
+ */
+public class BlobTransferPolicy {
+    private String defaultUploadUrl = "http://localhost:8080/uploads/";
+    private String brokerUploadUrl;
+    private String uploadUrl;
+    private int bufferSize = 128 * 1024;
+    private org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy uploadStrategy;
+
+    /** Returns a copy of this policy; the upload strategy instance is shared with the copy, not cloned. */
+    public BlobTransferPolicy copy() {
+        BlobTransferPolicy that = new BlobTransferPolicy();
+        that.defaultUploadUrl = this.defaultUploadUrl;
+        that.brokerUploadUrl = this.brokerUploadUrl;
+        that.uploadUrl = this.uploadUrl;
+        // Previously omitted, so a copy silently reverted to the 128 KiB default.
+        that.bufferSize = this.bufferSize;
+        that.uploadStrategy = this.uploadStrategy;
+        return that;
+    }
+
+    /** Returns the explicit upload URL, else the broker's, else the default; the resolved value is cached. */
+    public String getUploadUrl() {
+        if (uploadUrl == null) {
+            uploadUrl = getBrokerUploadUrl();
+            if (uploadUrl == null) {
+                uploadUrl = getDefaultUploadUrl();
+            }
+        }
+        return uploadUrl;
+    }
+
+    /**
+     * Sets the upload URL to use explicitly on the client which will
+     * override the default or the broker's URL. This allows the client to decide
+     * where to upload files to irrespective of the broker's configuration.
+     */
+    public void setUploadUrl(String uploadUrl) {
+        this.uploadUrl = uploadUrl;
+    }
+
+    /** Returns the upload URL advertised by the broker, or null if none has been set. */
+    public String getBrokerUploadUrl() {
+        return brokerUploadUrl;
+    }
+
+    /** Called by the JMS client when a broker advertises its upload URL. */
+    public void setBrokerUploadUrl(String brokerUploadUrl) {
+        this.brokerUploadUrl = brokerUploadUrl;
+    }
+
+    /** Returns the upload URL used when neither an explicit nor a broker URL is set. */
+    public String getDefaultUploadUrl() {
+        return defaultUploadUrl;
+    }
+
+    /**
+     * Sets the default upload URL to use if the broker does not
+     * have a configured upload URL
+     */
+    public void setDefaultUploadUrl(String defaultUploadUrl) {
+        this.defaultUploadUrl = defaultUploadUrl;
+    }
+
+    /** Returns the upload strategy, creating it through createUploadStrategy() on first use. */
+    public org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy getUploadStrategy() {
+        if (uploadStrategy == null) {
+            uploadStrategy = createUploadStrategy();
+        }
+        return uploadStrategy;
+    }
+
+    /** Sets the upload strategy to use for uploading BLOBs to some URL. */
+    public void setUploadStrategy(org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy uploadStrategy) {
+        this.uploadStrategy = uploadStrategy;
+    }
+
+    /** Returns the buffer size used when uploading or downloading files. */
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    /** Sets the default buffer size used when uploading or downloading files. */
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    /** Creates the strategy used when none was set explicitly; subclasses may override this. */
+    protected org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy createUploadStrategy() {
+        return new org.apache.activemq.apollo.openwire.support.blob.DefaultBlobUploadStrategy(this);
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+
+/**
+ * Represents a strategy of uploading a file/stream to some remote URL
+ *
+ * @version $Revision: $
+ */
+public interface BlobUploadStrategy {
+    /** Uploads the given file for the message; presumably returns the URL it was stored at (confirm per implementation). */
+    URL uploadFile(ActiveMQBlobMessage message, File file) throws OpenwireException, IOException;
+    /** Uploads the content of the given stream for the message; presumably the same return contract as uploadFile. */
+    URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws OpenwireException, IOException;
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+
+/**
+ * A helper class to represent a required upload of a BLOB to some remote URL
+ * 
+ * @version $Revision: $
+ */
+public class BlobUploader {
+
+    private BlobTransferPolicy blobTransferPolicy;
+    private File file;
+    private InputStream in;
+
+    public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) {
+        this.blobTransferPolicy = blobTransferPolicy;
+        this.in = in;
+    }
+
+    public BlobUploader(BlobTransferPolicy blobTransferPolicy, File file) {
+        this.blobTransferPolicy = blobTransferPolicy;
+        this.file = file;
+    }
+
+    public URL upload(ActiveMQBlobMessage message) throws OpenwireException, IOException {
+        if (file != null) {
+            return getStrategy().uploadFile(message, file);
+        } else {
+            return getStrategy().uploadStream(message, in);
+        }
+    }
+
+    public BlobTransferPolicy getBlobTransferPolicy() {
+        return blobTransferPolicy;
+    }
+
+    public BlobUploadStrategy getStrategy() {
+        return getBlobTransferPolicy().getUploadStrategy();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+
+/**
+ * A default implementation of {@link BlobUploadStrategy} which uses the URL
+ * class to upload files or streams to a remote URL
+ */
+public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
+    private BlobTransferPolicy transferPolicy;
+
+    public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) {
+        this.transferPolicy = transferPolicy;
+    }
+
+    public URL uploadFile(ActiveMQBlobMessage message, File file) throws OpenwireException, IOException {
+        return uploadStream(message, new FileInputStream(file)); // uploadStream always closes the stream
+    }
+
+    /** PUTs the stream to the upload URL and returns that URL; the stream is closed even when the upload fails. */
+    public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws OpenwireException, IOException {
+        try {
+            URL url = createUploadURL(message);
+            HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+            connection.setRequestMethod("PUT");
+            connection.setDoOutput(true);
+            // use chunked mode or otherwise URLConnection loads everything into memory
+            connection.setChunkedStreamingMode(transferPolicy.getBufferSize());
+            OutputStream os = connection.getOutputStream();
+            try {
+                byte[] buf = new byte[transferPolicy.getBufferSize()];
+                for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
+                    os.write(buf, 0, c);
+                    os.flush();
+                }
+            } finally {
+                os.close();
+            }
+            int code = connection.getResponseCode();
+            if (!isSuccessfulCode(code)) {
+                throw new IOException("PUT was not successful: " + code + " " + connection.getResponseMessage());
+            }
+            return url;
+        } finally {
+            fis.close(); // previously leaked whenever a step above threw
+        }
+    }
+
+    /** Sends an HTTP DELETE to the message's upload URL, reading the status before the connection is released. */
+    public void deleteFile(ActiveMQBlobMessage message) throws IOException, OpenwireException {
+        URL url = createUploadURL(message);
+        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+        connection.setRequestMethod("DELETE");
+        try {
+            int code = connection.getResponseCode(); // connects implicitly and performs the request
+            if (!isSuccessfulCode(code)) {
+                throw new IOException("DELETE was not successful: " + code + " " + connection.getResponseMessage());
+            }
+        } finally {
+            connection.disconnect();
+        }
+    }
+
+    private boolean isSuccessfulCode(int responseCode) {
+        return responseCode >= 200 && responseCode < 300; // 2xx => successful
+    }
+
+    protected URL createUploadURL(ActiveMQBlobMessage message) throws OpenwireException, MalformedURLException {
+        return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html Wed Apr 27 17:32:51 2011
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Helper classes for dealing with out-of-band BLOB objects
+
+</body>
+</html>

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.openwire;
+
+import java.io.IOException;
+
+import org.apache.activemq.apollo.dto.DestinationDTO;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+
+/** Adapts an OpenWire {@link Message} for delivery, exposing its ids, size and store encoding. */
+public class OpenWireMessageDelivery {
+
+    static final private AsciiBuffer ENCODING = new AsciiBuffer("openwire");
+
+    private final Message message;
+    private AsciiBuffer producerId;
+    private OpenWireFormat storeWireFormat;
+    private PersistListener persistListener = null;
+    private final int size;
+
+    public interface PersistListener {
+        public void onMessagePersisted(OpenWireMessageDelivery delivery);
+    }
+
+    /** Wraps the given message and records its size at construction time. */
+    public OpenWireMessageDelivery(Message message) {
+        this.message = message;
+        this.size = message.getSize();
+    }
+
+    /** Registers the listener to notify on the next onMessagePersisted() call. */
+    public void setPersistListener(PersistListener listener) {
+        persistListener = listener;
+    }
+
+    public DestinationDTO[] getDestination() {
+        return message.getDestination().toDestination();
+    }
+
+    /** Returns the message size captured by the constructor. */
+    public int getMemorySize() {
+        return size;
+    }
+
+    public int getPriority() {
+        return message.getPriority();
+    }
+
+    public AsciiBuffer getMsgId() {
+        return new AsciiBuffer(message.getMessageId().toString());
+    }
+
+    /** Returns the producer id as an AsciiBuffer, built on first use and then cached. */
+    public AsciiBuffer getProducerId() {
+        if (producerId == null) {
+            producerId = new AsciiBuffer(message.getProducerId().toString());
+        }
+        return producerId;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    /** Returns the message viewed as the requested type, or null when it is not an instance of that type. */
+    public <T> T asType(Class<T> type) {
+        // isInstance() asks "is the message a T?". The former message.getClass().isAssignableFrom(type)
+        // test was inverted: it rejected supertypes and let type.cast() throw for strict subtypes.
+        if (type.isInstance(message)) {
+            return type.cast(message);
+        }
+        return null;
+    }
+
+    public boolean isPersistent() {
+        return message.isPersistent();
+    }
+
+    /** Notifies the registered persist listener, if any, then clears it so each registration fires at most once. */
+    public final void onMessagePersisted() {
+        if (persistListener != null) {
+            persistListener.onMessagePersisted(this);
+            persistListener = null;
+        }
+    }
+
+    public final boolean isResponseRequired() {
+        return message.isResponseRequired();
+    }
+
+    /** Returns the expiration value reported by the wrapped message. */
+    public long getExpiration() {
+        return message.getExpiration();
+    }
+
+//    public MessageEvaluationContext createMessageEvaluationContext() {
+//        return new OpenwireMessageEvaluationContext(message);
+//    }
+
+    public String toString() {
+        return message.getMessageId().toString();
+    }
+
+    /** Returns the store encoding name, the constant "openwire". */
+    public AsciiBuffer getStoreEncoding() {
+        return ENCODING;
+    }
+
+    /** Marshals the message with the store wire format; returns null if marshalling throws IOException. */
+    public Buffer getStoreEncoded() {
+        try {
+            return storeWireFormat.marshal(message);
+        } catch (IOException e) {
+            return null;
+        }
+    }
+
+    /** Sets the wire format that getStoreEncoded() marshals with; it must be set before that call. */
+    public void setStoreWireFormat(OpenWireFormat wireFormat) {
+        this.storeWireFormat = wireFormat;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support.broker.openwire;
+
+/** Placeholder: the whole implementation below is commented out, so this class currently has no members. */
+public class OpenwireMessageEvaluationContext {
+//    private Message message;
+//
+//    public OpenwireMessageEvaluationContext() {
+//    }
+//    public OpenwireMessageEvaluationContext(Message message) {
+//        this.message = message;
+//    }
+//
+//    public Message getMessage() {
+//        return message;
+//    }
+//
+//    public void setMessage(Message message) {
+//        this.message = message;
+//    }
+//
+//
+//    public Expression getPropertyExpression(final String name) {
+//        Expression expression = JMS_PROPERTY_EXPRESSIONS.get(name);
+//        if( expression == null ) {
+//            expression = new Expression() {
+//                public Object evaluate(MessageEvaluationContext mc) throws FilterException {
+//                    try {
+//                        Message message = ((OpenwireMessageEvaluationContext) mc).message;
+//                        return message.getProperty(name);
+//                    } catch (IOException e) {
+//                        throw new FilterException(e);
+//                    }
+//                }
+//            };
+//        }
+//        return expression;
+//    }
+//
+//    public <T> T getBodyAs(Class<T> type) throws FilterException {
+//        try {
+//            if( type == String.class ) {
+//                if ( message instanceof ActiveMQTextMessage ) {
+//                    return type.cast(((ActiveMQTextMessage)message).getText());
+//                }
+//            }
+//            if( type == Buffer.class ) {
+//                if ( message instanceof ActiveMQBytesMessage ) {
+//                    ActiveMQBytesMessage bm = ((ActiveMQBytesMessage)message);
+//                    byte data[] = new byte[(int) bm.getBodyLength()];
+//                    bm.readBytes(data);
+//                    return type.cast(new Buffer(data));
+//                }
+//            }
+//            return null;
+//        } catch (JMSException e) {
+//            throw new FilterException(e);
+//        }
+//    }
+//
+//    public <T> T getDestination() {
+//        return (T) destination;
+//    }
+//    public Object getLocalConnectionId() {
+//        throw new UnsupportedOperationException();
+//    }
+//    public void setDestination(Object destination) {
+//        this.destination = destination;
+//    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,1057 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.openwire;
+
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.LinkedList;
+//import java.util.concurrent.ConcurrentHashMap;
+//
+//import javax.jms.JMSException;
+//import javax.transaction.xa.XAException;
+//import javax.transaction.xa.Xid;
+//
+//import org.apache.activemq.apollo.WindowLimiter;
+//import org.apache.activemq.apollo.broker.Broker;
+//import org.apache.activemq.apollo.broker.BrokerConnection;
+//import org.apache.activemq.apollo.broker.BrokerMessageDelivery;
+//import org.apache.activemq.apollo.broker.BrokerSubscription;
+//import org.apache.activemq.apollo.broker.Destination;
+//import org.apache.activemq.apollo.broker.MessageDelivery;
+//import org.apache.activemq.apollo.broker.ProtocolHandler;
+//import org.apache.activemq.apollo.broker.Router;
+//import org.apache.activemq.apollo.broker.Transaction;
+//import org.apache.activemq.apollo.broker.VirtualHost;
+//import org.apache.activemq.apollo.broker.XidImpl;
+//import org.apache.activemq.apollo.broker.Transaction.TransactionListener;
+//import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
+//import org.apache.activemq.broker.store.Store.MessageRecord;
+//import org.apache.activemq.command.ActiveMQDestination;
+//import org.apache.activemq.command.BrokerId;
+//import org.apache.activemq.command.BrokerInfo;
+//import org.apache.activemq.command.Command;
+//import org.apache.activemq.command.ConnectionControl;
+//import org.apache.activemq.command.ConnectionError;
+//import org.apache.activemq.command.ConnectionId;
+//import org.apache.activemq.command.ConnectionInfo;
+//import org.apache.activemq.command.ConsumerControl;
+//import org.apache.activemq.command.ConsumerId;
+//import org.apache.activemq.command.ConsumerInfo;
+//import org.apache.activemq.command.ControlCommand;
+//import org.apache.activemq.command.DestinationInfo;
+//import org.apache.activemq.command.ExceptionResponse;
+//import org.apache.activemq.command.FlushCommand;
+//import org.apache.activemq.command.KeepAliveInfo;
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.command.MessageAck;
+//import org.apache.activemq.command.MessageDispatch;
+//import org.apache.activemq.command.MessageDispatchNotification;
+//import org.apache.activemq.command.MessageId;
+//import org.apache.activemq.command.MessagePull;
+//import org.apache.activemq.command.ProducerAck;
+//import org.apache.activemq.command.ProducerId;
+//import org.apache.activemq.command.ProducerInfo;
+//import org.apache.activemq.command.RemoveInfo;
+//import org.apache.activemq.command.RemoveSubscriptionInfo;
+//import org.apache.activemq.command.Response;
+//import org.apache.activemq.command.SessionId;
+//import org.apache.activemq.command.SessionInfo;
+//import org.apache.activemq.command.ShutdownInfo;
+//import org.apache.activemq.command.TransactionId;
+//import org.apache.activemq.command.TransactionInfo;
+//import org.apache.activemq.command.WireFormatInfo;
+//import org.apache.activemq.dispatch.DispatchPriority;
+//import org.apache.activemq.filter.BooleanExpression;
+//import org.apache.activemq.apollo.filter.FilterException;
+//import org.apache.activemq.apollo.filter.LogicExpression;
+//import org.apache.activemq.apollo.filter.NoLocalExpression;
+//import org.apache.activemq.flow.Flow;
+//import org.apache.activemq.flow.FlowController;
+//import org.apache.activemq.flow.IFlowController;
+//import org.apache.activemq.flow.IFlowLimiter;
+//import org.apache.activemq.flow.IFlowResource;
+//import org.apache.activemq.flow.ISourceController;
+//import org.apache.activemq.flow.SizeLimiter;
+//import org.apache.activemq.flow.ISinkController.FlowControllable;
+//import org.apache.activemq.openwire.OpenWireFormat;
+//import org.apache.activemq.selector.SelectorParser;
+//import org.apache.activemq.state.CommandVisitor;
+//import org.apache.activemq.transport.WireFormatNegotiator;
+//import org.fusesource.hawtbuf.Buffer;
+//import org.apache.activemq.wireformat.WireFormat;
+//import org.fusesource.hawtdispatch.Dispatch;
+
+public class OpenwireProtocolHandler { // implements ProtocolHandler, PersistListener {
+
+//    protected final HashMap<ConnectionId, ClientContext> connections = new HashMap<ConnectionId, ClientContext>();
+//    protected final HashMap<SessionId, ClientContext> sessions = new HashMap<SessionId, ClientContext>();
+//    protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
+//    protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
+//
+//    protected final ConcurrentHashMap<TransactionId, Transaction> transactions = new ConcurrentHashMap<TransactionId, Transaction>();
+//
+//    protected BrokerConnection connection;
+//    private OpenWireFormat wireFormat;
+//    private OpenWireFormat storeWireFormat;
+//    private Router router;
+//    private VirtualHost host;
+//    private final CommandVisitor visitor;
+//
+//    ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>();
+//
+//    public OpenwireProtocolHandler() {
+//        setStoreWireFormat(new OpenWireFormat());
+//
+//        visitor = new CommandVisitor() {
+//
+//            // /////////////////////////////////////////////////////////////////
+//            // Methods that keep track of the client state
+//            // /////////////////////////////////////////////////////////////////
+//            public Response processAddConnection(final ConnectionInfo info) throws Exception {
+//                if (!connections.containsKey(info.getConnectionId())) {
+//
+//                    ClientContext connection = new AbstractClientContext<MessageDelivery>(info.getConnectionId().toString(), null) {
+//                        ConnectionInfo connectionInfo = info;
+//
+//                        public void close() {
+//                            super.close();
+//                            connections.remove(connectionInfo.getConnectionId());
+//                        }
+//                    };
+//                    connections.put(info.getConnectionId(), connection);
+//
+//                    // Connections have an implicitly created "default" session identified by session id = -1
+//                    SessionId sessionId = new SessionId(info.getConnectionId(), -1);
+//                    addSession(sessionId, connection);
+//                }
+//                return ack(info);
+//            }
+//
+//            public Response processAddSession(final SessionInfo info) throws Exception {
+//                final SessionId sessionId = info.getSessionId();
+//                ClientContext connection = connections.get(sessionId.getParentId());
+//                if (connection == null) {
+//                    throw new IllegalStateException(host.getHostName() + " Cannot add a session to a connection that had not been registered: " + sessionId.getParentId());
+//                }
+//
+//                if (!sessions.containsKey(sessionId)) {
+//                    addSession(sessionId, connection);
+//                }
+//
+//                return ack(info);
+//            }
+//
+//            private void addSession(final SessionId sessionId, ClientContext connection) {
+//                ClientContext session = new AbstractClientContext<MessageDelivery>(sessionId.toString(), connection) {
+//                    public void close() {
+//                        super.close();
+//                        sessions.remove(sessionId);
+//                    }
+//                };
+//                sessions.put(sessionId, session);
+//            }
+//
+//            public Response processAddProducer(ProducerInfo info) throws Exception {
+//                ClientContext session = sessions.get(info.getProducerId().getParentId());
+//                if (session == null) {
+//                    throw new IllegalStateException(host.getHostName() + " Cannot add a producer to a session that had not been registered: " + info.getProducerId().getParentId());
+//                }
+//                if (!producers.containsKey(info.getProducerId())) {
+//                    ProducerContext producer = new ProducerContext(info, session);
+//                }
+//                return ack(info);
+//            }
+//
+//            public Response processAddConsumer(ConsumerInfo info) throws Exception {
+//                ClientContext session = sessions.get(info.getConsumerId().getParentId());
+//                if (session == null) {
+//                    throw new IllegalStateException(host.getHostName() + " Cannot add a consumer to a session that had not been registered: " + info.getConsumerId().getParentId());
+//                }
+//
+//                if (!consumers.containsKey(info.getConsumerId())) {
+//                    ConsumerContext ctx = new ConsumerContext(info, session);
+//                    ctx.start();
+//                }
+//
+//                return ack(info);
+//            }
+//
+//            public Response processRemoveConnection(RemoveInfo remove, ConnectionId info, long arg1) throws Exception {
+//                ClientContext cc = connections.get(info);
+//                if (cc != null) {
+//                    cc.close();
+//                }
+//                ack(remove);
+//                return null;
+//            }
+//
+//            public Response processRemoveSession(RemoveInfo remove, SessionId info, long arg1) throws Exception {
+//                ClientContext cc = sessions.get(info);
+//                if (cc != null) {
+//                    cc.close();
+//                }
+//                ack(remove);
+//                return null;
+//            }
+//
+//            public Response processRemoveProducer(RemoveInfo remove, ProducerId info) throws Exception {
+//                ClientContext cc = producers.get(info);
+//                if (cc != null) {
+//                    cc.close();
+//                }
+//                ack(remove);
+//                return null;
+//            }
+//
+//            public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
+//                ClientContext cc = consumers.get(info);
+//                if (cc != null) {
+//                    cc.close();
+//                }
+//                ack(remove);
+//                return null;
+//            }
+//
+//            // /////////////////////////////////////////////////////////////////
+//            // Message Processing Methods.
+//            // /////////////////////////////////////////////////////////////////
+//            public Response processMessage(Message info) throws Exception {
+//                if (info.getOriginalDestination() == null) {
+//                    info.setOriginalDestination(info.getDestination());
+//                }
+//
+//                ProducerId producerId = info.getProducerId();
+//                ProducerContext producerContext = producers.get(producerId);
+//
+//                OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+//                md.setStoreWireFormat(storeWireFormat);
+//                TransactionId tid = info.getTransactionId();
+//                if (tid != null) {
+//                    Transaction t = locateTransaction(tid, true);
+//                    md.setTransactionId(t.getTid());
+//                } else {
+//                    md.setPersistListener(OpenwireProtocolHandler.this);
+//                }
+//
+//                // Only producers that are not using a window will block,
+//                // and if one does block:
+//                // yes, we block the connection's read thread; yes, other
+//                // sessions will not get
+//                // serviced while we block here. The producer is depending
+//                // on TCP flow
+//                // control to slow it down, so we have to stop reading from
+//                // the socket at this
+//                // point.
+//                while (!producerContext.controller.offer(md, null)) {
+//                    producerContext.controller.waitForFlowUnblock();
+//                }
+//
+//                if (tid != null) {
+//                    return ack(info);
+//                } else {
+//                    return null;
+//                }
+//            }
+//
+//            public Response processMessageAck(MessageAck info) throws Exception {
+//                ConsumerContext ctx = consumers.get(info.getConsumerId());
+//                ctx.ack(info);
+//                return ack(info);
+//            }
+//
+//            // Only used when client prefetch is set to zero.
+//            public Response processMessagePull(MessagePull info) throws Exception {
+//                return ack(info);
+//            }
+//
+//            // /////////////////////////////////////////////////////////////////
+//            // Control Methods
+//            // /////////////////////////////////////////////////////////////////
+//            public Response processWireFormat(WireFormatInfo info) throws Exception {
+//
+//                // Negotiate the openwire encoding options.
+//                WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1);
+//                wfn.sendWireFormat();
+//                wfn.negociate(info);
+//
+//                // Now that the encoding is negotiated.. let the client know
+//                // the details about this
+//                // broker.
+//                BrokerInfo brokerInfo = new BrokerInfo();
+//                Broker broker = connection.getBroker();
+//                brokerInfo.setBrokerId(new BrokerId(broker.getName()));
+//                brokerInfo.setBrokerName(broker.getName());
+//                if (!broker.getConnectUris().isEmpty()) {
+//                    brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
+//                }
+//                connection.write(brokerInfo);
+//                return ack(info);
+//            }
+//
+//            public Response processShutdown(ShutdownInfo info) throws Exception {
+//                connection.setStopping();
+//                return ack(info);
+//            }
+//
+//            public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+//                if (info.isResponseRequired()) {
+//                    info.setResponseRequired(false);
+//                    connection.write(info);
+//                }
+//                return null;
+//            }
+//
+//            public Response processFlush(FlushCommand info) throws Exception {
+//                return ack(info);
+//            }
+//
+//            public Response processConnectionControl(ConnectionControl info) throws Exception {
+//                if (info != null) {
+//                    if (info.isFaultTolerant()) {
+//                        throw new UnsupportedOperationException("Fault Tolerance");
+//                    }
+//                }
+//                return ack(info);
+//            }
+//
+//            public Response processConnectionError(ConnectionError info) throws Exception {
+//                return ack(info);
+//            }
+//
+//            public Response processConsumerControl(ConsumerControl info) throws Exception {
+//                return ack(info);
+//            }
+//
+//            // /////////////////////////////////////////////////////////////////
+//            // Methods for server management
+//            // /////////////////////////////////////////////////////////////////
+//            public Response processAddDestination(DestinationInfo info) throws Exception {
+//                ActiveMQDestination destination = info.getDestination();
+//                if (destination.isTemporary()) {
+//                    // Keep track of it so that we can remove it when this connection
+//                    // shuts down.
+//                    temporaryDestinations.add(destination);
+//                }
+//                host.createQueue(destination);
+//                return ack(info);
+//            }
+//
+//            public Response processRemoveDestination(DestinationInfo info) throws Exception {
+//                throw new UnsupportedOperationException();
+//            }
+//
+//            public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+//                throw new UnsupportedOperationException();
+//            }
+//
+//            public Response processControlCommand(ControlCommand info) throws Exception {
+//                throw new UnsupportedOperationException();
+//            }
+//
+//            // /////////////////////////////////////////////////////////////////
+//            // Methods for transaction management
+//            // /////////////////////////////////////////////////////////////////
+//            public Response processBeginTransaction(TransactionInfo info) throws Exception {
+//                TransactionId tid = info.getTransactionId();
+//
+//                Transaction t = locateTransaction(tid, false);
+//                if (t == null) {
+//
+//                    Buffer xid = null;
+//                    if (tid.isXATransaction()) {
+//                        xid = XidImpl.toBuffer((Xid) tid);
+//                    }
+//                    t = host.getTransactionManager().createTransaction(xid);
+//                    transactions.put(tid, t);
+//                }
+//
+//                return ack(info);
+//            }
+//
+//            public Response processCommitTransactionOnePhase(final TransactionInfo info) throws Exception {
+//                final TransactionId tid = info.getTransactionId();
+//                Transaction t = locateTransaction(tid, true);
+//
+//                TransactionListener listener = null;
+//                if (info.isResponseRequired()) {
+//                    listener = new TransactionListener() {
+//
+//                        @Override
+//                        public void onCommit(Transaction t) {
+//                            transactions.remove(tid);
+//                            ack(info);
+//                        }
+//
+//                        @Override
+//                        public void onRollback(Transaction t) {
+//                            transactions.remove(tid);
+//                            ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack"));
+//                            r.setCorrelationId(info.getCommandId());
+//                            connection.write(r);
+//                        }
+//
+//                    };
+//                }
+//
+//                t.commit(true, listener);
+//                transactions.remove(tid);
+//                return null;
+//            }
+//
+//            public Response processCommitTransactionTwoPhase(final TransactionInfo info) throws Exception {
+//                final TransactionId tid = info.getTransactionId();
+//                Transaction t = locateTransaction(tid, true);
+//
+//                TransactionListener listener = null;
+//                if (info.isResponseRequired()) {
+//                    listener = new TransactionListener() {
+//
+//                        @Override
+//                        public void onCommit(Transaction t) {
+//                            transactions.remove(tid);
+//                            ack(info);
+//                        }
+//
+//                        @Override
+//                        public void onRollback(Transaction t) {
+//                            transactions.remove(tid);
+//                            ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack"));
+//                            r.setCorrelationId(info.getCommandId());
+//                            connection.write(r);
+//                        }
+//
+//                    };
+//                }
+//
+//                t.commit(false, listener);
+//                return null;
+//            }
+//
+//            public Response processEndTransaction(TransactionInfo info) throws Exception {
+//                //Shouldn't actually do anything; sent by the client to ensure that it is
+//                //in sync with broker transaction state.
+//                //TODO need to investigate whether this should wait for prior transaction
+//                //state to flush out?
+//                new UnsupportedOperationException().printStackTrace();
+//                return ack(info);
+//            }
+//
+//            public Response processForgetTransaction(TransactionInfo info) throws Exception {
+//                return processRollbackTransaction(info);
+//            }
+//
+//            public Response processPrepareTransaction(final TransactionInfo info) throws Exception {
+//                final TransactionId tid = info.getTransactionId();
+//                Transaction t = locateTransaction(tid, true);
+//
+//                TransactionListener listener = null;
+//                if (info.isResponseRequired()) {
+//                    listener = new TransactionListener() {
+//
+//                        @Override
+//                        public void onPrepared(Transaction t) {
+//                            ack(info);
+//                        }
+//                    };
+//                }
+//                t.prepare(listener);
+//                return null;
+//            }
+//
+//            public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+//                //TODO
+//                throw new UnsupportedOperationException();
+//            }
+//
+//            public Response processRollbackTransaction(final TransactionInfo info) throws Exception {
+//                final TransactionId tid = info.getTransactionId();
+//                Transaction t = locateTransaction(tid, true);
+//
+//                TransactionListener listener = null;
+//                if (info.isResponseRequired()) {
+//                    listener = new TransactionListener() {
+//
+//                        @Override
+//                        public void onRollback(Transaction t) {
+//                            ack(info);
+//                        }
+//                    };
+//                }
+//                t.rollback(listener);
+//                transactions.remove(tid);
+//                return null;
+//            }
+//
+//            // /////////////////////////////////////////////////////////////////
+//            // Methods for cluster operations
+//            // These commands are sent to the broker when it's acting like a
+//            // client to another broker.
+//            // /////////////////////////////////////////////////////////////////
+//            public Response processBrokerInfo(BrokerInfo info) throws Exception {
+//                throw new UnsupportedOperationException();
+//            }
+//
+//            public Response processMessageDispatch(MessageDispatch info) throws Exception {
+//                throw new UnsupportedOperationException();
+//            }
+//
+//            public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception {
+//                throw new UnsupportedOperationException();
+//            }
+//
+//            public Response processProducerAck(ProducerAck info) throws Exception {
+//                return ack(info);
+//            }
+//        };
+//    }
+//
+//    private Transaction locateTransaction(TransactionId tid, boolean expected) throws XAException, JMSException {
+//        Transaction t;
+//
+//        if (tid.isLocalTransaction()) {
+//            t = transactions.get(tid);
+//        } else {
+//            t = host.getTransactionManager().getXATransaction(XidImpl.toBuffer((Xid) tid));
+//        }
+//
+//        if (t == null && expected) {
+//            if (tid.isXATransaction()) {
+//                XAException e = new XAException("Transaction '" + tid + "' has not been started.");
+//                e.errorCode = XAException.XAER_NOTA;
+//                throw e;
+//            } else {
+//                throw new JMSException("Transaction '" + tid + "' has not been started.");
+//            }
+//        }
+//        return t;
+//    }
+//
+//    public void start() throws Exception {
+//
+//    }
+//
+//    public void stop() throws Exception {
+//    }
+//
+//    public void onCommand(Object o) {
+//        boolean responseRequired = false;
+//        int commandId = 0;
+//        try {
+//            Command command = (Command) o;
+//            commandId = command.getCommandId();
+//            responseRequired = command.isResponseRequired();
+//            //System.out.println(o);
+//            command.visit(visitor);
+//        } catch (Exception e) {
+//            if (responseRequired) {
+//                ExceptionResponse response = new ExceptionResponse(e);
+//                response.setCorrelationId(commandId);
+//                connection.write(response);
+//            } else {
+//                connection.onException(e);
+//            }
+//        } catch (Throwable t) {
+//            if (responseRequired) {
+//                ExceptionResponse response = new ExceptionResponse(t);
+//                response.setCorrelationId(commandId);
+//                connection.write(response);
+//            } else {
+//                connection.onException(new RuntimeException(t));
+//            }
+//        }
+//    }
+//
+//    public void onException(Exception error) {
+//        if (!connection.isStopping()) {
+//            error.printStackTrace();
+//            new Thread() {
+//                @Override
+//                public void run() {
+//                    try {
+//                        connection.stop();
+//                    } catch (Exception ignore) {
+//                    }
+//                }
+//            }.start();
+//        }
+//    }
+//
+//    public void onMessagePersisted(OpenWireMessageDelivery delivery) {
+//        // TODO This method should not block:
+//        // Either add to output queue, or spin off in a separate thread.
+//        ack(delivery.getMessage());
+//    }
+//
+//    Response ack(Command command) {
+//        if (command.isResponseRequired()) {
+//            Response rc = new Response();
+//            rc.setCorrelationId(command.getCommandId());
+//            connection.write(rc);
+//        }
+//        return null;
+//    }
+//
+//    // /////////////////////////////////////////////////////////////////
+//    // Internal Support Methods
+//    // /////////////////////////////////////////////////////////////////
+//
+//    class ProducerContext extends AbstractClientContext<OpenWireMessageDelivery> {
+//
+//        protected final Object inboundMutex = new Object();
+//        private IFlowController<OpenWireMessageDelivery> controller;
+//        private final ProducerInfo info;
+//
+//        public ProducerContext(final ProducerInfo info, ClientContext parent) {
+//            super(info.getProducerId().toString(), parent);
+//            this.info = info;
+//            producers.put(info.getProducerId(), this);
+//            final Flow flow = new Flow("broker-" + super.getResourceName() + "-inbound", false);
+//
+//            // Openwire only uses credit windows at the producer level for
+//            // producers that request the feature.
+//            IFlowLimiter<OpenWireMessageDelivery> limiter;
+//            if (info.getWindowSize() > 0) {
+//                limiter = new WindowLimiter<OpenWireMessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+//                    @Override
+//                    protected void sendCredit(int credit) {
+//                        ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
+//                        connection.write(ack);
+//                    }
+//                };
+//            } else {
+//
+//                limiter = new SizeLimiter<OpenWireMessageDelivery>(1024*64, 1024*32);
+//            }
+//
+//            controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
+//                public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
+//                    router.route(msg, controller, true);
+//                    controller.elementDispatched(msg);
+//                }
+//
+//                public IFlowResource getFlowResource() {
+//                    return ProducerContext.this;
+//                }
+//            }, flow, limiter, inboundMutex);
+//
+//            super.onFlowOpened(controller);
+//        }
+//
+//        public void close() {
+//            super.close();
+//            producers.remove(info);
+//        }
+//    }
+//
+//    class ConsumerContext extends AbstractClientContext<MessageDelivery> implements ProtocolHandler.ConsumerContext {
+//
+//        private final ConsumerInfo info;
+//        private String name;
+//        private BooleanExpression selector;
+//        private boolean isDurable;
+//        private boolean isQueueReceiver;
+//
+//        private final FlowController<MessageDelivery> controller;
+//        private final WindowLimiter<MessageDelivery> limiter;
+//
+//        private HashMap<MessageId, SubscriptionDelivery<MessageDelivery>> pendingMessages = new HashMap<MessageId, SubscriptionDelivery<MessageDelivery>>();
+//        private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+//        private BrokerSubscription brokerSubscription;
+//        private int borrowedLimterCredits;
+//
+//        public ConsumerContext(final ConsumerInfo info, ClientContext parent) throws Exception {
+//            super(info.getConsumerId().toString(), parent);
+//            this.info = info;
+//            this.name = info.getConsumerId().toString();
+//            consumers.put(info.getConsumerId(), this);
+//
+//            Flow flow = new Flow("broker-" + name + "-outbound", false);
+//            selector = parseSelector(info);
+//            limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+//                @Override
+//                public int getElementSize(MessageDelivery m) {
+//                    return 1;
+//                }
+//            };
+//
+//            isQueueReceiver = info.getDestination().isQueue();
+//            if (info.getSubscriptionName() != null) {
+//                isDurable = true;
+//            }
+//            controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
+//            controller.useOverFlowQueue(false);
+//            controller.setExecutor(Dispatch.getGlobalQueue());
+//            super.onFlowOpened(controller);
+//        }
+//
+//        public void start() throws Exception {
+//            brokerSubscription = host.createSubscription(this);
+//            brokerSubscription.connect(this);
+//        }
+//
+//        public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
+//            if (!controller.offer(message, source)) {
+//                return false;
+//            } else {
+//                sendInternal(message, controller, callback);
+//                return true;
+//            }
+//        }
+//
+//        public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
+//            controller.add(message, source);
+//            sendInternal(message, controller, callback);
+//        }
+//
+//        private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
+//            Message msg = message.asType(Message.class);
+//            MessageDispatch md = new MessageDispatch();
+//            md.setConsumerId(info.getConsumerId());
+//            md.setMessage(msg);
+//            md.setDestination(msg.getDestination());
+//            // Add to the pending list if persistent and we are durable:
+//            if (callback != null) {
+//                if (callback.isRedelivery()) {
+//                    md.setRedeliveryCounter(1);
+//                }
+//                synchronized (this) {
+//                    Object old = pendingMessages.put(msg.getMessageId(), callback);
+//                    if (old != null) {
+//                        new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
+//                    }
+//                    pendingMessageIds.add(msg.getMessageId());
+//                    connection.write(md);
+//                }
+//            } else {
+//                connection.write(md);
+//            }
+//        }
+//
+//        public void ack(MessageAck info) throws XAException, JMSException {
+//            // TODO: The pending message queue could probably be optimized to
+//            // avoid having to create a new list here.
+//            int flowCredit = info.getMessageCount();
+//            if (info.isDeliveredAck()) {
+//                // This ack is just trying to expand the flow control window size without actually
+//                // acking the message.  Keep track of how many limiter credits we borrow since they need
+//                // to get paid back with real acks later.
+//                borrowedLimterCredits += flowCredit;
+//                limiter.onProtocolCredit(flowCredit);
+//            } else if (info.isStandardAck()) {
+//                TransactionId tid = info.getTransactionId();
+//                Transaction transaction = null;
+//                if (tid != null) {
+//                    transaction = locateTransaction(tid, true);
+//                }
+//
+//                LinkedList<SubscriptionDelivery<MessageDelivery>> acked = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
+//                synchronized (this) {
+//                    MessageId id = info.getLastMessageId();
+//                    if (isDurable() || isQueueReceiver()) {
+//                        while (!pendingMessageIds.isEmpty()) {
+//                            MessageId pendingId = pendingMessageIds.getFirst();
+//                            SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
+//                            acked.add(callback);
+//                            pendingMessageIds.removeFirst();
+//                            if (pendingId.equals(id)) {
+//                                break;
+//                            }
+//                        }
+//                    }
+//
+//                    // Did we have DeliveredAcks previously sent?  Then the
+//                    // flow window has already been credited.  We need to
+//                    // pay back the borrowed limiter credits before giving
+//                    // credits directly to the limiter.
+//                    if (borrowedLimterCredits > 0) {
+//                        if (flowCredit > borrowedLimterCredits) {
+//                            flowCredit -= borrowedLimterCredits;
+//                            borrowedLimterCredits = 0;
+//                        } else {
+//                            borrowedLimterCredits -= flowCredit;
+//                            flowCredit = 0;
+//                        }
+//                    }
+//                    limiter.onProtocolCredit(flowCredit);
+//                }
+//
+//                if (transaction == null) {
+//                    // Delete outside of synchronization on queue to avoid contention
+//                    // with enqueueing threads.
+//                    for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+//                        callback.acknowledge();
+//                    }
+//                } else {
+//                    // Add the acks to the transaction outside of synchronization
+//                    // to avoid contention with enqueueing threads.
+//                    for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+//                        transaction.addAck(callback);
+//                    }
+//                }
+//            }
+//        }
+//
+//        public boolean hasSelector() {
+//            return selector != null;
+//        }
+//
+//        public boolean matches(MessageDelivery message) {
+//            Message msg = message.asType(Message.class);
+//            if (msg == null) {
+//                return false;
+//            }
+//
+//            OpenwireMessageEvaluationContext selectorContext = new OpenwireMessageEvaluationContext(msg);
+//            selectorContext.setDestination(info.getDestination());
+//            try {
+//                return (selector == null || selector.matches(selectorContext));
+//            } catch (FilterException e) {
+//                e.printStackTrace();
+//                return false;
+//            }
+//        }
+//
+//        public boolean isDurable() {
+//            return info.isDurable();
+//        }
+//
+//        public boolean isQueueReceiver() {
+//            return isQueueReceiver;
+//        }
+//
+//        public boolean isExclusive() {
+//            return info.isExclusive();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#isBrowser()
+//         */
+//        public boolean isBrowser() {
+//            return info.isBrowser();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+//         * .Object)
+//         */
+//        public boolean isRemoveOnDispatch(MessageDelivery elem) {
+//            if (isQueueReceiver()) {
+//                return false;
+//            }
+//            return !elem.isPersistent() || !isDurable;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+//         * #getDestination()
+//         */
+//        public Destination getDestination() {
+//            return info.getDestination();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+//         * #getJMSSelector()
+//         */
+//        public String getSelectorString() {
+//            return info.getSelector();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+//         * #getSubscriptionName()
+//         */
+//        public String getSubscriptionName() {
+//            return info.getSubscriptionName();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+//         * #getFullSelector()
+//         */
+//        public BooleanExpression getSelectorExpression() {
+//            return selector;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+//         * #getJMSSelector()
+//         */
+//        public String getSelector() {
+//            return info.getSelector();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+//         * #getConnection()
+//         */
+//        public BrokerConnection getConnection() {
+//            return connection;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+//         * #getConsumerId()
+//         */
+//        public String getConsumerId() {
+//            return name;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object,
+//         * org.apache.activemq.flow.ISourceController)
+//         */
+//        public void add(MessageDelivery message, ISourceController<?> source) {
+//            add(message, source, null);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object,
+//         * org.apache.activemq.flow.ISourceController)
+//         */
+//        public boolean offer(MessageDelivery message, ISourceController<?> source) {
+//            return offer(message, source, null);
+//        }
+//
+//        public boolean autoCreateDestination() {
+//            return true;
+//        }
+//
+//        public String toString() {
+//            return info.getConsumerId().toString();
+//        }
+//
+//        public void close() {
+//            brokerSubscription.disconnect(this);
+//
+//            if (isDurable() || isQueueReceiver()) {
+//                LinkedList<SubscriptionDelivery<MessageDelivery>> unacquired = null;
+//
+//                synchronized (this) {
+//
+//                    unacquired = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
+//                    while (!pendingMessageIds.isEmpty()) {
+//                        MessageId pendingId = pendingMessageIds.getLast();
+//                        SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
+//                        unacquired.add(callback);
+//                        pendingMessageIds.removeLast();
+//                    }
+//                    limiter.onProtocolCredit(unacquired.size());
+//                }
+//
+//                if (unacquired != null) {
+//                    // Delete outside of synchronization on queue to avoid contention
+//                    // with enqueueing threads.
+//                    for (SubscriptionDelivery<MessageDelivery> callback : unacquired) {
+//                        callback.unacquire(controller);
+//                    }
+//                }
+//            }
+//
+//            super.close();
+//            consumers.remove(info.getConsumerId());
+//        }
+//
+//        public boolean isPersistent() {
+//            return true;
+//        }
+//    }
+//
+//    private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException {
+//        BooleanExpression rc = null;
+//        if (info.getSelector() != null) {
+//            rc = SelectorParser.parse(info.getSelector());
+//        }
+//        if (info.isNoLocal()) {
+//            if (rc == null) {
+//                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
+//            } else {
+//                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
+//            }
+//        }
+//        if (info.getAdditionalPredicate() != null) {
+//            if (rc == null) {
+//                rc = info.getAdditionalPredicate();
+//            } else {
+//                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
+//            }
+//        }
+//        return rc;
+//    }
+//
+//    public BrokerConnection getConnection() {
+//        return connection;
+//    }
+//
+//    public void setConnection(BrokerConnection connection) {
+//        this.connection = connection;
+//        this.host = connection.getBroker().getDefaultVirtualHost();
+//        this.router = host.getRouter();
+//    }
+//
+//    public void setWireFormat(WireFormat wireFormat) {
+//        this.wireFormat = (OpenWireFormat) wireFormat;
+//        setStoreWireFormat(this.wireFormat.copy());
+//    }
+//
+//    private void setStoreWireFormat(OpenWireFormat wireFormat) {
+//        this.storeWireFormat = wireFormat;
+//        storeWireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
+//        storeWireFormat.setCacheEnabled(false);
+//        storeWireFormat.setTightEncodingEnabled(false);
+//        storeWireFormat.setSizePrefixDisabled(false);
+//    }
+//
+//    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
+//        Buffer buf = record.getBuffer();
+//        Message message = (Message) storeWireFormat.unmarshal(new Buffer(buf.data, buf.offset, buf.length));
+//        OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message);
+//        delivery.setStoreWireFormat(storeWireFormat);
+//        return delivery;
+//    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.region;
+
+
+/** Broker-region destination contract; the only operation declared here is the minimum message size accessor.
+ * @version $Revision: 1.12 $
+ */
+public interface Destination {
+    // NOTE(review): the unit of the returned size is not stated in this file; presumably bytes -- confirm.
+    int getMinimumMessageSize();
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.broker.region;
+
+import java.io.IOException;
+
+import org.apache.activemq.apollo.openwire.command.ConsumerId;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.command.MessageId;
+
+/**
+ * Keeps track of a message that is flowing through the Broker.  This
+ * object may hold a hard reference to the message, or only hold the
+ * id of the message if the message has been persisted in a MessageStore.
+ * 
+ * @version $Revision: 1.15 $
+ */
+public interface MessageReference {
+    // Identity and message access; getMessage() is the only accessor declared to throw IOException.
+    MessageId getMessageId();
+    Message getMessageHardRef();
+    Message getMessage() throws IOException;
+    boolean isPersistent();
+    // Redelivery counter: read it, or bump it by one call to incrementRedeliveryCounter().
+    int getRedeliveryCounter();
+    void incrementRedeliveryCounter();
+    // Reference count accessors.
+    int getReferenceCount();
+    // NOTE(review): increment/decrement return an int, presumably the updated count -- confirm with implementations.
+    int incrementReferenceCount();
+    int decrementReferenceCount();
+    ConsumerId getTargetConsumerId();
+    int getSize();
+    long getExpiration();
+    String getGroupID();
+    int getGroupSequence();
+    
+    /**
+     * @return true if this message is expired
+     */
+    boolean isExpired();
+
+    /**
+     * @return true if this message is dropped
+     */
+    boolean isDropped();
+    
+    /**
+     * @return true if the message is an advisory
+     */
+    boolean isAdvisory();
+    
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.state;
+
+import org.apache.activemq.apollo.openwire.command.BrokerInfo;
+import org.apache.activemq.apollo.openwire.command.ConnectionControl;
+import org.apache.activemq.apollo.openwire.command.ConnectionError;
+import org.apache.activemq.apollo.openwire.command.ConnectionId;
+import org.apache.activemq.apollo.openwire.command.ConnectionInfo;
+import org.apache.activemq.apollo.openwire.command.ConsumerControl;
+import org.apache.activemq.apollo.openwire.command.ConsumerId;
+import org.apache.activemq.apollo.openwire.command.ConsumerInfo;
+import org.apache.activemq.apollo.openwire.command.ControlCommand;
+import org.apache.activemq.apollo.openwire.command.DestinationInfo;
+import org.apache.activemq.apollo.openwire.command.FlushCommand;
+import org.apache.activemq.apollo.openwire.command.KeepAliveInfo;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.command.MessageAck;
+import org.apache.activemq.apollo.openwire.command.MessageDispatch;
+import org.apache.activemq.apollo.openwire.command.MessageDispatchNotification;
+import org.apache.activemq.apollo.openwire.command.MessagePull;
+import org.apache.activemq.apollo.openwire.command.ProducerAck;
+import org.apache.activemq.apollo.openwire.command.ProducerId;
+import org.apache.activemq.apollo.openwire.command.ProducerInfo;
+import org.apache.activemq.apollo.openwire.command.RemoveInfo;
+import org.apache.activemq.apollo.openwire.command.RemoveSubscriptionInfo;
+import org.apache.activemq.apollo.openwire.command.Response;
+import org.apache.activemq.apollo.openwire.command.SessionId;
+import org.apache.activemq.apollo.openwire.command.SessionInfo;
+import org.apache.activemq.apollo.openwire.command.ShutdownInfo;
+import org.apache.activemq.apollo.openwire.command.TransactionInfo;
+import org.apache.activemq.apollo.openwire.command.WireFormatInfo;
+/** Callback interface with one {@code process*} method per OpenWire command type; every method returns a {@link Response} and may throw {@link Exception}. */
+public interface CommandVisitor {
+    // Add commands: each takes the *Info object describing what is being added.
+    Response processAddConnection(ConnectionInfo info) throws Exception;
+
+    Response processAddSession(SessionInfo info) throws Exception;
+
+    Response processAddProducer(ProducerInfo info) throws Exception;
+
+    Response processAddConsumer(ConsumerInfo info) throws Exception;
+    // Remove commands take the RemoveInfo plus the id being removed; all except processRemoveProducer also take lastDeliveredSequenceId.
+    Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception;
+
+    Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception;
+
+    Response processRemoveProducer(RemoveInfo info,ProducerId id) throws Exception;
+
+    Response processRemoveConsumer(RemoveInfo info,ConsumerId id, long lastDeliveredSequenceId) throws Exception;
+
+    Response processAddDestination(DestinationInfo info) throws Exception;
+
+    Response processRemoveDestination(DestinationInfo info) throws Exception;
+
+    Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception;
+
+    Response processMessage(Message send) throws Exception;
+
+    Response processMessageAck(MessageAck ack) throws Exception;
+
+    Response processMessagePull(MessagePull pull) throws Exception;
+    // Transaction commands all take a TransactionInfo; three more (recover, forget, end) are declared further down.
+    Response processBeginTransaction(TransactionInfo info) throws Exception;
+
+    Response processPrepareTransaction(TransactionInfo info) throws Exception;
+
+    Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception;
+
+    Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception;
+
+    Response processRollbackTransaction(TransactionInfo info) throws Exception;
+
+    Response processWireFormat(WireFormatInfo info) throws Exception;
+
+    Response processKeepAlive(KeepAliveInfo info) throws Exception;
+
+    Response processShutdown(ShutdownInfo info) throws Exception;
+
+    Response processFlush(FlushCommand command) throws Exception;
+
+    Response processBrokerInfo(BrokerInfo info) throws Exception;
+
+    Response processRecoverTransactions(TransactionInfo info) throws Exception;
+
+    Response processForgetTransaction(TransactionInfo info) throws Exception;
+
+    Response processEndTransaction(TransactionInfo info) throws Exception;
+
+    Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
+
+    Response processProducerAck(ProducerAck ack) throws Exception;
+
+    Response processMessageDispatch(MessageDispatch dispatch) throws Exception;
+
+    Response processControlCommand(ControlCommand command) throws Exception;
+
+    Response processConnectionError(ConnectionError error) throws Exception;
+
+    Response processConnectionControl(ConnectionControl control) throws Exception;
+
+    Response processConsumerControl(ConsumerControl control) throws Exception;
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java
------------------------------------------------------------------------------
    svn:executable = *