You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by ev...@apache.org on 2013/12/12 19:36:49 UTC

git commit: The OpenStack Marconi Claim API.

Updated Branches:
  refs/heads/master 51a46bfad -> a9a2b5bc8


The OpenStack Marconi Claim API.


Project: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/commit/a9a2b5bc
Tree: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/tree/a9a2b5bc
Diff: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/diff/a9a2b5bc

Branch: refs/heads/master
Commit: a9a2b5bc8f910d589d531e1b088e89729f7286ee
Parents: 51a46bf
Author: Everett Toews <ev...@rackspace.com>
Authored: Wed Dec 11 21:55:41 2013 -0600
Committer: Everett Toews <ev...@rackspace.com>
Committed: Thu Dec 12 12:36:37 2013 -0600

----------------------------------------------------------------------
 .../openstack/marconi/v1/MarconiApi.java        |  23 ++-
 .../openstack/marconi/v1/domain/Claim.java      | 164 +++++++++++++++++++
 .../openstack/marconi/v1/domain/Message.java    |  31 +++-
 .../openstack/marconi/v1/features/ClaimApi.java | 144 ++++++++++++++++
 .../marconi/v1/features/MessageApi.java         |  30 +++-
 .../marconi/v1/functions/ParseClaim.java        |  78 +++++++++
 .../v1/functions/ParseMessagesCreated.java      |   4 +-
 .../v1/functions/ParseMessagesToStream.java     |  47 +++++-
 .../marconi/v1/functions/ParseQueueStats.java   |   6 +-
 .../v1/options/StreamMessagesOptions.java       |  16 ++
 .../marconi/v1/features/ClaimApiLiveTest.java   | 134 +++++++++++++++
 .../marconi/v1/features/ClaimApiMockTest.java   | 126 ++++++++++++++
 .../marconi/v1/features/MessageApiLiveTest.java |  15 ++
 .../marconi/v1/features/MessageApiMockTest.java |  23 +++
 .../us/CloudQueuesUSClaimApiLiveTest.java       |  30 ++++
 15 files changed, 844 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
index fe4c048..3583bbe 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
@@ -20,15 +20,18 @@ import com.google.inject.Provides;
 import org.jclouds.javax.annotation.Nullable;
 import org.jclouds.location.Zone;
 import org.jclouds.location.functions.ZoneToEndpoint;
+import org.jclouds.openstack.marconi.v1.features.ClaimApi;
 import org.jclouds.openstack.marconi.v1.features.MessageApi;
 import org.jclouds.openstack.marconi.v1.features.QueueApi;
 import org.jclouds.rest.annotations.Delegate;
 import org.jclouds.rest.annotations.EndpointParam;
 
+import javax.ws.rs.HeaderParam;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import java.io.Closeable;
 import java.util.Set;
+import java.util.UUID;
 
 /**
  * Marconi is a robust, web-scale message queuing service to support the distributed nature of large web applications.
@@ -55,11 +58,29 @@ public interface MarconiApi extends Closeable {
    /**
     * Provides access to Message features.
     *
-    * @param zone The zone where this queue will live.
+    * @param zone The zone where this queue lives.
     * @param name Name of the queue.
     */
    @Delegate
    @Path("/queues/{name}")
    MessageApi getMessageApiForZoneAndQueue(
          @EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone, @PathParam("name") String name);
