You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by ev...@apache.org on 2013/11/15 18:00:00 UTC

git commit: The create and stream methods in the Message API for OpenStack Marconi.

Updated Branches:
  refs/heads/master e052acb29 -> 22bf21f5b


The create and stream methods in the Message API for OpenStack Marconi.


Project: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/commit/22bf21f5
Tree: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/tree/22bf21f5
Diff: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/diff/22bf21f5

Branch: refs/heads/master
Commit: 22bf21f5b89e208c61929516ba47651392eb4945
Parents: e052acb
Author: Everett Toews <ev...@rackspace.com>
Authored: Fri Oct 18 17:29:51 2013 -0500
Committer: Everett Toews <ev...@rackspace.com>
Committed: Fri Nov 15 10:59:41 2013 -0600

----------------------------------------------------------------------
 .../openstack/marconi/v1/MarconiApi.java        |  17 +-
 .../marconi/v1/MarconiApiMetadata.java          |   1 +
 .../openstack/marconi/v1/domain/Aged.java       |  50 +++--
 .../marconi/v1/domain/CreateMessage.java        | 133 +++++++++++++
 .../openstack/marconi/v1/domain/Message.java    | 159 +++++++++++++++
 .../marconi/v1/domain/MessageStream.java        |  66 +++++++
 .../marconi/v1/domain/MessagesCreated.java      | 112 +++++++++++
 .../marconi/v1/domain/MessagesStats.java        |  21 +-
 .../openstack/marconi/v1/domain/QueueStats.java |  45 ++++-
 .../marconi/v1/features/MessageApi.java         |  80 ++++++++
 .../openstack/marconi/v1/features/QueueApi.java |   7 +
 .../marconi/v1/functions/ParseMessages.java     | 103 ++++++++++
 .../v1/functions/ParseMessagesCreated.java      |  55 ++++++
 .../marconi/v1/functions/ParseQueueStats.java   |  71 +++++++
 .../marconi/v1/options/StreamOptions.java       | 119 +++++++++++
 .../marconi/v1/features/MessageApiLiveTest.java | 147 ++++++++++++++
 .../marconi/v1/features/MessageApiMockTest.java | 198 +++++++++++++++++++
 .../marconi/v1/features/QueueApiLiveTest.java   |  30 +++
 .../marconi/v1/features/QueueApiMockTest.java   |  33 ++++
 .../us/CloudQueuesUSProviderMetadata.java       |   7 +-
 .../us/CloudQueuesUSMessageApiLiveTest.java     |  30 +++
 21 files changed, 1454 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
index 23f643e..fe4c048 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
@@ -18,13 +18,15 @@ package org.jclouds.openstack.marconi.v1;
 
 import com.google.inject.Provides;
 import org.jclouds.javax.annotation.Nullable;
-import org.jclouds.location.Region;
 import org.jclouds.location.Zone;
 import org.jclouds.location.functions.ZoneToEndpoint;
+import org.jclouds.openstack.marconi.v1.features.MessageApi;
 import org.jclouds.openstack.marconi.v1.features.QueueApi;
 import org.jclouds.rest.annotations.Delegate;
 import org.jclouds.rest.annotations.EndpointParam;
 
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import java.io.Closeable;
 import java.util.Set;
 
