You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by ev...@apache.org on 2013/12/12 19:36:49 UTC
git commit: The OpenStack Marconi Claim API.
Updated Branches:
refs/heads/master 51a46bfad -> a9a2b5bc8
The OpenStack Marconi Claim API.
Project: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/commit/a9a2b5bc
Tree: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/tree/a9a2b5bc
Diff: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/diff/a9a2b5bc
Branch: refs/heads/master
Commit: a9a2b5bc8f910d589d531e1b088e89729f7286ee
Parents: 51a46bf
Author: Everett Toews <ev...@rackspace.com>
Authored: Wed Dec 11 21:55:41 2013 -0600
Committer: Everett Toews <ev...@rackspace.com>
Committed: Thu Dec 12 12:36:37 2013 -0600
----------------------------------------------------------------------
.../openstack/marconi/v1/MarconiApi.java | 23 ++-
.../openstack/marconi/v1/domain/Claim.java | 164 +++++++++++++++++++
.../openstack/marconi/v1/domain/Message.java | 31 +++-
.../openstack/marconi/v1/features/ClaimApi.java | 144 ++++++++++++++++
.../marconi/v1/features/MessageApi.java | 30 +++-
.../marconi/v1/functions/ParseClaim.java | 78 +++++++++
.../v1/functions/ParseMessagesCreated.java | 4 +-
.../v1/functions/ParseMessagesToStream.java | 47 +++++-
.../marconi/v1/functions/ParseQueueStats.java | 6 +-
.../v1/options/StreamMessagesOptions.java | 16 ++
.../marconi/v1/features/ClaimApiLiveTest.java | 134 +++++++++++++++
.../marconi/v1/features/ClaimApiMockTest.java | 126 ++++++++++++++
.../marconi/v1/features/MessageApiLiveTest.java | 15 ++
.../marconi/v1/features/MessageApiMockTest.java | 23 +++
.../us/CloudQueuesUSClaimApiLiveTest.java | 30 ++++
15 files changed, 844 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
index fe4c048..3583bbe 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java
@@ -20,15 +20,18 @@ import com.google.inject.Provides;
import org.jclouds.javax.annotation.Nullable;
import org.jclouds.location.Zone;
import org.jclouds.location.functions.ZoneToEndpoint;
+import org.jclouds.openstack.marconi.v1.features.ClaimApi;
import org.jclouds.openstack.marconi.v1.features.MessageApi;
import org.jclouds.openstack.marconi.v1.features.QueueApi;
import org.jclouds.rest.annotations.Delegate;
import org.jclouds.rest.annotations.EndpointParam;
+import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import java.io.Closeable;
import java.util.Set;
+import java.util.UUID;
/**
* Marconi is a robust, web-scale message queuing service to support the distributed nature of large web applications.
@@ -55,11 +58,29 @@ public interface MarconiApi extends Closeable {
/**
* Provides access to Message features.
*
- * @param zone The zone where this queue will live.
+ * @param zone The zone where this queue lives.
* @param name Name of the queue.
*/
@Delegate
@Path("/queues/{name}")
MessageApi getMessageApiForZoneAndQueue(
@EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone, @PathParam("name") String name);
+
+ /**
+ * Provides access to Claim features.
+ *
+ * @param zone The zone where this queue lives.
+ * @param clientId A UUID for each client instance. The UUID must be submitted in its canonical form (for example,
+ * 3381af92-2b9e-11e3-b191-71861300734c). The client generates the Client-ID once. Client-ID
+ * persists between restarts of the client so the client should reuse that same Client-ID. All
+ * message-related operations require the use of Client-ID in the headers to ensure that messages
+ * are not echoed back to the client that posted them, unless the client explicitly requests this.
+ * @param name Name of the queue.
+ */
+ @Delegate
+ @Path("/queues/{name}")
+ ClaimApi getClaimApiForZoneAndClientAndQueue(
+ @EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone,
+ @HeaderParam("Client-ID") UUID clientId,
+ @PathParam("name") String name);
}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java
new file mode 100644
index 0000000..71945b4
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to jclouds, Inc. (jclouds) under one or more
+ * contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. jclouds licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jclouds.openstack.marconi.v1.domain;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import org.jclouds.javax.annotation.Nullable;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A claim for messages in a queue.
+ *
+ * @author Everett Toews
+ */
+public class Claim {
+
+ private final String id;
+ private final int ttl;
+ private final int age;
+ private final List<Message> messages;
+
+ protected Claim(String id, int ttl, int age, @Nullable List<Message> messages) {
+ this.id = checkNotNull(id, "id required");
+ this.ttl = ttl;
+ this.age = age;
+ this.messages = messages == null ? ImmutableList.<Message>of() : messages;
+ }
+
+ /**
+ * @return The id of this claim.
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * @see CreateMessage.Builder#ttl(int)
+ */
+ public int getTTL() {
+ return ttl;
+ }
+
+ /**
+ * @return Age of this claim in seconds.
+ */
+ public int getAge() {
+ return age;
+ }
+
+ /**
+ * @return The messages that are associated with this claim.
+ */
+ public List<Message> getMessages() {
+ return messages;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ Claim that = Claim.class.cast(obj);
+ return Objects.equal(this.id, that.id);
+ }
+
+ protected Objects.ToStringHelper string() {
+ return Objects.toStringHelper(this)
+ .add("id", id).add("ttl", ttl).add("age", age).add("messages", messages);
+ }
+
+ @Override
+ public String toString() {
+ return string().toString();
+ }
+
+ public static Builder builder() {
+ return new ConcreteBuilder();
+ }
+
+ public Builder toBuilder() {
+ return new ConcreteBuilder().fromMessage(this);
+ }
+
+ public static abstract class Builder {
+ protected abstract Builder self();
+
+ protected String id;
+ protected int ttl;
+ protected int age;
+ protected List<Message> messages;
+
+ /**
+ * @see Claim#getId()
+ */
+ public Builder id(String id) {
+ this.id = id;
+ return self();
+ }
+
+ /**
+ * @see CreateMessage.Builder#ttl(int)
+ */
+ public Builder ttl(int ttl) {
+ this.ttl = ttl;
+ return self();
+ }
+
+ /**
+ * @see Claim#getAge()
+ */
+ public Builder age(int age) {
+ this.age = age;
+ return self();
+ }
+
+ /**
+ * @see Claim#getMessages()
+ */
+ public Builder messages(List<Message> messages) {
+ this.messages = messages;
+ return self();
+ }
+
+ public Claim build() {
+ return new Claim(id, ttl, age, messages);
+ }
+
+ public Builder fromMessage(Claim in) {
+ return this.id(in.getId()).ttl(in.getTTL()).age(in.getAge()).messages(in.getMessages());
+ }
+ }
+
+ private static class ConcreteBuilder extends Builder {
+ @Override
+ protected ConcreteBuilder self() {
+ return this;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
index a118603..184ef7b 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java
@@ -19,6 +19,8 @@
package org.jclouds.openstack.marconi.v1.domain;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import org.jclouds.javax.annotation.Nullable;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -33,12 +35,14 @@ public class Message {
private final int ttl;
private final String body;
private final int age;
+ private final String claimId;
- protected Message(String id, int ttl, String body, int age) {
+ protected Message(String id, int ttl, String body, int age, @Nullable String claimId) {
this.id = checkNotNull(id, "id required");
this.ttl = ttl;
this.body = checkNotNull(body, "body required");
this.age = age;
+ this.claimId = claimId;
}
/**
@@ -69,6 +73,13 @@ public class Message {
return age;
}
+ /**
+ * @return The claim id of this message.
+ */
+ public Optional<String> getClaimId() {
+ return Optional.fromNullable(claimId);
+ }
+
@Override
public int hashCode() {
return Objects.hashCode(id);
@@ -83,8 +94,8 @@ public class Message {
}
protected Objects.ToStringHelper string() {
- return Objects.toStringHelper(this)
- .add("id", id).add("ttl", ttl).add("body", body).add("age", age);
+ return Objects.toStringHelper(this).omitNullValues()
+ .add("id", id).add("ttl", ttl).add("body", body).add("age", age).add("claimId", claimId);
}
@Override
@@ -107,6 +118,7 @@ public class Message {
protected int ttl;
protected String body;
protected int age;
+ protected String claimId;
/**
* @see Message#getId()
@@ -140,12 +152,21 @@ public class Message {
return self();
}
+ /**
+ * @see Message#getClaimId()
+ */
+ public Builder claimId(String claimId) {
+ this.claimId = claimId;
+ return self();
+ }
+
public Message build() {
- return new Message(id, ttl, body, age);
+ return new Message(id, ttl, body, age, claimId);
}
public Builder fromMessage(Message in) {
- return this.id(in.getId()).ttl(in.getTTL()).body(in.getBody()).age(in.getAge());
+ return this.id(in.getId()).ttl(in.getTTL()).body(in.getBody()).age(in.getAge())
+ .claimId(in.getClaimId().orNull());
}
}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java
new file mode 100644
index 0000000..f14deb5
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.marconi.v1.functions.ParseClaim;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToList;
+import org.jclouds.rest.annotations.Fallback;
+import org.jclouds.rest.annotations.PATCH;
+import org.jclouds.rest.annotations.Payload;
+import org.jclouds.rest.annotations.PayloadParam;
+import org.jclouds.rest.annotations.RequestFilters;
+import org.jclouds.rest.annotations.ResponseParser;
+import org.jclouds.rest.annotations.SkipEncoding;
+
+import javax.inject.Named;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import java.util.List;
+
+import static org.jclouds.Fallbacks.EmptyListOnNotFoundOr404;
+import static org.jclouds.Fallbacks.FalseOnNotFoundOr404;
+import static org.jclouds.Fallbacks.NullOnNotFoundOr404;
+
+/**
+ * Provides access to Claims via their REST API.
+ *
+ * @author Everett Toews
+ */
+@SkipEncoding({'/', '='})
+@RequestFilters(AuthenticateRequest.class)
+public interface ClaimApi {
+ /**
+ * This operation claims a set of messages (up to the value of the limit parameter) from oldest to newest and skips
+ * any messages that are already claimed. If no unclaimed messages are available, an empty List is returned.
+ * <p>
+ * When a client (worker) finishes processing a message, it should delete the message before the claim expires to
+ * ensure that the message is processed only once. As part of the delete operation, workers should specify the claim
+ * ID. If workers perform these actions and a claim simply expires, the server can return an error and notify the
+ * worker of the race condition. This action gives the worker a chance to roll back its own processing of the given
+ * message because another worker can claim the message and process it.
+ * <p>
+ * The age given for a claim is relative to the server's clock. The claim's age is useful for determining how
+ * quickly messages are getting processed and whether a given message's claim is about to expire.
+ * <p>
+ * When a claim expires, it is released. If the original worker failed to process the message, another client worker
+ * can then claim the message.
+ * <p>
+ * Note that claim creation is best-effort, meaning the worker may claim and return fewer than the requested number
+ * of messages.
+ * <p>
+ * To deal with workers that have stopped responding (for up to 1209600 seconds or 14 days, including claim
+ * lifetime), the server extends the lifetime of claimed messages to be at least as long as the lifetime of the
+ * claim itself, plus the specified grace period. If a claimed message would normally live longer than the grace
+ * period, its expiration is not adjusted.
+ *
+ * @param ttl The ttl attribute specifies how long the server waits before releasing the claim. The ttl value
+ * must be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in
+ * your request.
+ * @param grace The grace attribute specifies the message grace period in seconds. The grace value must
+ * be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in your
+ * request.
+ * @param limit Specifies the number of messages to return, up to 20 messages.
+ */
+ @Named("claim:claim")
+ @POST
+ @Path("/claims")
+ @ResponseParser(ParseMessagesToList.class)
+ @Fallback(EmptyListOnNotFoundOr404.class)
+ @Payload("%7B\"ttl\":{ttl},\"grace\":{grace}%7D")
+ List<Message> claim(@PayloadParam("ttl") int ttl,
+ @PayloadParam("grace") int grace,
+ @QueryParam("limit") int limit);
+
+ /**
+ * Gets a specific claim and the associated messages.
+ *
+ * @param claimId Specific ID of the claim to get.
+ */
+ @Named("claim:get")
+ @GET
+ @ResponseParser(ParseClaim.class)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("/claims/{claim_id}")
+ @Fallback(NullOnNotFoundOr404.class)
+ Claim get(@PathParam("claim_id") String claimId);
+
+ /**
+ * Clients should periodically renew claims during long-running batches of work to avoid losing a claim while
+ * processing a message. The client can renew a claim by including a new TTL for the claim (which can be different
+ * from the original TTL). The server resets the age of the claim and applies the new TTL.
+ *
+ * @param claimId Specific ID of the claim to renew.
+ * @param ttl The ttl attribute specifies how long the server waits before releasing the claim. The ttl value
+ * must be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in
+ * your request.
+ */
+ // TODO: revisit this when we figure out what's wrong with PATCH
+ // @Named("claim:update")
+ // @PATCH
+ // @Path("/claims/{claim_id}")
+ // @Fallback(FalseOnNotFoundOr404.class)
+ // @Payload("%7B\"ttl\":{ttl}%7D")
+ // @Produces(MediaType.APPLICATION_JSON)
+ // boolean update(@PathParam("claim_id") String claimId,
+ // @PayloadParam("ttl") int ttl);
+
+ /**
+ * This operation immediately releases a claim, making any remaining (undeleted) messages that are associated with
+ * the claim available to other workers. This operation is useful when a worker is performing a graceful shutdown,
+ * fails to process one or more messages, or is taking longer than expected to process messages, and wants to make
+ * the remainder of the messages available to other workers.
+ *
+ * @param claimId Specific ID of the claim to release.
+ */
+ @Named("claim:delete")
+ @DELETE
+ @Path("/claims/{claim_id}")
+ @Fallback(FalseOnNotFoundOr404.class)
+ boolean release(@PathParam("claim_id") String claimId);
+}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
index 5dc0aa9..645602c 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
@@ -42,6 +42,7 @@ import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import java.util.List;
import java.util.UUID;
@@ -83,7 +84,7 @@ public interface MessageApi {
* messages indefinitely.
*
* @param clientId A UUID for each client instance.
- * @param options Options for streaming messages to your client.
+ * @param options Options for streaming messages to your client.
*/
@Named("message:stream")
@GET
@@ -98,7 +99,7 @@ public interface MessageApi {
* Lists specific messages. Unlike the stream method, a client's own messages are always returned in this operation.
*
* @param clientId A UUID for each client instance.
- * @param ids Specifies the IDs of the messages to get.
+ * @param ids Specifies the IDs of the messages to list.
*/
@Named("message:list")
@GET
@@ -109,13 +110,11 @@ public interface MessageApi {
List<Message> list(@HeaderParam("Client-ID") UUID clientId,
@BinderParam(BindIdsToQueryParam.class) Iterable<String> ids);
- // TODO: list by claim id when claim API done
-
/**
* Gets a specific message. Unlike the stream method, a client's own messages are always returned in this operation.
*
* @param clientId A UUID for each client instance.
- * @param id Specific ID of the message to get.
+ * @param id Specific ID of the message to get.
*/
@Named("message:get")
@GET
@@ -131,7 +130,7 @@ public interface MessageApi {
* remaining valid messages IDs are deleted.
*
* @param clientId A UUID for each client instance.
- * @param ids Specifies the IDs of the messages to delete.
+ * @param ids Specifies the IDs of the messages to delete.
*/
@Named("message:delete")
@DELETE
@@ -141,5 +140,22 @@ public interface MessageApi {
boolean delete(@HeaderParam("Client-ID") UUID clientId,
@BinderParam(BindIdsToQueryParam.class) Iterable<String> ids);
- // TODO: delete by claim id when claim API done
+ /**
+ * The claimId parameter specifies that the message is deleted only if it has the specified claim ID and that claim
+ * has not expired. This specification is useful for ensuring only one worker processes any given message. When a
+ * worker's claim expires before it can delete a message that it has processed, the worker must roll back any
+ * actions it took based on that message because another worker can now claim and process the same message.
+ *
+ * @param clientId A UUID for each client instance.
+ * @param id Specific ID of the message to delete.
+ * @param claimId Specific claim ID of the message to delete.
+ */
+ @Named("message:delete")
+ @DELETE
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("/messages/{message_id}")
+ @Fallback(FalseOnNotFoundOr404.class)
+ boolean deleteByClaim(@HeaderParam("Client-ID") UUID clientId,
+ @PathParam("message_id") String id,
+ @QueryParam("claim_id") String claimId);
}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java
new file mode 100644
index 0000000..c03358f
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+
+import javax.inject.Inject;
+import java.beans.ConstructorProperties;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.MessageWithHref;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE;
+
+/**
+ * @author Everett Toews
+ */
+public class ParseClaim implements Function<HttpResponse, Claim> {
+
+ protected static final Function<ClaimWithHref, Claim> TO_CLAIM = new Function<ClaimWithHref, Claim>() {
+ @Override
+ public Claim apply(ClaimWithHref claimWithHref) {
+ List<Message> messages = newArrayList(transform(claimWithHref.messagesWithHref, TO_MESSAGE));
+ String claimId = TO_ID_FROM_HREF.apply(claimWithHref.getId());
+
+ return claimWithHref.toBuilder()
+ .id(claimId)
+ .messages(messages)
+ .build();
+ }
+ };
+ private final ParseJson<ClaimWithHref> json;
+
+ @Inject
+ ParseClaim(ParseJson<ClaimWithHref> json) {
+ this.json = checkNotNull(json, "json");
+ }
+
+ @Override
+ public Claim apply(HttpResponse response) {
+ ClaimWithHref claimWithHref = json.apply(response);
+ Claim claim = TO_CLAIM.apply(claimWithHref);
+
+ return claim;
+ }
+
+ private static class ClaimWithHref extends Claim {
+ private final List<MessageWithHref> messagesWithHref;
+
+ @ConstructorProperties({"href", "ttl", "age", "messages"})
+ protected ClaimWithHref(String href, int ttl, int age, List<MessageWithHref> messagesWithHref) {
+ super(href, ttl, age, null);
+ this.messagesWithHref = messagesWithHref;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
index 9be3722..3d38092 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
@@ -26,7 +26,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF;
/**
* This parses the messages created on a queue.
@@ -44,7 +44,7 @@ public class ParseMessagesCreated implements Function<HttpResponse, MessagesCrea
public MessagesCreated apply(HttpResponse from) {
MessagesCreated rawMessagesCreated = json.apply(from);
- List<String> messageIds = Lists.transform(rawMessagesCreated.getMessageIds(), TO_MESSAGE_ID);
+ List<String> messageIds = Lists.transform(rawMessagesCreated.getMessageIds(), TO_ID_FROM_HREF);
MessagesCreated messagesCreated = MessagesCreated.builder()
.messageIds(messageIds)
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
index 76a9ba7..42e2b02 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
@@ -18,7 +18,7 @@ package org.jclouds.openstack.marconi.v1.functions;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
import org.jclouds.http.HttpResponse;
import org.jclouds.http.functions.ParseJson;
import org.jclouds.openstack.marconi.v1.domain.Message;
@@ -30,6 +30,9 @@ import javax.inject.Inject;
import java.beans.ConstructorProperties;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static com.google.common.collect.Iterables.transform;
+import static org.jclouds.http.utils.Queries.queryParser;
/**
* @author Everett Toews
@@ -51,27 +54,53 @@ public class ParseMessagesToStream implements Function<HttpResponse, MessageStre
}
MessagesWithHref messagesWithHref = json.apply(response);
- Iterable<Message> messages = Iterables.transform(messagesWithHref, TO_MESSAGE);
+ Iterable<Message> messages = transform(messagesWithHref, TO_MESSAGE);
return new Messages(messages, messagesWithHref.getLinks());
}
- private static String getMessageId(String rawMessageHref) {
- // strip off everything but the message id
- return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1);
+ /**
+ * Strip off everything but the id: the last path segment of the href, without any query string.
+ */
+ private static String getIdFromHref(String rawMessageHref) {
+ int indexOfQuestionMark = rawMessageHref.indexOf('?');
+ int lastIndexOfSlash = rawMessageHref.lastIndexOf('/') + 1;
+
+ if (indexOfQuestionMark > 0) {
+ return rawMessageHref.substring(lastIndexOfSlash, indexOfQuestionMark);
+ }
+ else {
+ return rawMessageHref.substring(lastIndexOfSlash);
+ }
+ }
+
+ private static String getClaimIdFromHref(String rawMessageHref) {
+ int indexOfQuestionMark = rawMessageHref.indexOf('?') + 1;
+
+ if (indexOfQuestionMark > 0) {
+ Multimap<String, String> queryParams = queryParser().apply(rawMessageHref.substring(indexOfQuestionMark));
+
+ return getOnlyElement(queryParams.get("claim_id"), null);
+ }
+ else {
+ return null;
+ }
}
protected static final Function<MessageWithHref, Message> TO_MESSAGE = new Function<MessageWithHref, Message>() {
@Override
public Message apply(MessageWithHref messageWithHref) {
- return messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build();
+ return messageWithHref.toBuilder()
+ .id(getIdFromHref(messageWithHref.getId()))
+ .claimId(getClaimIdFromHref(messageWithHref.getId()))
+ .build();
}
};
- protected static final Function<String, String> TO_MESSAGE_ID = new Function<String, String>() {
+ protected static final Function<String, String> TO_ID_FROM_HREF = new Function<String, String>() {
@Override
public String apply(String messageIdWithHref) {
- return getMessageId(messageIdWithHref);
+ return getIdFromHref(messageIdWithHref);
}
};
@@ -95,7 +124,7 @@ public class ParseMessagesToStream implements Function<HttpResponse, MessageStre
@ConstructorProperties({ "href", "ttl", "body", "age" })
protected MessageWithHref(String href, int ttl, String body, int age) {
- super(href, ttl, body, age);
+ super(href, ttl, body, age, null);
}
}
}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
index e6f0ee2..264dc38 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
@@ -25,7 +25,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesStats;
import org.jclouds.openstack.marconi.v1.domain.QueueStats;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF;
/**
* This parses the stats of a queue.
@@ -51,11 +51,11 @@ public class ParseQueueStats implements Function<HttpResponse, QueueStats> {
// change the hrefs to ids
Aged oldestWithHref = rawQueueStats.getMessagesStats().getOldest().get();
Aged oldestWithId = oldestWithHref.toBuilder()
- .id(TO_MESSAGE_ID.apply(oldestWithHref.getId()))
+ .id(TO_ID_FROM_HREF.apply(oldestWithHref.getId()))
.build();
Aged newestWithHref = rawQueueStats.getMessagesStats().getNewest().get();
Aged newestWithId = newestWithHref.toBuilder()
- .id(TO_MESSAGE_ID.apply(newestWithHref.getId()))
+ .id(TO_ID_FROM_HREF.apply(newestWithHref.getId()))
.build();
MessagesStats messagesStatsWithIds = rawQueueStats.getMessagesStats().toBuilder()
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
index b0ff396..2b8ca40 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
@@ -67,6 +67,14 @@ public class StreamMessagesOptions extends PaginationOptions {
}
/**
+ * @see Builder#includeClaimed(boolean)
+ */
+ public StreamMessagesOptions includeClaimed(boolean includeClaimed) {
+ queryParameters.put("include_claimed", Boolean.toString(includeClaimed));
+ return this;
+ }
+
+ /**
* @return The String representation of the marker for these StreamMessagesOptions.
*/
public String getMarker() {
@@ -115,5 +123,13 @@ public class StreamMessagesOptions extends PaginationOptions {
StreamMessagesOptions options = new StreamMessagesOptions();
return options.echo(echo);
}
+
+ /**
+ * Creates options with includeClaimed set, which determines whether the API returns claimed messages.
+ */
+ public static StreamMessagesOptions includeClaimed(boolean includeClaimed) {
+ StreamMessagesOptions options = new StreamMessagesOptions();
+ return options.includeClaimed(includeClaimed); // must not delegate to echo(...), which configures a different option
+ }
}
}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java
new file mode 100644
index 0000000..b2d5d80
--- /dev/null
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.google.common.collect.Iterables.getLast;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Test(groups = "live", testName = "ClaimApiLiveTest", singleThreaded = true)
+public class ClaimApiLiveTest extends BaseMarconiApiLiveTest {
+
+ private static final UUID CLIENT_ID = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+ private final Map<String, List<String>> claimIds = Maps.newHashMap();
+
+ public void createQueues() throws Exception {
+ for (String zoneId : zones) {
+ QueueApi queueApi = api.getQueueApiForZone(zoneId);
+ boolean success = queueApi.create("jclouds-test");
+
+ assertTrue(success);
+ }
+ }
+
+ @Test(dependsOnMethods = { "createQueues" })
+ public void createMessages() throws Exception {
+ for (String zoneId : zones) {
+ MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+
+ UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+ String json1 = "{\"event\":{\"name\":\"Austin Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+ CreateMessage message1 = CreateMessage.builder().ttl(86400).body(json1).build();
+ String json2 = "{\"event\":{\"name\":\"SF Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+ CreateMessage message2 = CreateMessage.builder().ttl(86400).body(json2).build();
+ String json3 = "{\"event\":{\"name\":\"HK Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}";
+ CreateMessage message3 = CreateMessage.builder().ttl(86400).body(json3).build();
+ List<CreateMessage> messages = ImmutableList.of(message1, message2, message3);
+
+ MessagesCreated messagesCreated = messageApi.create(clientId, messages);
+
+ assertNotNull(messagesCreated);
+ assertEquals(messagesCreated.getMessageIds().size(), 3);
+ }
+ }
+
+ @Test(dependsOnMethods = { "createMessages" })
+ public void claimMessages() throws Exception {
+ for (String zoneId : zones) {
+ ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test");
+
+ List<Message> messages = claimApi.claim(300, 200, 2);
+ assertEquals(messages.size(), 2);
+
+ claimIds.put(zoneId, new ArrayList<String>());
+
+ for (Message message: messages) {
+ assertNotNull(message.getId());
+ assertTrue(message.getClaimId().isPresent());
+ assertEquals(message.getTTL(), 86400);
+
+ claimIds.get(zoneId).add(message.getClaimId().get()); // presence asserted above, so get() cannot fail here
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = { "claimMessages" })
+ public void getClaim() throws Exception {
+ for (String zoneId : zones) {
+ ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test");
+
+ Claim claim = claimApi.get(claimIds.get(zoneId).get(0));
+
+ assertNotNull(claim.getId());
+ assertEquals(claim.getMessages().size(), 2);
+ assertEquals(claim.getTTL(), 300);
+
+ for (Message message: claim.getMessages()) {
+ assertNotNull(message.getId());
+ assertTrue(message.getClaimId().isPresent());
+ assertEquals(message.getTTL(), 86400);
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = { "getClaim" })
+ public void releaseClaim() throws Exception {
+ for (String zoneId : zones) {
+ ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test");
+
+ boolean success = claimApi.release(claimIds.get(zoneId).get(0));
+
+ assertTrue(success);
+ }
+ }
+
+ @Test(dependsOnMethods = { "releaseClaim" }) // run after releaseClaim: it still needs the queue deleted below
+ public void delete() throws Exception {
+ for (String zoneId : zones) {
+ QueueApi queueApi = api.getQueueApiForZone(zoneId);
+ boolean success = queueApi.delete("jclouds-test");
+
+ assertTrue(success);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java
new file mode 100644
index 0000000..04eee13
--- /dev/null
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.features;
+
+import com.squareup.okhttp.mockwebserver.MockResponse;
+import com.squareup.okhttp.mockwebserver.MockWebServer;
+import org.jclouds.openstack.marconi.v1.MarconiApi;
+import org.jclouds.openstack.marconi.v1.domain.Claim;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * @author Everett Toews
+ */
+@Test
+public class ClaimApiMockTest extends BaseOpenStackMockTest<MarconiApi> {
+ private static final UUID CLIENT_ID = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+ public void claimMessages() throws Exception {
+ MockWebServer server = mockOpenStackServer();
+ server.enqueue(new MockResponse().setBody(accessRackspace));
+ server.enqueue(new MockResponse().setResponseCode(201).setBody("[{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1997, \"href\": \"/v1/queues/jclouds-test/messages/52a645633ac24e6f0be88d44?claim_id=52a64d30ef913e6d05e7f786\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 981, \"href\": \"/v1/queues/jclouds-test/messages/52a6495bef913e6d195dcffe?claim_id=52a64d30ef913e6d05e7f786\", \"ttl\": 86400}]"));
+
+ try {
+ MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+ ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test");
+
+ List<Message> messages = claimApi.claim(300, 200, 2);
+
+ assertEquals(messages.size(), 2);
+ assertEquals(messages.get(0).getId(), "52a645633ac24e6f0be88d44");
+ assertEquals(messages.get(0).getClaimId().get(), "52a64d30ef913e6d05e7f786");
+ assertEquals(messages.get(0).getTTL(), 86400);
+ assertEquals(messages.get(1).getId(), "52a6495bef913e6d195dcffe");
+ assertEquals(messages.get(1).getClaimId().get(), "52a64d30ef913e6d05e7f786");
+ assertEquals(messages.get(1).getTTL(), 86400);
+
+ assertEquals(server.getRequestCount(), 2);
+ assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+ assertEquals(server.takeRequest().getRequestLine(), "POST /v1/123123/queues/jclouds-test/claims?limit=2 HTTP/1.1");
+ }
+ finally {
+ server.shutdown();
+ }
+ }
+
+ public void getClaim() throws Exception {
+ MockWebServer server = mockOpenStackServer();
+ server.enqueue(new MockResponse().setBody(accessRackspace));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"age\": 209, \"href\": \"/v1/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47\", \"messages\": [{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 12182, \"href\": \"/v1/queues/jclouds-test/messages/52a8a379b04a584f2ec2bc3e?claim_id=52a8d23eb04a584f1bbd4f47\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 12182, \"href\": \"/v1/queues/jclouds-test/messages/52a8a379b04a584f2ec2bc3f?claim_id=52a8d23eb04a584f1bbd4f47\", \"ttl\": 86400}], \"ttl\": 300}"));
+
+ try {
+ MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+ ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test");
+
+ Claim claim = claimApi.get("52a8d23eb04a584f1bbd4f47");
+
+ assertEquals(claim.getMessages().size(), 2);
+ assertEquals(claim.getId(), "52a8d23eb04a584f1bbd4f47");
+ assertEquals(claim.getAge(), 209);
+ assertEquals(claim.getTTL(), 300);
+
+ assertEquals(claim.getMessages().get(0).getId(), "52a8a379b04a584f2ec2bc3e");
+ assertEquals(claim.getMessages().get(0).getClaimId().get(), "52a8d23eb04a584f1bbd4f47");
+ assertEquals(claim.getMessages().get(0).getAge(), 12182);
+ assertEquals(claim.getMessages().get(0).getTTL(), 86400);
+
+ assertEquals(claim.getMessages().get(1).getId(), "52a8a379b04a584f2ec2bc3f");
+ assertEquals(claim.getMessages().get(1).getClaimId().get(), "52a8d23eb04a584f1bbd4f47");
+ assertEquals(claim.getMessages().get(1).getAge(), 12182);
+ assertEquals(claim.getMessages().get(1).getTTL(), 86400);
+
+ assertEquals(server.getRequestCount(), 2);
+ assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+ assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47 HTTP/1.1");
+ }
+ finally {
+ server.shutdown();
+ }
+ }
+
+ public void releaseClaim() throws Exception {
+ MockWebServer server = mockOpenStackServer();
+ server.enqueue(new MockResponse().setBody(accessRackspace));
+ server.enqueue(new MockResponse().setResponseCode(204));
+
+ try {
+ MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+ ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test");
+
+ boolean success = claimApi.release("52a8d23eb04a584f1bbd4f47");
+
+ assertTrue(success);
+
+ assertEquals(server.getRequestCount(), 2);
+ assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+ assertEquals(server.takeRequest().getRequestLine(), "DELETE /v1/123123/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47 HTTP/1.1");
+ }
+ finally {
+ server.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
index 47cd6e0..3d7d745 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.UUID;
import static com.google.common.collect.Iterables.getLast;
+import static com.google.common.collect.Iterables.getOnlyElement;
import static org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.echo;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -175,6 +176,20 @@ public class MessageApiLiveTest extends BaseMarconiApiLiveTest {
}
@Test(dependsOnMethods = { "getMessage" })
+ public void deleteMessagesByClaimId() throws Exception {
+ for (String zoneId : zones) {
+ UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+ MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+ ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, clientId, "jclouds-test");
+ Message message = getOnlyElement(claimApi.claim(300, 100, 1));
+
+ boolean success = messageApi.deleteByClaim(clientId, message.getId(), message.getClaimId().get());
+
+ assertTrue(success);
+ }
+ }
+
+ @Test(dependsOnMethods = { "deleteMessagesByClaimId" })
public void deleteMessages() throws Exception {
for (String zoneId : zones) {
MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
----------------------------------------------------------------------
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
index dbf9799..8e5a5f4 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
@@ -277,4 +277,27 @@ public class MessageApiMockTest extends BaseOpenStackMockTest<MarconiApi> {
server.shutdown();
}
}
+
+ public void deleteMessageByClaimId() throws Exception {
+ MockWebServer server = mockOpenStackServer();
+ server.enqueue(new MockResponse().setBody(accessRackspace));
+ server.enqueue(new MockResponse().setResponseCode(204));
+
+ try {
+ MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+ MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+ UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+ boolean success = messageApi.deleteByClaim(clientId, "52936b8a3ac24e6ef4c067dd", "5292b30cef913e6d026f4dec");
+
+ assertTrue(success);
+
+ assertEquals(server.getRequestCount(), 2);
+ assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+ assertEquals(server.takeRequest().getRequestLine(), "DELETE /v1/123123/queues/jclouds-test/messages/52936b8a3ac24e6ef4c067dd?claim_id=5292b30cef913e6d026f4dec HTTP/1.1");
+ }
+ finally {
+ server.shutdown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java
----------------------------------------------------------------------
diff --git a/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java
new file mode 100644
index 0000000..9a210b4
--- /dev/null
+++ b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.rackspace.cloudqueues.us;
+
+import org.jclouds.openstack.marconi.v1.features.ClaimApiLiveTest;
+import org.testng.annotations.Test;
+
+/**
+ * @author Everett Toews
+ */
+@Test(groups = "live", testName = "CloudQueuesUSClaimApiLiveTest")
+public class CloudQueuesUSClaimApiLiveTest extends ClaimApiLiveTest {
+ public CloudQueuesUSClaimApiLiveTest() {
+ provider = "rackspace-cloudqueues-us";
+ }
+}