+
+   /**
+    * Provides access to Claim features.
+    *
+    * @param zone The zone where this queue lives.
+    * @param clientId A UUID for each client instance. The UUID must be submitted in its canonical form (for example,
+    *                 3381af92-2b9e-11e3-b191-71861300734c). The client generates the Client-ID once. Client-ID
+    *                 persists between restarts of the client so the client should reuse that same Client-ID. All
+    *                 message-related operations require the use of Client-ID in the headers to ensure that messages
+    *                 are not echoed back to the client that posted them, unless the client explicitly requests this.
+    * @param name Name of the queue.
+    */
+   @Delegate
+   @Path("/queues/{name}")
+   ClaimApi getClaimApiForZoneAndClientAndQueue(
+         @EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone,
+         @HeaderParam("Client-ID") UUID clientId,
+         @PathParam("name") String name);
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java
new file mode 100644
index 0000000..71945b4
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to jclouds, Inc. (jclouds) under one or more
+ * contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  jclouds licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jclouds.openstack.marconi.v1.domain;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import org.jclouds.javax.annotation.Nullable;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A claim for messages in a queue.
+ *
+ * @author Everett Toews
+ */
+public class Claim {
+
+   private final String id;
+   private final int ttl;
+   private final int age;
+   private final List<Message> messages;
+
+   protected Claim(String id, int ttl, int age, @Nullable List<Message> messages) {
+      this.id = checkNotNull(id, "id required");
+      this.ttl = ttl;
+      this.age = age;
+      this.messages = messages == null ? ImmutableList.<Message>of() : messages;
+   }
+
+   /**
+    * @return The id of this claim.
+    */
+   public String getId() {
+      return id;
+   }
+
+   /**
+    * @see CreateMessage.Builder#ttl(int)
+    */
+   public int getTTL() {
+      return ttl;
+   }
+
+   /**
+    * @return Age of this claim in seconds.
+    */
+   public int getAge() {
+      return age;
+   }
+
+   /**
+    * @return The messages that are associated with this claim.
+    */
+   public List<Message> getMessages() {
+      return messages;
+   }
+
+   @Override
+   public int hashCode() {
+      return Objects.hashCode(id);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || getClass() != obj.getClass()) return false;
+      Claim that = Claim.class.cast(obj);
+      return Objects.equal(this.id, that.id);
+   }
+
+   protected Objects.ToStringHelper string() {
+      return Objects.toStringHelper(this)
+         .add("id", id).add("ttl", ttl).add("age", age).add("messages", messages);
+   }
+
+   @Override
+   public String toString() {
+      return string().toString();
+   }
+
+   public static Builder builder() {
+      return new ConcreteBuilder();
+   }
+
+   public Builder toBuilder() {
+      return new ConcreteBuilder().fromMessage(this);
+   }
+
+   public static abstract class Builder {
+      protected abstract Builder self();
+
+      protected String id;
+      protected int ttl;
+      protected int age;
+      protected List<Message> messages;
+
+      /**
+       * @see Claim#getId()
+       */
+      public Builder id(String id) {
+         this.id = id;
+         return self();
+      }
+
+      /**
+       * @see CreateMessage.Builder#ttl(int)
+       */
+      public Builder ttl(int ttl) {
+         this.ttl = ttl;
+         return self();
+      }
+
+      /**
+       * @see Claim#getAge()
+       */
+      public Builder age(int age) {
+         this.age = age;
+         return self();
+      }
+
+      /**
+       * @see Claim#getMessages()
+       */
+      public Builder messages(List<Message> messages) {
+         this.messages = messages;
+         return self();
+      }
+
+      public Claim build() {
+         return new Claim(id, ttl, age, messages);
+      }
+
+      public Builder fromMessage(Claim in) {
+         return this.id(in.getId()).ttl(in.getTTL()).age(in.getAge()).messages(in.getMessages());
+      }
+   }
+
+   private static class ConcreteBuilder extends Builder {
+      @Override
+      protected ConcreteBuilder self() {
+         return this;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
index a118603..184ef7b 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
@@ -19,6 +19,8 @@
 package org.jclouds.openstack.marconi.v1.domain;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import org.jclouds.javax.annotation.Nullable;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -33,12 +35,14 @@ public class Message {
    private final int ttl;
    private final String body;
    private final int age;
+   private final String claimId;
 
-   protected Message(String id, int ttl, String body, int age) {
+   protected Message(String id, int ttl, String body, int age, @Nullable String claimId) {
       this.id = checkNotNull(id, "id required");
       this.ttl = ttl;
       this.body = checkNotNull(body, "body required");
       this.age = age;
+      this.claimId = claimId;
    }
 
    /**
@@ -69,6 +73,13 @@ public class Message {
       return age;
    }
 
+   /**
+    * @return The claim id of this message.
+    */
+   public Optional<String> getClaimId() {
+      return Optional.fromNullable(claimId);
+   }
+
    @Override
    public int hashCode() {
       return Objects.hashCode(id);
@@ -83,8 +94,8 @@ public class Message {
    }
 
    protected Objects.ToStringHelper string() {
-      return Objects.toStringHelper(this)
-         .add("id", id).add("ttl", ttl).add("body", body).add("age", age);
+      return Objects.toStringHelper(this).omitNullValues()
+         .add("id", id).add("ttl", ttl).add("body", body).add("age", age).add("claimId", claimId);
    }
 
    @Override
@@ -107,6 +118,7 @@ public class Message {
       protected int ttl;
       protected String body;
       protected int age;
+      protected String claimId;
 
       /**
        * @see Message#getId()
@@ -140,12 +152,21 @@ public class Message {
          return self();
       }
 
+      /**
+       * @see Message#getClaimId()
+       */
+      public Builder claimId(String claimId) {
+         this.claimId = claimId;
+         return self();
+      }
+
       public Message build() {
-         return new Message(id, ttl, body, age);
+         return new Message(id, ttl, body, age, claimId);
       }
 
       public Builder fromMessage(Message in) {
-         return this.id(in.getId()).ttl(in.getTTL()).body(in.getBody()).age(in.getAge());
+         return this.id(in.getId()).ttl(in.getTTL()).body(in.getBody()).age(in.getAge())
+               .claimId(in.getClaimId().orNull());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java
new file mode 100644
index 0000000..f14deb5
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.marconi.v1.functions.ParseClaim;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToList;
+import org.jclouds.rest.annotations.Fallback;
+import org.jclouds.rest.annotations.PATCH;
+import org.jclouds.rest.annotations.Payload;
+import org.jclouds.rest.annotations.PayloadParam;
+import org.jclouds.rest.annotations.RequestFilters;
+import org.jclouds.rest.annotations.ResponseParser;
+import org.jclouds.rest.annotations.SkipEncoding;
+
+import javax.inject.Named;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import java.util.List;
+
+import static org.jclouds.Fallbacks.EmptyListOnNotFoundOr404;
+import static org.jclouds.Fallbacks.FalseOnNotFoundOr404;
+import static org.jclouds.Fallbacks.NullOnNotFoundOr404;
+
+/**
+ * Provides access to Claims via their REST API.
+ *
+ * @author Everett Toews
+ */
+@SkipEncoding({'/', '='})
+@RequestFilters(AuthenticateRequest.class)
+public interface ClaimApi {
+   /**
+    * This operation claims a set of messages (up to the value of the limit parameter) from oldest to newest and skips
+    * any messages that are already claimed. If no unclaimed messages are available, an empty List is returned.
+    * <p>
+    * When a client (worker) finishes processing a message, it should delete the message before the claim expires to
+    * ensure that the message is processed only once. As part of the delete operation, workers should specify the claim
+    * ID. If workers perform these actions and a claim simply expires, the server can return an error and notify the
+    * worker of the race condition. This action gives the worker a chance to roll back its own processing of the given
+    * message because another worker can claim the message and process it.
+    * <p>
+    * The age given for a claim is relative to the server's clock. The claim's age is useful for determining how
+    * quickly messages are getting processed and whether a given message's claim is about to expire.
+    * <p>
+    * When a claim expires, it is released. If the original worker failed to process the message, another client worker
+    * can then claim the message.
+    * <p>
+    * Note that claim creation is best-effort, meaning the worker may claim and return less than the requested number
+    * of messages.
+    * <p>
+    * To deal with workers that have stopped responding (for up to 1209600 seconds or 14 days, including claim
+    * lifetime), the server extends the lifetime of claimed messages to be at least as long as the lifetime of the
+    * claim itself, plus the specified grace period. If a claimed message would normally live longer than the grace
+    * period, its expiration is not adjusted.
+    *
+    * @param ttl   The ttl attribute specifies how long the server waits before releasing the claim. The ttl value
+    *              must be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in
+    *              your request.
+    * @param grace The grace attribute specifies the message grace period in seconds. The grace value must
+    *              be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in your
+    *              request.
+    * @param limit Specifies the number of messages to return, up to 20 messages.
+    */
+   @Named("claim:claim")
+   @POST
+   @Path("/claims")
+   @ResponseParser(ParseMessagesToList.class)
+   @Fallback(EmptyListOnNotFoundOr404.class)
+   @Payload("%7B\"ttl\":{ttl},\"grace\":{grace}%7D")
+   List<Message> claim(@PayloadParam("ttl") int ttl,
+                       @PayloadParam("grace") int grace,
+                       @QueryParam("limit") int limit);
+
+   /**
+    * Gets a specific claim and the associated messages.
+    *
+    * @param claimId Specific ID of the claim to get.
+    */
+   @Named("claim:get")
+   @GET
+   @ResponseParser(ParseClaim.class)
+   @Consumes(MediaType.APPLICATION_JSON)
+   @Path("/claims/{claim_id}")
+   @Fallback(NullOnNotFoundOr404.class)
+   Claim get(@PathParam("claim_id") String claimId);
+
+   /**
+    * Clients should periodically renew claims during long-running batches of work to avoid losing a claim while
+    * processing a message. The client can renew a claim by including a new TTL for the claim (which can be different
+    * from the original TTL). The server resets the age of the claim and applies the new TTL.
+    *
+    * @param claimId Specific ID of the claim to renew.
+    * @param ttl     The ttl attribute specifies how long the server waits before releasing the claim. The ttl value
+    *                must be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in
+    *                your request.
+    */
+   // TODO: revisit this when we figure out what's wrong with PATCH
+   //   @Named("claim:update")
+   //   @PATCH
+   //   @Path("/claims/{claim_id}")
+   //   @Fallback(FalseOnNotFoundOr404.class)
+   //   @Payload("%7B\"ttl\":{ttl}%7D")
+   //   @Produces(MediaType.APPLICATION_JSON)
+   //   boolean update(@PathParam("claim_id") String claimId,
+   //                  @PayloadParam("ttl") int ttl);
+
+   /**
+    * This operation immediately releases a claim, making any remaining (undeleted) messages that are associated with
+    * the claim available to other workers. This operation is useful when a worker is performing a graceful shutdown,
+    * fails to process one or more messages, or is taking longer than expected to process messages, and wants to make
+    * the remainder of the messages available to other workers.
+    *
+    * @param claimId Specific ID of the claim to release.
+    */
+   @Named("claim:delete")
+   @DELETE
+   @Path("/claims/{claim_id}")
+   @Fallback(FalseOnNotFoundOr404.class)
+   boolean release(@PathParam("claim_id") String claimId);
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
index 5dc0aa9..645602c 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
@@ -42,6 +42,7 @@ import javax.ws.rs.HeaderParam;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import java.util.List;
 import java.util.UUID;
@@ -83,7 +84,7 @@ public interface MessageApi {
     * messages indefinitely.
     *
     * @param clientId A UUID for each client instance.
-    * @param options Options for streaming messages to your client.
+    * @param options  Options for streaming messages to your client.
     */
    @Named("message:stream")
    @GET
@@ -98,7 +99,7 @@ public interface MessageApi {
     * Lists specific messages. Unlike the stream method, a client's own messages are always returned in this operation.
     *
     * @param clientId A UUID for each client instance.
-    * @param ids Specifies the IDs of the messages to get.
+    * @param ids      Specifies the IDs of the messages to list.
     */
    @Named("message:list")
    @GET
@@ -109,13 +110,11 @@ public interface MessageApi {
    List<Message> list(@HeaderParam("Client-ID") UUID clientId,
                       @BinderParam(BindIdsToQueryParam.class) Iterable<String> ids);
 
-   // TODO: list by claim id when claim API done
-
    /**
     * Gets a specific message. Unlike the stream method, a client's own messages are always returned in this operation.
     *
     * @param clientId A UUID for each client instance.
-    * @param id Specific ID of the message to get.
+    * @param id       Specific ID of the message to get.
     */
    @Named("message:get")
    @GET
@@ -131,7 +130,7 @@ public interface MessageApi {
     * remaining valid messages IDs are deleted.
     *
     * @param clientId A UUID for each client instance.
-    * @param ids Specifies the IDs of the messages to delete.
+    * @param ids      Specifies the IDs of the messages to delete.
     */
    @Named("message:delete")
    @DELETE
@@ -141,5 +140,22 @@ public interface MessageApi {
    boolean delete(@HeaderParam("Client-ID") UUID clientId,
                   @BinderParam(BindIdsToQueryParam.class) Iterable<String> ids);
 
-   // TODO: delete by claim id when claim API done
+   /**
+    * The claimId parameter specifies that the message is deleted only if it has the specified claim ID and that claim
+    * has not expired. This specification is useful for ensuring only one worker processes any given message. When a
+    * worker's claim expires before it can delete a message that it has processed, the worker must roll back any
+    * actions it took based on that message because another worker can now claim and process the same message.
+    *
+    * @param clientId A UUID for each client instance.
+    * @param id       Specific ID of the message to delete.
+    * @param claimId  Specific claim ID of the message to delete.
+    */
+   @Named("message:delete")
+   @DELETE
+   @Consumes(MediaType.APPLICATION_JSON)
+   @Path("/messages/{message_id}")
+   @Fallback(FalseOnNotFoundOr404.class)
+   boolean deleteByClaim(@HeaderParam("Client-ID") UUID clientId,
+                         @PathParam("message_id") String id,
+                         @QueryParam("claim_id") String claimId);
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java
new file mode 100644
index 0000000..c03358f
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+
+import javax.inject.Inject;
+import java.beans.ConstructorProperties;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.MessageWithHref;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE;
+
+/**
+ * @author Everett Toews
+ */
+public class ParseClaim implements Function<HttpResponse, Claim> {
+
+   protected static final Function<ClaimWithHref, Claim> TO_CLAIM = new Function<ClaimWithHref, Claim>() {
+      @Override
+      public Claim apply(ClaimWithHref claimWithHref) {
+         List<Message> messages = newArrayList(transform(claimWithHref.messagesWithHref, TO_MESSAGE));
+         String claimId = TO_ID_FROM_HREF.apply(claimWithHref.getId());
+
+         return claimWithHref.toBuilder()
+               .id(claimId)
+               .messages(messages)
+               .build();
+      }
+   };
+   private final ParseJson<ClaimWithHref> json;
+
+   @Inject
+   ParseClaim(ParseJson<ClaimWithHref> json) {
+      this.json = checkNotNull(json, "json");
+   }
+
+   @Override
+   public Claim apply(HttpResponse response) {
+      ClaimWithHref claimWithHref = json.apply(response);
+      Claim claim = TO_CLAIM.apply(claimWithHref);
+
+      return claim;
+   }
+
+   private static class ClaimWithHref extends Claim {
+      private final List<MessageWithHref> messagesWithHref;
+
+      @ConstructorProperties({"href", "ttl", "age", "messages"})
+      protected ClaimWithHref(String href, int ttl, int age, List<MessageWithHref> messagesWithHref) {
+         super(href, ttl, age, null);
+         this.messagesWithHref = messagesWithHref;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
index 9be3722..3d38092 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
@@ -26,7 +26,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF;
 
 /**
  * This parses the messages created on a queue.
@@ -44,7 +44,7 @@ public class ParseMessagesCreated implements Function<HttpResponse, MessagesCrea
 
    public MessagesCreated apply(HttpResponse from) {
       MessagesCreated rawMessagesCreated = json.apply(from);
-      List<String> messageIds = Lists.transform(rawMessagesCreated.getMessageIds(), TO_MESSAGE_ID);
+      List<String> messageIds = Lists.transform(rawMessagesCreated.getMessageIds(), TO_ID_FROM_HREF);
 
       MessagesCreated messagesCreated = MessagesCreated.builder()
             .messageIds(messageIds)

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
index 76a9ba7..42e2b02 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
@@ -18,7 +18,7 @@ package org.jclouds.openstack.marconi.v1.functions;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
 import org.jclouds.http.HttpResponse;
 import org.jclouds.http.functions.ParseJson;
 import org.jclouds.openstack.marconi.v1.domain.Message;
@@ -30,6 +30,9 @@ import javax.inject.Inject;
 import java.beans.ConstructorProperties;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static com.google.common.collect.Iterables.transform;
+import static org.jclouds.http.utils.Queries.queryParser;
 
 /**
  * @author Everett Toews
@@ -51,27 +54,53 @@ public class ParseMessagesToStream implements Function<HttpResponse, MessageStre
       }
 
       MessagesWithHref messagesWithHref = json.apply(response);
-      Iterable<Message> messages = Iterables.transform(messagesWithHref, TO_MESSAGE);
+      Iterable<Message> messages = transform(messagesWithHref, TO_MESSAGE);
 
       return new Messages(messages, messagesWithHref.getLinks());
    }
 
-   private static String getMessageId(String rawMessageHref) {
-      // strip off everything but the message id
-      return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1);
+   /**
+    * Strip off everything but the id, i.e. the last path segment of the href.
+    * The query string is removed first so a '/' or '?' inside it cannot be
+    * mistaken for part of the path.
+    *
+    * @param rawMessageHref href such as /v1/queues/q/messages/ID?claim_id=CID
+    * @return the text after the last '/' of the path, without any query string
+    */
+   private static String getIdFromHref(String rawMessageHref) {
+      int indexOfQuestionMark = rawMessageHref.indexOf('?');
+      String path = indexOfQuestionMark >= 0 ? rawMessageHref.substring(0, indexOfQuestionMark) : rawMessageHref;
+
+      return path.substring(path.lastIndexOf('/') + 1);
+   }
+
+   private static String getClaimIdFromHref(String rawMessageHref) {
+      int indexOfQuestionMark = rawMessageHref.indexOf('?') + 1;
+      // Despite its name this is the index just past the '?'; it is 0 when the href has no '?' at all.
+      if (indexOfQuestionMark > 0) {
+         Multimap<String, String> queryParams = queryParser().apply(rawMessageHref.substring(indexOfQuestionMark));
+
+         return getOnlyElement(queryParams.get("claim_id"), null);
+      }
+      else {
+         return null;
+      }
    }
 
    protected static final Function<MessageWithHref, Message> TO_MESSAGE = new Function<MessageWithHref, Message>() {
       @Override
       public Message apply(MessageWithHref messageWithHref) {
-         return messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build();
+         return messageWithHref.toBuilder()
+               .id(getIdFromHref(messageWithHref.getId()))
+               .claimId(getClaimIdFromHref(messageWithHref.getId()))
+               .build();
       }
    };
 
-   protected static final Function<String, String> TO_MESSAGE_ID = new Function<String, String>() {
+   protected static final Function<String, String> TO_ID_FROM_HREF = new Function<String, String>() {
       @Override
       public String apply(String messageIdWithHref) {
-         return getMessageId(messageIdWithHref);
+         return getIdFromHref(messageIdWithHref);
       }
    };
 
@@ -95,7 +124,7 @@ public class ParseMessagesToStream implements Function<HttpResponse, MessageStre
 
       @ConstructorProperties({ "href", "ttl", "body", "age" })
       protected MessageWithHref(String href, int ttl, String body, int age) {
-         super(href, ttl, body, age);
+         super(href, ttl, body, age, null);
       }
    }
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
index e6f0ee2..264dc38 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
@@ -25,7 +25,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesStats;
 import org.jclouds.openstack.marconi.v1.domain.QueueStats;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF;
 
 /**
  * This parses the stats of a queue.
@@ -51,11 +51,11 @@ public class ParseQueueStats implements Function<HttpResponse, QueueStats> {
          // change the hrefs to ids
          Aged oldestWithHref = rawQueueStats.getMessagesStats().getOldest().get();
          Aged oldestWithId = oldestWithHref.toBuilder()
-               .id(TO_MESSAGE_ID.apply(oldestWithHref.getId()))
+               .id(TO_ID_FROM_HREF.apply(oldestWithHref.getId()))
                .build();
          Aged newestWithHref = rawQueueStats.getMessagesStats().getNewest().get();
          Aged newestWithId = newestWithHref.toBuilder()
-               .id(TO_MESSAGE_ID.apply(newestWithHref.getId()))
+               .id(TO_ID_FROM_HREF.apply(newestWithHref.getId()))
                .build();
 
          MessagesStats messagesStatsWithIds = rawQueueStats.getMessagesStats().toBuilder()

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
index b0ff396..2b8ca40 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
@@ -67,6 +67,14 @@ public class StreamMessagesOptions extends PaginationOptions {
    }
 
    /**
+    * @see Builder#includeClaimed(boolean)
+    */
+   public StreamMessagesOptions includeClaimed(boolean includeClaimed) {
+      queryParameters.put("include_claimed", Boolean.toString(includeClaimed));
+      return this;
+   }
+
+   /**
     * @return The String representation of the marker for these StreamMessagesOptions.
     */
    public String getMarker() {
@@ -115,5 +123,13 @@ public class StreamMessagesOptions extends PaginationOptions {
          StreamMessagesOptions options = new StreamMessagesOptions();
          return options.echo(echo);
       }
+
+      /**
+       * The includeClaimed parameter determines whether the API returns claimed messages.
+       */
+      public static StreamMessagesOptions includeClaimed(boolean includeClaimed) {
+         StreamMessagesOptions options = new StreamMessagesOptions();
+         return options.includeClaimed(includeClaimed);
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java
new file mode 100644
index 0000000..b2d5d80
--- /dev/null
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.google.common.collect.Iterables.getLast;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Test(groups = "live", testName = "ClaimApiLiveTest", singleThreaded = true)
+public class ClaimApiLiveTest extends BaseMarconiApiLiveTest {
+
+   private static final UUID CLIENT_ID = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+   private final Map<String, List<String>> claimIds = Maps.newHashMap();
+
+   public void createQueues() throws Exception {
+      for (String zoneId : zones) {
+         QueueApi queueApi = api.getQueueApiForZone(zoneId);
+         boolean success = queueApi.create("jclouds-test");
+
+         assertTrue(success);
+      }
+   }
+
+   @Test(dependsOnMethods = { "createQueues" })
+   public void createMessages() throws Exception {
+      for (String zoneId : zones) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         String json1 = "{\"event\":{\"name\":\"Austin Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message1 = CreateMessage.builder().ttl(86400).body(json1).build();
+         String json2 = "{\"event\":{\"name\":\"SF Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message2 = CreateMessage.builder().ttl(86400).body(json2).build();
+         String json3 = "{\"event\":{\"name\":\"HK Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+         CreateMessage message3 = CreateMessage.builder().ttl(86400).body(json3).build();
+         List<CreateMessage> messages = ImmutableList.of(message1, message2, message3);
+
+         MessagesCreated messagesCreated = messageApi.create(clientId, messages);
+
+         assertNotNull(messagesCreated);
+         assertEquals(messagesCreated.getMessageIds().size(), 3);
+      }
+   }
+
+   @Test(dependsOnMethods = { "createMessages" })
+   public void claimMessages() throws Exception {
+      for (String zoneId : zones) {
+         ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test");
+
+         List<Message> messages = claimApi.claim(300, 200, 2);
+         assertEquals(messages.size(), 2);
+
+         claimIds.put(zoneId, new ArrayList<String>());
+
+         for (Message message: messages) {
+            assertNotNull(message.getId());
+            assertTrue(message.getClaimId().isPresent());
+            assertEquals(message.getTTL(), 86400);
+            // Record the claim id only after asserting it is present, so a missing id fails as an assertion.
+            claimIds.get(zoneId).add(message.getClaimId().get());
+         }
+      }
+   }
+
+   @Test(dependsOnMethods = { "claimMessages" })
+   public void getClaim() throws Exception {
+      for (String zoneId : zones) {
+         ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test");
+
+         Claim claim = claimApi.get(claimIds.get(zoneId).get(0));
+
+         assertNotNull(claim.getId());
+         assertEquals(claim.getMessages().size(), 2);
+         assertEquals(claim.getTTL(), 300);
+
+         for (Message message: claim.getMessages()) {
+            assertNotNull(message.getId());
+            assertTrue(message.getClaimId().isPresent());
+            assertEquals(message.getTTL(), 86400);
+         }
+      }
+   }
+
+   @Test(dependsOnMethods = { "getClaim" })
+   public void releaseClaim() throws Exception {
+      for (String zoneId : zones) {
+         ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test");
+
+         boolean success = claimApi.release(claimIds.get(zoneId).get(0));
+
+         assertTrue(success);
+      }
+   }
+
+   @Test(dependsOnMethods = { "releaseClaim" }) // run last: the queue must outlive every claim test
+   public void delete() throws Exception {
+      for (String zoneId : zones) {
+         QueueApi queueApi = api.getQueueApiForZone(zoneId);
+         boolean success = queueApi.delete("jclouds-test");
+
+         assertTrue(success);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java
new file mode 100644
index 0000000..04eee13
--- /dev/null
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import com.squareup.okhttp.mockwebserver.MockResponse;
+import com.squareup.okhttp.mockwebserver.MockWebServer;
+import org.jclouds.openstack.marconi.v1.MarconiApi;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * @author Everett Toews
+ */
+@Test
+public class ClaimApiMockTest extends BaseOpenStackMockTest<MarconiApi> {
+   private static final UUID CLIENT_ID = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+   public void claimMessages() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(201).setBody("[{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1997, \"href\": \"/v1/queues/jclouds-test/messages/52a645633ac24e6f0be88d44?claim_id=52a64d30ef913e6d05e7f786\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 981, \"href\": \"/v1/queues/jclouds-test/messages/52a6495bef913e6d195dcffe?claim_id=52a64d30ef913e6d05e7f786\", \"ttl\": 86400}]"));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test");
+
+         List<Message> messages = claimApi.claim(300, 200, 2);
+
+         assertEquals(messages.size(), 2);
+         assertEquals(messages.get(0).getId(), "52a645633ac24e6f0be88d44");
+         assertEquals(messages.get(0).getClaimId().get(), "52a64d30ef913e6d05e7f786");
+         assertEquals(messages.get(0).getTTL(), 86400);
+         assertEquals(messages.get(1).getId(), "52a6495bef913e6d195dcffe");
+         assertEquals(messages.get(1).getClaimId().get(), "52a64d30ef913e6d05e7f786");
+         assertEquals(messages.get(1).getTTL(), 86400);
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "POST /v1/123123/queues/jclouds-test/claims?limit=2 HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+
+   public void getClaim() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"age\": 209, \"href\": \"/v1/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47\", \"messages\": [{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 12182, \"href\": \"/v1/queues/jclouds-test/messages/52a8a379b04a584f2ec2bc3e?claim_id=52a8d23eb04a584f1bbd4f47\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 12182, \"href\": \"/v1/queues/jclouds-test/messages/52a8a379b04a584f2ec2bc3f?claim_id=52a8d23eb04a584f1bbd4f47\", \"ttl\": 86400}], \"ttl\": 300}"));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test");
+
+         Claim claim = claimApi.get("52a8d23eb04a584f1bbd4f47");
+
+         assertEquals(claim.getMessages().size(), 2);
+         assertEquals(claim.getId(), "52a8d23eb04a584f1bbd4f47");
+         assertEquals(claim.getAge(), 209);
+         assertEquals(claim.getTTL(), 300);
+
+         assertEquals(claim.getMessages().get(0).getId(), "52a8a379b04a584f2ec2bc3e");
+         assertEquals(claim.getMessages().get(0).getClaimId().get(), "52a8d23eb04a584f1bbd4f47");
+         assertEquals(claim.getMessages().get(0).getAge(), 12182);
+         assertEquals(claim.getMessages().get(0).getTTL(), 86400);
+
+         assertEquals(claim.getMessages().get(1).getId(), "52a8a379b04a584f2ec2bc3f");
+         assertEquals(claim.getMessages().get(1).getClaimId().get(), "52a8d23eb04a584f1bbd4f47");
+         assertEquals(claim.getMessages().get(1).getAge(), 12182);
+         assertEquals(claim.getMessages().get(1).getTTL(), 86400);
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47 HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+
+   public void releaseClaim() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(204));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test");
+
+         boolean success = claimApi.release("52a8d23eb04a584f1bbd4f47");
+
+         assertTrue(success);
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "DELETE /v1/123123/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47 HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
index 47cd6e0..3d7d745 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static com.google.common.collect.Iterables.getLast;
+import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.echo;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -175,6 +176,20 @@ public class MessageApiLiveTest extends BaseMarconiApiLiveTest {
    }
 
    @Test(dependsOnMethods = { "getMessage" })
+   public void deleteMessagesByClaimId() throws Exception {
+      for (String zoneId : zones) {
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+         ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, clientId, "jclouds-test");
+         Message message = getOnlyElement(claimApi.claim(300, 100, 1));
+
+         boolean success = messageApi.deleteByClaim(clientId, message.getId(), message.getClaimId().get());
+
+         assertTrue(success);
+      }
+   }
+
+   @Test(dependsOnMethods = { "deleteMessagesByClaimId" })
    public void deleteMessages() throws Exception {
       for (String zoneId : zones) {
          MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
index dbf9799..8e5a5f4 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
@@ -277,4 +277,27 @@ public class MessageApiMockTest extends BaseOpenStackMockTest<MarconiApi> {
          server.shutdown();
       }
    }
+
+   public void deleteMessageByClaimId() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new MockResponse().setResponseCode(204));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+         UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         boolean success = messageApi.deleteByClaim(clientId, "52936b8a3ac24e6ef4c067dd", "5292b30cef913e6d026f4dec");
+
+         assertTrue(success);
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "DELETE /v1/123123/queues/jclouds-test/messages/52936b8a3ac24e6ef4c067dd?claim_id=5292b30cef913e6d026f4dec HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java
----------------------------------------------------------------------
diff --git a/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java
new file mode 100644
index 0000000..9a210b4
--- /dev/null
+++ b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.rackspace.cloudqueues.us;
+
+import org.jclouds.openstack.marconi.v1.features.ClaimApiLiveTest;
+import org.testng.annotations.Test;
+
+/**
+ * @author Everett Toews
+ */
+@Test(groups = "live", testName = "CloudQueuesUSClaimApiLiveTest")
+public class CloudQueuesUSClaimApiLiveTest extends ClaimApiLiveTest {
+    public CloudQueuesUSClaimApiLiveTest() {
+        provider = "rackspace-cloudqueues-us";
+    }
+}