@@ -43,8 +45,21 @@ public interface MarconiApi extends Closeable {
 
    /**
     * Provides access to Queue features.
+    *
+    * @param zone The zone where this queue will live.
     */
    @Delegate
    QueueApi getQueueApiForZone(
          @EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone);
+
+   /**
+    * Provides access to Message features.
+    *
+    * @param zone The zone where this queue will live.
+    * @param name Name of the queue.
+    */
+   @Delegate
+   @Path("/queues/{name}")
+   MessageApi getMessageApiForZoneAndQueue(
+         @EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone, @PathParam("name") String name);
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApiMetadata.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApiMetadata.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApiMetadata.java
index bee1aba..24584cb 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApiMetadata.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApiMetadata.java
@@ -57,6 +57,7 @@ public class MarconiApiMetadata extends BaseHttpApiMetadata<MarconiApi> {
       Properties properties = BaseHttpApiMetadata.defaultProperties();
       properties.setProperty(SERVICE_TYPE, ServiceType.QUEUES);
       properties.setProperty(CREDENTIAL_TYPE, CredentialTypes.PASSWORD_CREDENTIALS);
+
       return properties;
    }
 

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Aged.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Aged.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Aged.java
index 84ac899..818dff6 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Aged.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Aged.java
@@ -20,8 +20,12 @@ package org.jclouds.openstack.marconi.v1.domain;
 
 import com.google.common.base.Objects;
 
+import javax.inject.Named;
+import java.beans.ConstructorProperties;
 import java.util.Date;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The age of messages in a queue.
  *
@@ -29,12 +33,18 @@ import java.util.Date;
  */
 public class Aged {
 
-   private int age;
-   private Date created;
+   private final int age;
+   private final Date created;
+   @Named("href")
+   private final String id;
 
-   protected Aged(int age, Date created) {
+   @ConstructorProperties({
+         "age", "created", "href"
+   })
+   protected Aged(int age, Date created, String id) {
       this.age = age;
-      this.created = created;
+      this.created = checkNotNull(created, "created required");
+      this.id = checkNotNull(id, "id required");
    }
 
    /**
@@ -51,9 +61,16 @@ public class Aged {
       return created;
    }
 
+   /**
+    * @return Id of the oldest/newest message.
+    */
+   public String getId() {
+      return id;
+   }
+
    @Override
    public int hashCode() {
-      return Objects.hashCode(age, created);
+      return Objects.hashCode(age, created, id);
    }
 
    @Override
@@ -61,12 +78,13 @@ public class Aged {
       if (this == obj) return true;
       if (obj == null || getClass() != obj.getClass()) return false;
       Aged that = Aged.class.cast(obj);
-      return Objects.equal(this.age, that.age) && Objects.equal(this.created, that.created);
+      return Objects.equal(this.age, that.age) && Objects.equal(this.created, that.created)
+            && Objects.equal(this.id, that.id);
    }
 
    protected Objects.ToStringHelper string() {
       return Objects.toStringHelper(this)
-         .add("age", age).add("created", created);
+         .add("age", age).add("created", created).add("id", id);
    }
 
    @Override
@@ -87,9 +105,10 @@ public class Aged {
 
       protected int age;
       protected Date created;
+      protected String id;
 
       /**
-       * @see Aged#age
+       * @see Aged#getAge()
        */
       public Builder age(int age) {
          this.age = age;
@@ -97,19 +116,27 @@ public class Aged {
       }
 
       /**
-       * @see Aged#created
+       * @see Aged#getCreated()
        */
       public Builder created(Date created) {
          this.created = created;
          return self();
       }
 
+      /**
+       * @see Aged#getId()
+       */
+      public Builder id(String id) {
+         this.id = id;
+         return self();
+      }
+
       public Aged build() {
-         return new Aged(age, created);
+         return new Aged(age, created, id);
       }
 
       public Builder fromAged(Aged in) {
-         return this.age(in.getAge()).created(in.getCreated());
+         return this.age(in.getAge()).created(in.getCreated()).id(in.getId());
       }
    }
 
@@ -119,5 +146,4 @@ public class Aged {
          return this;
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/CreateMessage.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/CreateMessage.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/CreateMessage.java
new file mode 100644
index 0000000..5958b68
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/CreateMessage.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to jclouds, Inc. (jclouds) under one or more
+ * contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  jclouds licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jclouds.openstack.marconi.v1.domain;
+
+import com.google.common.base.Objects;
+import org.jclouds.domain.JsonBall;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A message to be sent to a queue.
+ *
+ * @author Everett Toews
+ */
+public class CreateMessage {
+
+   private int ttl;
+   private String body;
+
+   protected CreateMessage(int ttl, String body) {
+      this.ttl = ttl;
+      this.body = checkNotNull(body, "body required");
+   }
+
+   /**
+    * @see Builder#ttl(int)
+    */
+   public int getTTL() {
+      return ttl;
+   }
+
+   /**
+    * @see Builder#body(String)
+    */
+   public String getBody() {
+      return body;
+   }
+
+   @Override
+   public int hashCode() {
+      return Objects.hashCode(ttl, body);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || getClass() != obj.getClass()) return false;
+      CreateMessage that = CreateMessage.class.cast(obj);
+      return Objects.equal(this.ttl, that.ttl) && Objects.equal(this.body, that.body);
+   }
+
+   protected Objects.ToStringHelper string() {
+      return Objects.toStringHelper(this)
+         .add("ttl", ttl).add("body", body);
+   }
+
+   @Override
+   public String toString() {
+      return string().toString();
+   }
+
+   public static Builder builder() {
+      return new ConcreteBuilder();
+   }
+
+   public Builder toBuilder() {
+      return new ConcreteBuilder().fromMessage(this);
+   }
+
+   public static abstract class Builder {
+      protected abstract Builder self();
+
+      protected int ttl;
+      protected String body;
+
+      /**
+       * @param ttl The time-to-live of the message in seconds. The ttl attribute specifies how long the server waits
+       *            before marking the message as expired and removing it from the queue. The valid range of values for
+       *            the ttl are configurable by your cloud provider. Consult your cloud provider documentation to learn
+       *            the valid range.
+       *            </p>
+       *            Note that the server might not actually delete the message until its age has reached up to
+       *            (ttl + 60) seconds, to allow for flexibility in storage implementations.
+       */
+      public Builder ttl(int ttl) {
+         this.ttl = ttl;
+         return self();
+      }
+
+      /**
+       * @param json Specifies an arbitrary JSON document that constitutes the body of the message being sent.
+       *             The size of the message allowed in one message is configurable by your cloud provider. Consult
+       *             your cloud provider documentation to learn the valid range.
+       */
+      public Builder body(String json) {
+         checkNotNull(json, "body required");
+         this.body = new JsonBall(json).toString();
+         return self();
+      }
+
+      public CreateMessage build() {
+         return new CreateMessage(ttl, body);
+      }
+
+      public Builder fromMessage(CreateMessage in) {
+         return this.ttl(in.getTTL()).body(in.getBody());
+      }
+   }
+
+   private static class ConcreteBuilder extends Builder {
+      @Override
+      protected ConcreteBuilder self() {
+         return this;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
new file mode 100644
index 0000000..a1f3eb7
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to jclouds, Inc. (jclouds) under one or more
+ * contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  jclouds licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jclouds.openstack.marconi.v1.domain;
+
+import com.google.common.base.Objects;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A message to be sent to a queue.
+ *
+ * @author Everett Toews
+ */
+public class Message {
+
+   private final String id;
+   private final int ttl;
+   private final String body;
+   private final int age;
+
+   protected Message(String id, int ttl, String body, int age) {
+      this.id = checkNotNull(id, "id required");
+      this.ttl = ttl;
+      this.body = checkNotNull(body, "body required");
+      this.age = age;
+   }
+
+   /**
+    * @return The id of this message.
+    */
+   public String getId() {
+      return id;
+   }
+
+   /**
+    * @see CreateMessage.Builder#ttl(int)
+    */
+   public int getTTL() {
+      return ttl;
+   }
+
+   /**
+    * @see CreateMessage.Builder#body(String)
+    */
+   public String getBody() {
+      return body;
+   }
+
+   /**
+    * @return Age of this message in seconds.
+    */
+   public int getAge() {
+      return age;
+   }
+
+   @Override
+   public int hashCode() {
+      return Objects.hashCode(id);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || getClass() != obj.getClass()) return false;
+      Message that = Message.class.cast(obj);
+      return Objects.equal(this.id, that.id);
+   }
+
+   protected Objects.ToStringHelper string() {
+      return Objects.toStringHelper(this)
+         .add("id", id).add("ttl", ttl).add("body", body).add("age", age);
+   }
+
+   @Override
+   public String toString() {
+      return string().toString();
+   }
+
+   public static Builder builder() {
+      return new ConcreteBuilder();
+   }
+
+   public Builder toBuilder() {
+      return new ConcreteBuilder().fromMessage(this);
+   }
+
+   public static abstract class Builder {
+      protected abstract Builder self();
+
+      protected String id;
+      protected int ttl;
+      protected String body;
+      protected int age;
+
+      /**
+       * @see Message#getId()
+       */
+      public Builder id(String id) {
+         this.id = id;
+         return self();
+      }
+
+      /**
+       * @see CreateMessage.Builder#ttl(int)
+       */
+      public Builder ttl(int ttl) {
+         this.ttl = ttl;
+         return self();
+      }
+
+      /**
+       * @see CreateMessage.Builder#body(String)
+       */
+      public Builder body(String json) {
+         this.body = json;
+         return self();
+      }
+
+      /**
+       * @see Message#getAge()
+       */
+      public Builder age(int age) {
+         this.age = age;
+         return self();
+      }
+
+      public Message build() {
+         return new Message(id, ttl, body, age);
+      }
+
+      public Builder fromMessage(Message in) {
+         return this.id(in.getId()).ttl(in.getTTL()).body(in.getBody()).age(in.getAge());
+      }
+   }
+
+   private static class ConcreteBuilder extends Builder {
+      @Override
+      protected ConcreteBuilder self() {
+         return this;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
new file mode 100644
index 0000000..84b67b5
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.domain;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.jclouds.openstack.marconi.v1.options.StreamOptions;
+import org.jclouds.openstack.v2_0.domain.Link;
+import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
+
+import static org.jclouds.http.utils.Queries.queryParser;
+
+public class MessageStream extends PaginatedCollection<Message> {
+   protected MessageStream(Iterable<Message> resources, Iterable<Link> links) {
+      super(resources, links);
+   }
+
+   /**
+    * Only call this method if {@code nextMarker().isPresent()} returns true.
+    *
+    * @return The options necessary to get the next page of messages.
+    */
+   public StreamOptions nextStreamOptions() {
+      return StreamOptions.class.cast(nextMarker().get());
+   }
+
+   @Override
+   public Optional<Object> nextMarker() {
+      Optional<Link> nextMarkerLink = Iterables.tryFind(getLinks(), IS_NEXT_LINK);
+      return nextMarkerLink.transform(TO_LIST_OPTIONS);
+   }
+
+   private static final Predicate<Link> IS_NEXT_LINK = new Predicate<Link>() {
+      @Override
+      public boolean apply(Link link) {
+         return Link.Relation.NEXT == link.getRelation();
+      }
+   };
+
+   private static final Function<Link, Object> TO_LIST_OPTIONS = new Function<Link, Object>() {
+      @Override
+      public Object apply(Link link) {
+         Multimap<String, String> queryParams = queryParser().apply(link.getHref().getRawQuery());
+         StreamOptions paginationOptions = StreamOptions.Builder.queryParameters(queryParams);
+
+         return paginationOptions;
+      }
+   };
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesCreated.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesCreated.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesCreated.java
new file mode 100644
index 0000000..de80b25
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesCreated.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to jclouds, Inc. (jclouds) under one or more
+ * contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  jclouds licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jclouds.openstack.marconi.v1.domain;
+
+import com.google.common.base.Objects;
+
+import javax.inject.Named;
+import java.beans.ConstructorProperties;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The response to creating messages on a queue.
+ *
+ * @author Everett Toews
+ */
+public class MessagesCreated {
+
+   @Named("resources")
+   private final List<String> messageIds;
+
+   @ConstructorProperties({
+         "resources"
+   })
+   protected MessagesCreated(List<String> messageIds) {
+      this.messageIds = checkNotNull(messageIds, "messageIds required");
+   }
+
+   /**
+    * @return A list of message ids that correspond to each message submitted in the request, in order.
+    */
+   public List<String> getMessageIds() {
+      return messageIds;
+   }
+
+   @Override
+   public int hashCode() {
+      return Objects.hashCode(messageIds);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || getClass() != obj.getClass()) return false;
+      MessagesCreated that = MessagesCreated.class.cast(obj);
+      return Objects.equal(this.messageIds, that.messageIds);
+   }
+
+   protected Objects.ToStringHelper string() {
+      return Objects.toStringHelper(this).add("messageIds", messageIds);
+   }
+
+   @Override
+   public String toString() {
+      return string().toString();
+   }
+
+   public static Builder builder() {
+      return new ConcreteBuilder();
+   }
+
+   public Builder toBuilder() {
+      return new ConcreteBuilder().fromMessageCreated(this);
+   }
+
+   public static abstract class Builder {
+      protected abstract Builder self();
+
+      protected List<String> messageIds;
+
+      /**
+       * @see MessagesCreated#getMessageIds()
+       */
+      public Builder messageIds(List<String> messageIds) {
+         this.messageIds = messageIds;
+         return self();
+      }
+
+      public MessagesCreated build() {
+         return new MessagesCreated(messageIds);
+      }
+
+      public Builder fromMessageCreated(MessagesCreated in) {
+         return this.messageIds(in.getMessageIds());
+      }
+   }
+
+   private static class ConcreteBuilder extends Builder {
+      @Override
+      protected ConcreteBuilder self() {
+         return this;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesStats.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesStats.java
index 1440592..247b3a9 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesStats.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessagesStats.java
@@ -31,11 +31,11 @@ import java.beans.ConstructorProperties;
  */
 public class MessagesStats {
 
-   private int claimed;
-   private int free;
-   private int total;
-   private Aged oldest;
-   private Aged newest;
+   private final int claimed;
+   private final int free;
+   private final int total;
+   private final Aged oldest;
+   private final Aged newest;
 
    @ConstructorProperties({
          "claimed", "free", "total", "oldest", "newest"
@@ -128,7 +128,7 @@ public class MessagesStats {
       protected Aged newest;
 
       /**
-       * @see MessagesStats#claimed
+       * @see MessagesStats#getClaimed()
        */
       public Builder claimed(int claimed) {
          this.claimed = claimed;
@@ -136,7 +136,7 @@ public class MessagesStats {
       }
 
       /**
-       * @see MessagesStats#free
+       * @see MessagesStats#getFree()
        */
       public Builder free(int free) {
          this.free = free;
@@ -144,7 +144,7 @@ public class MessagesStats {
       }
 
       /**
-       * @see MessagesStats#total
+       * @see MessagesStats#getTotal()
        */
       public Builder total(int total) {
          this.total = total;
@@ -152,7 +152,7 @@ public class MessagesStats {
       }
 
       /**
-       * @see MessagesStats#oldest
+       * @see MessagesStats#getOldest()
        */
       public Builder oldest(Aged oldest) {
          this.oldest = oldest;
@@ -160,7 +160,7 @@ public class MessagesStats {
       }
 
       /**
-       * @see MessagesStats#newest
+       * @see MessagesStats#getNewest()
        */
       public Builder newest(Aged newest) {
          this.newest = newest;
@@ -183,5 +183,4 @@ public class MessagesStats {
          return this;
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/QueueStats.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/QueueStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/QueueStats.java
index 492f432..fe991ba 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/QueueStats.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/QueueStats.java
@@ -20,7 +20,7 @@ package org.jclouds.openstack.marconi.v1.domain;
 
 import com.google.common.base.Objects;
 
-import java.beans.ConstructorProperties;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Queue statistics, including how many messages are in the queue.
@@ -29,10 +29,10 @@ import java.beans.ConstructorProperties;
  */
 public class QueueStats {
 
-   private MessagesStats messages;
+   private final MessagesStats messages;
 
-   protected QueueStats(MessagesStats messages) {
-      this.messages = messages;
+   protected QueueStats(MessagesStats messageStats) {
+      this.messages = checkNotNull(messageStats);
    }
 
    /**
@@ -64,4 +64,41 @@ public class QueueStats {
    public String toString() {
       return string().toString();
    }
+
+   public static Builder builder() {
+      return new ConcreteBuilder();
+   }
+
+   public Builder toBuilder() {
+      return new ConcreteBuilder().fromQueueStats(this);
+   }
+
+   public static abstract class Builder {
+      protected abstract Builder self();
+
+      protected MessagesStats messagesStats;
+
+      /**
+       * @see QueueStats#getMessagesStats()
+       */
+      public Builder messageStats(MessagesStats messagesStats) {
+         this.messagesStats = messagesStats;
+         return self();
+      }
+
+      public QueueStats build() {
+         return new QueueStats(messagesStats);
+      }
+
+      public Builder fromQueueStats(QueueStats in) {
+         return this.messageStats(in.getMessagesStats());
+      }
+   }
+
+   private static class ConcreteBuilder extends Builder {
+      @Override
+      protected ConcreteBuilder self() {
+         return this;
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
new file mode 100644
index 0000000..ecfb2c5
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import org.jclouds.Fallbacks;
+import org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks;
+import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest;
+import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.domain.MessageStream;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessages;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessagesCreated;
+import org.jclouds.openstack.marconi.v1.options.StreamOptions;
+import org.jclouds.rest.annotations.BinderParam;
+import org.jclouds.rest.annotations.Fallback;
+import org.jclouds.rest.annotations.RequestFilters;
+import org.jclouds.rest.annotations.ResponseParser;
+import org.jclouds.rest.annotations.SkipEncoding;
+import org.jclouds.rest.binders.BindToJsonPayload;
+
+import javax.inject.Named;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Provides access to Messages via their REST API.
+ *
+ * @author Everett Toews
+ */
+@SkipEncoding({'/', '='})
+@RequestFilters(AuthenticateRequest.class)
+public interface MessageApi {
+   /**
+    * Create message(s) on a queue.
+    *
+    * @param clientId A UUID for each client instance. The UUID must be submitted in its canonical form (for example,
+    *                 3381af92-2b9e-11e3-b191-71861300734c). The client generates the Client-ID once. Client-ID
+    *                 persists between restarts of the client so the client should reuse that same Client-ID. All
+    *                 message-related operations require the use of Client-ID in the headers to ensure that messages
+    *                 are not echoed back to the client that posted them, unless the client explicitly requests this.
+    * @param messages The messages created on the queue. The number of messages allowed in one request are configurable
+    *                 by your cloud provider. Consult your cloud provider documentation to learn the maximum.
+    */
+   @Named("message:create")
+   @POST
+   @Path("/messages")
+   @ResponseParser(ParseMessagesCreated.class)
+   @Fallback(Fallbacks.FalseOnNotFoundOr404.class)
+   MessagesCreated create(@HeaderParam("Client-ID") UUID clientId,
+                          @BinderParam(BindToJsonPayload.class) List<CreateMessage> messages);
+
+   @Named("message:stream")
+   @GET
+   @ResponseParser(ParseMessages.class)
+   @Consumes(MediaType.APPLICATION_JSON)
+   @Fallback(KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404.class)
+   @Path("/messages")
+   MessageStream stream(@HeaderParam("Client-ID") UUID clientId,
+                        StreamOptions... options);
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
index aca5782..c80c610 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
@@ -19,9 +19,11 @@ package org.jclouds.openstack.marconi.v1.features;
 import org.jclouds.Fallbacks;
 import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest;
 import org.jclouds.openstack.marconi.v1.domain.QueueStats;
+import org.jclouds.openstack.marconi.v1.functions.ParseQueueStats;
 import org.jclouds.rest.annotations.BinderParam;
 import org.jclouds.rest.annotations.Fallback;
 import org.jclouds.rest.annotations.RequestFilters;
+import org.jclouds.rest.annotations.ResponseParser;
 import org.jclouds.rest.annotations.SkipEncoding;
 import org.jclouds.rest.binders.BindToJsonPayload;
 
@@ -44,6 +46,8 @@ import java.util.Map;
 @SkipEncoding({'/', '='})
 @RequestFilters(AuthenticateRequest.class)
 public interface QueueApi {
+   // TODO: Move name parameter into MarconiApi.getQueueApiForZone(String name, String zone)
+
    /**
     * Create a queue.
     *
@@ -80,6 +84,8 @@ public interface QueueApi {
    @Fallback(Fallbacks.FalseOnNotFoundOr404.class)
    boolean exists(@PathParam("name") String name);
 
+   // TODO stream method!
+
    /**
     * Sets metadata for the specified queue.
     * <p/>
@@ -124,6 +130,7 @@ public interface QueueApi {
    @GET
    @Path("queues/{name}/stats")
    @Consumes(MediaType.APPLICATION_JSON)
+   @ResponseParser(ParseQueueStats.class)
    @Fallback(Fallbacks.FalseOnNotFoundOr404.class)
    QueueStats getStats(@PathParam("name") String name);
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
new file mode 100644
index 0000000..163483d
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.marconi.v1.domain.MessageStream;
+import org.jclouds.openstack.v2_0.domain.Link;
+import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
+
+import javax.inject.Inject;
+import java.beans.ConstructorProperties;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @author Everett Toews
+ */
+public class ParseMessages implements Function<HttpResponse, MessageStream> {
+
+   private final ParseJson<MessagesWithHref> json;
+
+   @Inject
+   ParseMessages(ParseJson<MessagesWithHref> json) {
+      this.json = checkNotNull(json, "json");
+   }
+
+   @Override
+   public MessageStream apply(HttpResponse response) {
+      // An empty message stream has a 204 response code
+      if (response.getStatusCode() == 204) {
+         return new Messages(ImmutableSet.<Message> of(), ImmutableSet.<Link> of());
+      }
+
+      MessagesWithHref messagesWithHref = json.apply(response);
+      Iterable<Message> messages = Iterables.transform(messagesWithHref, TO_MESSAGE);
+
+      return new Messages(messages, messagesWithHref.getLinks());
+   }
+
+   private static String getMessageId(String rawMessageHref) {
+      // strip off everything but the message id
+      return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1);
+   }
+
+   protected static final Function<MessageWithHref, Message> TO_MESSAGE = new Function<MessageWithHref, Message>() {
+      @Override
+      public Message apply(MessageWithHref messageWithHref) {
+         return messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build();
+      }
+   };
+
+   protected static final Function<String, String> TO_MESSAGE_ID = new Function<String, String>() {
+      @Override
+      public String apply(String messageIdWithHref) {
+         return getMessageId(messageIdWithHref);
+      }
+   };
+
+   private static class Messages extends MessageStream {
+
+      @ConstructorProperties({ "messages", "links" })
+      protected Messages(Iterable<Message> messages, Iterable<Link> links) {
+         super(messages, links);
+      }
+   }
+
+   private static class MessagesWithHref extends PaginatedCollection<MessageWithHref> {
+
+      @ConstructorProperties({ "messages", "links" })
+      protected MessagesWithHref(Iterable<MessageWithHref> messagesWithHref, Iterable<Link> links) {
+         super(messagesWithHref, links);
+      }
+   }
+
+   private static class MessageWithHref extends Message {
+
+      @ConstructorProperties({ "href", "ttl", "body", "age" })
+      protected MessageWithHref(String href, int ttl, String body, int age) {
+         super(href, ttl, body, age);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
new file mode 100644
index 0000000..d2d4d83
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
+
+/**
+ * This parses the messages created on a queue.
+ * 
+ * @author Everett Toews
+ */
+public class ParseMessagesCreated implements Function<HttpResponse, MessagesCreated> {
+
+   private final ParseJson<MessagesCreated> json;
+
+   @Inject
+   ParseMessagesCreated(ParseJson<MessagesCreated> json) {
+      this.json = checkNotNull(json, "json");
+   }
+
+   public MessagesCreated apply(HttpResponse from) {
+      MessagesCreated rawMessagesCreated = json.apply(from);
+      List<String> messageIds = Lists.transform(rawMessagesCreated.getMessageIds(), TO_MESSAGE_ID);
+
+      MessagesCreated messagesCreated = MessagesCreated.builder()
+            .messageIds(messageIds)
+            .build();
+
+      return messagesCreated;
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
new file mode 100644
index 0000000..fb87c6c
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.inject.Inject;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.Aged;
+import org.jclouds.openstack.marconi.v1.domain.MessagesStats;
+import org.jclouds.openstack.marconi.v1.domain.QueueStats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
+
+/**
+ * This parses the stats of a queue.
+ * 
+ * @author Everett
+ */
+public class ParseQueueStats implements Function<HttpResponse, QueueStats> {
+
+   private final ParseJson<QueueStats> json;
+
+   @Inject
+   ParseQueueStats(ParseJson<QueueStats> json) {
+      this.json = checkNotNull(json, "json");
+   }
+
+   public QueueStats apply(HttpResponse from) {
+      QueueStats rawQueueStats = json.apply(from);
+
+      if (rawQueueStats.getMessagesStats().getTotal() == 0) {
+         return rawQueueStats;
+      }
+      else {
+         // change the hrefs to ids
+         Aged oldestWithHref = rawQueueStats.getMessagesStats().getOldest().get();
+         Aged oldestWithId = oldestWithHref.toBuilder()
+               .id(TO_MESSAGE_ID.apply(oldestWithHref.getId()))
+               .build();
+         Aged newestWithHref = rawQueueStats.getMessagesStats().getNewest().get();
+         Aged newestWithId = newestWithHref.toBuilder()
+               .id(TO_MESSAGE_ID.apply(newestWithHref.getId()))
+               .build();
+
+         MessagesStats messagesStatsWithIds = rawQueueStats.getMessagesStats().toBuilder()
+               .oldest(oldestWithId)
+               .newest(newestWithId)
+               .build();
+
+         QueueStats queueStatsWithIds = rawQueueStats.toBuilder().messageStats(messagesStatsWithIds).build();
+
+         return queueStatsWithIds;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
new file mode 100644
index 0000000..be2fffb
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.options;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.jclouds.openstack.v2_0.options.PaginationOptions;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Options used to control the messages returned in the response.
+ */
+public class StreamOptions extends PaginationOptions {
+
+   public static final StreamOptions NONE = new StreamOptions();
+
+   /**
+    * {@inheritDoc}
+    */
+   @Override
+   public StreamOptions queryParameters(Multimap<String, String> queryParams) {
+      checkNotNull(queryParams, "queryParams");
+      queryParameters.putAll(queryParams);
+      return this;
+   }
+
+   /**
+    * @see Builder#marker(String)
+    */
+   @Override
+   public StreamOptions marker(String marker) {
+      super.marker(marker);
+      return this;
+   }
+
+   /**
+    * @see Builder#limit(int)
+    */
+   @Override
+   public StreamOptions limit(int limit) {
+      super.limit(limit);
+      return this;
+
+   }
+
+   /**
+    * @see Builder#echo(boolean)
+    */
+   public StreamOptions echo(boolean echo) {
+      queryParameters.put("echo", Boolean.toString(echo));
+      return this;
+   }
+
+   /**
+    * @return The String representation of the marker for these StreamOptions.
+    */
+   public String getMarker() {
+      return Iterables.getOnlyElement(queryParameters.get("marker"));
+   }
+
+   public static class Builder {
+      /**
+       * @see PaginationOptions#queryParameters(Multimap)
+       */
+      public static StreamOptions queryParameters(Multimap<String, String> queryParams) {
+         StreamOptions options = new StreamOptions();
+         return options.queryParameters(queryParams);
+      }
+
+      /**
+       * Specifies an opaque string that the client can use to request the next batch of messages. The marker parameter
+       * communicates to the server which messages the client has already received. If you do not specify a value, the
+       * API returns all messages at the head of the queue (up to the limit).
+       * </p>
+       * Clients should make no assumptions about the format or length of the marker. Furthermore, clients should assume
+       * that there is no relationship between markers and message IDs.
+       */
+      public static StreamOptions marker(String marker) {
+         StreamOptions options = new StreamOptions();
+         return options.marker(marker);
+      }
+
+      /**
+       * When more messages are available than can be returned in a single request, the client can pick up the next
+       * batch of messages by simply using the {@see StremOptions} returned from the previous call in {@code
+       * MessageStream#nextStreamOptions()}. Specifies up to 10 messages (the default value) to return. If you do not
+       * specify a value for the limit parameter, the default value of 10 is used.
+       */
+      public static StreamOptions limit(int limit) {
+         StreamOptions options = new StreamOptions();
+         return options.limit(limit);
+      }
+
+      /**
+       * The echo parameter determines whether the API returns a client's own messages, as determined by the clientId
+       * (UUID) portion of the client. If you do not specify a value, echo uses the default value of false. If you are
+       * experimenting with the API, you might want to set echo=true in order to see the messages that you posted.
+       */
+      public static StreamOptions echo(boolean echo) {
+         StreamOptions options = new StreamOptions();
+         return options.echo(echo);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
new file mode 100644
index 0000000..3bfe3e0
--- /dev/null
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.MessageStream;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.echo;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Test(groups = "live", testName = "MessageApiLiveTest", singleThreaded = true)
+public class MessageApiLiveTest extends BaseMarconiApiLiveTest {
+
+   public void createQueues() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         QueueApi queueApi = api.getQueueApiForZone(zoneId);
+         boolean success = queueApi.create("jclouds-test");
+
+         assertTrue(success);
+      }
+   }
+
+   @Test(dependsOnMethods = { "createQueues" })
+   public void streamZeroPagesOfMessages() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         MessageStream messageStream = messageApi.stream(clientId, echo(true));
+
+         assertTrue(Iterables.isEmpty(messageStream));
+         assertFalse(messageStream.nextMarker().isPresent());
+      }
+   }
+
+   @Test(dependsOnMethods = { "streamZeroPagesOfMessages" })
+   public void createMessage() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         String json1 = "{\"event\":{\"name\":\"Edmonton Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message1 = CreateMessage.builder().ttl(120).body(json1).build();
+         List<CreateMessage> message = ImmutableList.of(message1);
+
+         MessagesCreated messagesCreated = messageApi.create(clientId, message);
+
+         assertNotNull(messagesCreated);
+         assertEquals(messagesCreated.getMessageIds().size(), 1);
+      }
+   }
+
+   @Test(dependsOnMethods = { "createMessage" })
+   public void streamOnePageOfMessages() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         MessageStream messageStream = messageApi.stream(clientId, echo(true));
+
+         while(messageStream.nextMarker().isPresent()) {
+            assertEquals(Iterables.size(messageStream), 1);
+
+            messageStream = messageApi.stream(clientId, messageStream.nextStreamOptions());
+         }
+
+         assertFalse(messageStream.nextMarker().isPresent());
+      }
+   }
+
+   @Test(dependsOnMethods = { "streamOnePageOfMessages" })
+   public void createMessages() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         String json1 = "{\"event\":{\"name\":\"Austin Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message1 = CreateMessage.builder().ttl(120).body(json1).build();
+         String json2 = "{\"event\":{\"name\":\"SF Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message2 = CreateMessage.builder().ttl(120).body(json2).build();
+         String json3 = "{\"event\":{\"name\":\"HK Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message3 = CreateMessage.builder().ttl(120).body(json3).build();
+         List<CreateMessage> messages = ImmutableList.of(message1, message2, message3);
+
+         MessagesCreated messagesCreated = messageApi.create(clientId, messages);
+
+         assertNotNull(messagesCreated);
+         assertEquals(messagesCreated.getMessageIds().size(), 3);
+      }
+   }
+
+   @Test(dependsOnMethods = { "createMessages" })
+   public void streamManyPagesOfMessages() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         MessageStream messageStream = messageApi.stream(clientId, echo(true).limit(2));
+
+         while(messageStream.nextMarker().isPresent()) {
+            assertEquals(Iterables.size(messageStream), 2);
+
+            messageStream = messageApi.stream(clientId, messageStream.nextStreamOptions());
+         }
+
+         assertFalse(messageStream.nextMarker().isPresent());
+      }
+   }
+
+   @Test(dependsOnMethods = { "streamManyPagesOfMessages" })
+   public void delete() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         QueueApi queueApi = api.getQueueApiForZone(zoneId);
+         boolean success = queueApi.delete("jclouds-test");
+
+         assertTrue(success);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
new file mode 100644
index 0000000..8f45aa3
--- /dev/null
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.squareup.okhttp.mockwebserver.MockResponse;
+import com.squareup.okhttp.mockwebserver.MockWebServer;
+import org.jclouds.openstack.marconi.v1.MarconiApi;
+import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.MessageStream;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.limit;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * @author Everett Toews
+ */
+@Test
+public class MessageApiMockTest extends BaseOpenStackMockTest<MarconiApi> {
+
+   public void createMessage() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(201).setBody("{\"partial\": false, \"resources\": [\"/v1/queues/jclouds-test/messages/526550ecef913e655ff84db8\"]}"));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         String json1 = "{\"event\":{\"name\":\"Edmonton Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage createMessage1 = CreateMessage.builder().ttl(120).body(json1).build();
+         List<CreateMessage> createMessages = ImmutableList.of(createMessage1);
+
+         MessagesCreated messagesCreated = messageApi.create(clientId, createMessages);
+
+         assertNotNull(messagesCreated);
+         assertEquals(messagesCreated.getMessageIds().size(), 1);
+         assertEquals(messagesCreated.getMessageIds().get(0), "526550ecef913e655ff84db8");
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "POST /v1/123123/queues/jclouds-test/messages HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+
+   public void createMessages() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(201).setBody("{\"partial\": false, \"resources\": [\"/v1/queues/jclouds-test/messages/5265540ef4919b655da1760a\", \"/v1/queues/jclouds-test/messages/5265540ef4919b655da1760b\", \"/v1/queues/jclouds-test/messages/5265540ef4919b655da1760c\"]}"));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         String json1 = "{\"event\":{\"name\":\"Austin Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage createMessage1 = CreateMessage.builder().ttl(120).body(json1).build();
+         String json2 = "{\"event\":{\"name\":\"SF Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage createMessage2 = CreateMessage.builder().ttl(120).body(json2).build();
+         String json3 = "{\"event\":{\"name\":\"HK Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage createMessage3 = CreateMessage.builder().ttl(120).body(json3).build();
+         List<CreateMessage> createMessages = ImmutableList.of(createMessage1, createMessage2, createMessage3);
+
+         MessagesCreated messagesCreated = messageApi.create(clientId, createMessages);
+
+         assertNotNull(messagesCreated);
+         assertEquals(messagesCreated.getMessageIds().size(), 3);
+         assertTrue(messagesCreated.getMessageIds().contains("5265540ef4919b655da1760a"));
+         assertTrue(messagesCreated.getMessageIds().contains("5265540ef4919b655da1760b"));
+         assertTrue(messagesCreated.getMessageIds().contains("5265540ef4919b655da1760c"));
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "POST /v1/123123/queues/jclouds-test/messages HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+
+   public void streamZeroPagesOfMessages() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(204));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         MessageStream messageStream = messageApi.stream(clientId);
+
+         assertTrue(Iterables.isEmpty(messageStream));
+         assertFalse(messageStream.nextMarker().isPresent());
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+
+   public void streamOnePageOfMessages() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"messages\": [{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 7353, \"href\": \"/v1/queues/jclouds-test/messages/526ec635b04a5866dbe31ba1\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 7353, \"href\": \"/v1/queues/jclouds-test/messages/526ec635b04a5866dbe31ba2\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 7353, \"href\": \"/v1/queues/jclouds-test/messages/526ec635b04a5866dbe31ba3\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 7342, \"href\": \"/v1/queues/jclouds-test/messages/526e
 c640f4919b69a7bc558e\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 7342, \"href\": \"/v1/queues/jclouds-test/messages/526ec640f4919b69a7bc558f\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 7342, \"href\": \"/v1/queues/jclouds-test/messages/526ec640f4919b69a7bc5590\", \"ttl\": 86400}], \"links\": [{\"href\": \"/v1/queues/jclouds-test/messages?marker=4512\", \"rel\": \"next\"}]}"));
+      server.enqueue(new MockResponse().setResponseCode(204));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         MessageStream messageStream = messageApi.stream(clientId);
+
+         while(messageStream.nextMarker().isPresent()) {
+            assertEquals(Iterables.size(messageStream), 6);
+
+            messageStream = messageApi.stream(clientId, messageStream.nextStreamOptions());
+         }
+
+         assertFalse(messageStream.nextMarker().isPresent());
+
+         assertEquals(server.getRequestCount(), 3);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages?marker=4512 HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+
+   public void streamManyPagesOfMessages() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"messages\": [{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 8082, \"href\": \"/v1/queues/jclouds-test/messages/526ec635b04a5866dbe31ba1\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 8082, \"href\": \"/v1/queues/jclouds-test/messages/526ec635b04a5866dbe31ba2\", \"ttl\": 86400}], \"links\": [{\"href\": \"/v1/queues/jclouds-test/messages?marker=4508&limit=2\", \"rel\": \"next\"}]}"));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"messages\": [{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 8082, \"href\": \"/v1/queues/jclouds-test/messages/526ec635b04a5866dbe31ba3\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 8071, \"href\": \"/v1/queues/jclouds-test/messages/526ec640f4919b69a7bc558e\", \"ttl\": 86400}], \"links\": [{\"href\": \"/v1/queues/jclouds-test/messages?marker=4510&limit=2\", \"rel\": \"next\"}]}"));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"messages\": [{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 8071, \"href\": \"/v1/queues/jclouds-test/messages/526ec640f4919b69a7bc558f\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 8071, \"href\": \"/v1/queues/jclouds-test/messages/526ec640f4919b69a7bc5590\", \"ttl\": 86400}], \"links\": [{\"href\": \"/v1/queues/jclouds-test/messages?marker=4512&limit=2\", \"rel\": \"next\"}]}"));
+      server.enqueue(new MockResponse().setResponseCode(204));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         MessageStream messageStream = messageApi.stream(clientId, limit(2));
+
+         while(messageStream.nextMarker().isPresent()) {
+            assertEquals(Iterables.size(messageStream), 2);
+
+            messageStream = messageApi.stream(clientId, messageStream.nextStreamOptions());
+         }
+
+         assertFalse(messageStream.nextMarker().isPresent());
+
+         assertEquals(server.getRequestCount(), 5);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages?limit=2 HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages?marker=4508&limit=2 HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages?marker=4510&limit=2 HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages?marker=4512&limit=2 HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiLiveTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiLiveTest.java
index 7c1341f..cb3dfa3 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiLiveTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiLiveTest.java
@@ -16,15 +16,20 @@
  */
 package org.jclouds.openstack.marconi.v1.features;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
 import org.jclouds.openstack.marconi.v1.domain.QueueStats;
 import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest;
 import org.testng.annotations.Test;
 
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 @Test(groups = "live", testName = "QueueApiLiveTest", singleThreaded = true)
@@ -85,6 +90,31 @@ public class QueueApiLiveTest extends BaseMarconiApiLiveTest {
    }
 
    @Test(dependsOnMethods = { "getStatsWithoutTotal" })
+   public void getStatsWithTotal() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         String json1 = "{\"event\":{\"type\":\"hockey\",\"players\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message1 = CreateMessage.builder().ttl(120).body(json1).build();
+         List<CreateMessage> message = ImmutableList.of(message1);
+
+         messageApi.create(clientId, message);
+
+         QueueApi queueApi = api.getQueueApiForZone(zoneId);
+         QueueStats stats = queueApi.getStats("jclouds-test");
+
+         assertEquals(stats.getMessagesStats().getClaimed(), 0);
+         assertEquals(stats.getMessagesStats().getFree(), 1);
+         assertEquals(stats.getMessagesStats().getTotal(), 1);
+         assertTrue(stats.getMessagesStats().getOldest().isPresent());
+         assertNotNull(stats.getMessagesStats().getOldest().get().getId());
+         assertTrue(stats.getMessagesStats().getNewest().isPresent());
+         assertNotNull(stats.getMessagesStats().getNewest().get().getId());
+      }
+   }
+
+   @Test(dependsOnMethods = { "getStatsWithTotal" })
    public void delete() throws Exception {
       for (String zoneId : api.getConfiguredZones()) {
          QueueApi queueApi = api.getQueueApiForZone(zoneId);

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiMockTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiMockTest.java
index 02e0488..375f24d 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiMockTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/QueueApiMockTest.java
@@ -25,6 +25,7 @@ import org.jclouds.openstack.marconi.v1.domain.QueueStats;
 import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest;
 import org.testng.annotations.Test;
 
+import java.util.Date;
 import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
@@ -191,4 +192,36 @@ public class QueueApiMockTest extends BaseOpenStackMockTest<MarconiApi> {
          server.shutdown();
       }
    }
+
+   public void getQueueStatsWithTotal() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(200)
+            .setBody("{\"messages\": {\"claimed\": 0, \"oldest\": {\"age\": 0, \"href\": \"/v1/queues/jclouds-test/messages/526558b3f4919b655feba3a7\", \"created\": \"2013-10-21T16:39:15Z\"}, \"total\": 4, \"newest\": {\"age\": 0, \"href\": \"/v1/queues/jclouds-test/messages/526558b33ac24e663fc545e7\", \"created\": \"2013-10-21T16:39:15Z\"}, \"free\": 4}}"));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         QueueApi queueApi = api.getQueueApiForZone("DFW");
+         QueueStats stats = queueApi.getStats("jclouds-test");
+
+         assertEquals(stats.getMessagesStats().getClaimed(), 0);
+         assertEquals(stats.getMessagesStats().getFree(), 4);
+         assertEquals(stats.getMessagesStats().getTotal(), 4);
+         assertTrue(stats.getMessagesStats().getOldest().isPresent());
+         assertTrue(stats.getMessagesStats().getOldest().get().getCreated().before(new Date()));
+         assertEquals(stats.getMessagesStats().getOldest().get().getAge(), 0);
+         assertEquals(stats.getMessagesStats().getOldest().get().getId(), "526558b3f4919b655feba3a7");
+         assertTrue(stats.getMessagesStats().getNewest().isPresent());
+         assertTrue(stats.getMessagesStats().getNewest().get().getCreated().before(new Date()));
+         assertEquals(stats.getMessagesStats().getNewest().get().getAge(), 0);
+         assertEquals(stats.getMessagesStats().getNewest().get().getId(), "526558b33ac24e663fc545e7");
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/stats HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/rackspace-cloudqueues-us/src/main/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSProviderMetadata.java
----------------------------------------------------------------------
diff --git a/rackspace-cloudqueues-us/src/main/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSProviderMetadata.java b/rackspace-cloudqueues-us/src/main/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSProviderMetadata.java
index 0baf912..201513b 100644
--- a/rackspace-cloudqueues-us/src/main/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSProviderMetadata.java
+++ b/rackspace-cloudqueues-us/src/main/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSProviderMetadata.java
@@ -66,12 +66,15 @@ public class CloudQueuesUSProviderMetadata extends BaseProviderMetadata {
       Properties properties = new Properties();
       properties.setProperty(CREDENTIAL_TYPE, CloudIdentityCredentialTypes.API_KEY_CREDENTIALS);
       properties.setProperty(SERVICE_TYPE, ServiceType.QUEUES);
-      properties.setProperty(PROPERTY_ZONES, "ORD,DFW,IAD,SYD,HKG");
+
+      properties.setProperty(PROPERTY_ZONES, "ORD,DFW,IAD,SYD"); // TODO: add HKG
       properties.setProperty(PROPERTY_ZONE + ".ORD." + ISO3166_CODES, "US-IL");
       properties.setProperty(PROPERTY_ZONE + ".DFW." + ISO3166_CODES, "US-TX");
       properties.setProperty(PROPERTY_ZONE + ".IAD." + ISO3166_CODES, "US-VA");
       properties.setProperty(PROPERTY_ZONE + ".SYD." + ISO3166_CODES, "AU-NSW");
-      properties.setProperty(PROPERTY_ZONE + ".HKG." + ISO3166_CODES, "HK");
+      // TODO: enable HKG
+      // properties.setProperty(PROPERTY_ZONE + ".HKG." + ISO3166_CODES, "HK");
+
       return properties;
    }
    

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/22bf21f5/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSMessageApiLiveTest.java
----------------------------------------------------------------------
diff --git a/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSMessageApiLiveTest.java b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSMessageApiLiveTest.java
new file mode 100644
index 0000000..0652329
--- /dev/null
+++ b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSMessageApiLiveTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.rackspace.cloudqueues.us;
+
+import org.jclouds.openstack.marconi.v1.features.MessageApiLiveTest;
+import org.testng.annotations.Test;
+
+/**
+ * @author Everett Toews
+ */
+@Test(groups = "live", testName = "CloudQueuesUSMessageApiLiveTest")
+public class CloudQueuesUSMessageApiLiveTest extends MessageApiLiveTest {
+    public CloudQueuesUSMessageApiLiveTest() {
+        provider = "rackspace-cloudqueues-us";
+    }
+}