You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/12/20 10:49:27 UTC
[2/3] httpcomponents-client git commit: HTTPCLIENT-1824,
HTTPCLIENT-1868: Asynchronous HTTP cache storage API;
Memcached backend implementation of async HTTP cache storage
HTTPCLIENT-1824, HTTPCLIENT-1868: Asynchronous HTTP cache storage API; Memcached backend implementation of async HTTP cache storage
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/commit/002f40f9
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/tree/002f40f9
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-client/diff/002f40f9
Branch: refs/heads/master
Commit: 002f40f9d3ec65891b5faf086df404dd3c450600
Parents: ebcb55d
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Wed Dec 20 10:55:32 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Wed Dec 20 11:24:52 2017 +0100
----------------------------------------------------------------------
.../http/cache/HttpAsyncCacheStorage.java | 90 ++++
.../cache/HttpAsyncCacheStorageAdaptor.java | 98 ++++
.../hc/client5/http/cache/HttpCacheStorage.java | 6 +-
.../cache/AbstractBinaryAsyncCacheStorage.java | 46 ++
.../AbstractSerializingAsyncCacheStorage.java | 228 ++++++++++
.../http/impl/cache/ComplexCancellable.java | 74 +++
.../MemcachedHttpAsyncCacheStorage.java | 250 +++++++++++
.../memcached/MemcachedHttpCacheStorage.java | 21 +-
...estAbstractSerializingAsyncCacheStorage.java | 450 +++++++++++++++++++
9 files changed, 1245 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorage.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorage.java
new file mode 100644
index 0000000..4ef0ce7
--- /dev/null
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorage.java
@@ -0,0 +1,90 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.cache;
+
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.FutureCallback;
+
+/**
+ * {@literal HttpAsyncCacheStorage} represents an abstract HTTP cache
+ * storage backend that can then be plugged into the asynchronous
+ * (non-blocking) request execution pipeline.
+ * <p>
+ * Every operation reports its outcome through the supplied
+ * {@link FutureCallback} and returns a {@link Cancellable} handle
+ * for the operation.
+ * </p>
+ *
+ * @since 5.0
+ */
+public interface HttpAsyncCacheStorage {
+
+ /**
+ * {@link Cancellable} whose {@link Cancellable#cancel()} does nothing
+ * and always returns {@code false}.
+ */
+ Cancellable NOOP_CANCELLABLE = new Cancellable() {
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+
+ };
+
+ /**
+ * Store a given cache entry under the given key.
+ * @param key where in the cache to store the entry
+ * @param entry cached response to store
+ * @param callback result callback
+ * @return cancellable handle for the operation
+ */
+ Cancellable putEntry(
+ String key, HttpCacheEntry entry, FutureCallback<Boolean> callback);
+
+ /**
+ * Retrieves the cache entry stored under the given key. The entry,
+ * or {@code null} if no entry exists under that key, is passed to
+ * the result callback.
+ * @param key cache key
+ * @param callback result callback that receives the {@link HttpCacheEntry}
+ * or {@code null} if no entry exists
+ * @return cancellable handle for the operation
+ */
+ Cancellable getEntry(
+ String key, FutureCallback<HttpCacheEntry> callback);
+
+ /**
+ * Deletes/invalidates/removes any cache entries currently
+ * stored under the given key.
+ * @param key cache key of the entry to remove
+ * @param callback result callback
+ * @return cancellable handle for the operation
+ */
+ Cancellable removeEntry(
+ String key, FutureCallback<Boolean> callback);
+
+ /**
+ * Atomically applies the given CAS operation to update an existing cache
+ * entry under a given key.
+ * @param key indicates which entry to modify
+ * @param casOperation the CAS operation to perform.
+ * @param callback result callback
+ * @return cancellable handle for the operation
+ */
+ Cancellable updateEntry(
+ String key, HttpCacheCASOperation casOperation, FutureCallback<Boolean> callback);
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorageAdaptor.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorageAdaptor.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorageAdaptor.java
new file mode 100644
index 0000000..c991fd7
--- /dev/null
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorageAdaptor.java
@@ -0,0 +1,98 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.cache;
+
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link HttpAsyncCacheStorage} implementation that emulates asynchronous
+ * behavior using an instance of classic {@link HttpCacheStorage}.
+ * <p>
+ * Each operation is executed synchronously on the calling thread: the callback
+ * is invoked before the method returns and the returned handle is always
+ * {@link HttpAsyncCacheStorage#NOOP_CANCELLABLE}, so nothing can be cancelled.
+ * </p>
+ *
+ * @since 5.0
+ */
+public final class HttpAsyncCacheStorageAdaptor implements HttpAsyncCacheStorage {
+
+    private final HttpCacheStorage cacheStorage;
+
+    /**
+     * @param cacheStorage classic (blocking) storage all operations are delegated to;
+     *                     must not be {@code null}
+     */
+    public HttpAsyncCacheStorageAdaptor(final HttpCacheStorage cacheStorage) {
+        this.cacheStorage = Args.notNull(cacheStorage, "Cache storage");
+    }
+
+    /**
+     * Stores the entry through the wrapped storage and completes the callback with
+     * {@code Boolean.TRUE}; any exception is reported via {@code callback.failed}.
+     */
+    @Override
+    public Cancellable putEntry(final String key, final HttpCacheEntry entry, final FutureCallback<Boolean> callback) {
+        Args.notEmpty(key, "Key");
+        Args.notNull(entry, "Cache entry");
+        Args.notNull(callback, "Callback");
+        try {
+            cacheStorage.putEntry(key, entry);
+            callback.completed(Boolean.TRUE);
+        } catch (final Exception ex) {
+            callback.failed(ex);
+        }
+        return NOOP_CANCELLABLE;
+    }
+
+    /**
+     * Looks the entry up in the wrapped storage and completes the callback with
+     * whatever it returned; any exception is reported via {@code callback.failed}.
+     */
+    @Override
+    public Cancellable getEntry(final String key, final FutureCallback<HttpCacheEntry> callback) {
+        Args.notEmpty(key, "Key");
+        Args.notNull(callback, "Callback");
+        try {
+            final HttpCacheEntry entry = cacheStorage.getEntry(key);
+            callback.completed(entry);
+        } catch (final Exception ex) {
+            callback.failed(ex);
+        }
+        return NOOP_CANCELLABLE;
+    }
+
+    /**
+     * Removes the entry from the wrapped storage and completes the callback with
+     * {@code Boolean.TRUE}; any exception is reported via {@code callback.failed}.
+     */
+    @Override
+    public Cancellable removeEntry(final String key, final FutureCallback<Boolean> callback) {
+        Args.notEmpty(key, "Key");
+        Args.notNull(callback, "Callback");
+        try {
+            cacheStorage.removeEntry(key);
+            callback.completed(Boolean.TRUE);
+        } catch (final Exception ex) {
+            callback.failed(ex);
+        }
+        return NOOP_CANCELLABLE;
+    }
+
+    /**
+     * Delegates the update to the wrapped storage and completes the callback with
+     * {@code Boolean.TRUE}; any exception is reported via {@code callback.failed}.
+     */
+    @Override
+    public Cancellable updateEntry(
+            final String key, final HttpCacheCASOperation casOperation, final FutureCallback<Boolean> callback) {
+        Args.notEmpty(key, "Key");
+        Args.notNull(casOperation, "CAS operation");
+        Args.notNull(callback, "Callback");
+        try {
+            cacheStorage.updateEntry(key, casOperation);
+            callback.completed(Boolean.TRUE);
+        } catch (final Exception ex) {
+            callback.failed(ex);
+        }
+        return NOOP_CANCELLABLE;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java
index 88c2aa0..e58efcb 100644
--- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java
@@ -27,9 +27,9 @@
package org.apache.hc.client5.http.cache;
/**
- * New storage backends should implement this {@link HttpCacheStorage}
- * interface. They can then be plugged into the existing caching
- * {@link org.apache.hc.client5.http.classic.HttpClient} implementation.
+ * {@literal HttpCacheStorage} represents an abstract HTTP cache
+ * storage backend that can then be plugged into the classic
+ * (blocking) request execution pipeline.
*
* @since 4.1
*/
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractBinaryAsyncCacheStorage.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractBinaryAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractBinaryAsyncCacheStorage.java
new file mode 100644
index 0000000..b1361ca
--- /dev/null
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractBinaryAsyncCacheStorage.java
@@ -0,0 +1,46 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.cache;
+
+import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
+
+/**
+ * Base class for asynchronous cache backends that keep entries as serialized
+ * {@code byte[]} payloads and support CAS (compare-and-swap) updates.
+ *
+ * @param <CAS> backend-specific CAS handle type
+ *
+ * @since 5.0
+ */
+public abstract class AbstractBinaryAsyncCacheStorage<CAS> extends AbstractSerializingAsyncCacheStorage<byte[], CAS> {
+
+    /**
+     * Uses {@link ByteArrayCacheEntrySerializer#INSTANCE} as the entry serializer.
+     *
+     * @param maxUpdateRetries retry limit passed on to the superclass
+     */
+    public AbstractBinaryAsyncCacheStorage(final int maxUpdateRetries) {
+        this(maxUpdateRetries, ByteArrayCacheEntrySerializer.INSTANCE);
+    }
+
+    /**
+     * @param maxUpdateRetries retry limit passed on to the superclass
+     * @param serializer serializer converting cache entries to and from {@code byte[]}
+     */
+    public AbstractBinaryAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer<byte[]> serializer) {
+        super(maxUpdateRetries, serializer);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractSerializingAsyncCacheStorage.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractSerializingAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractSerializingAsyncCacheStorage.java
new file mode 100644
index 0000000..36c055f
--- /dev/null
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractSerializingAsyncCacheStorage.java
@@ -0,0 +1,228 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.cache;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
+import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
+import org.apache.hc.client5.http.cache.HttpCacheEntry;
+import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
+import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
+import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
+import org.apache.hc.client5.http.cache.ResourceIOException;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Abstract cache backend for serialized objects capable of CAS (compare-and-swap) updates.
+ * <p>
+ * Cache entries are wrapped into {@link HttpCacheStorageEntry} together with their
+ * logical key, serialized with the configured {@link HttpCacheEntrySerializer} and
+ * handed to the backend under the key produced by {@link #digestToStorageKey(String)}.
+ * On reads the logical key stored with the entry is compared with the requested key
+ * and a mismatch is reported as a cache miss.
+ * </p>
+ *
+ * @param <T> type of the serialized storage object
+ * @param <CAS> backend-specific CAS handle type
+ *
+ * @since 5.0
+ */
+public abstract class AbstractSerializingAsyncCacheStorage<T, CAS> implements HttpAsyncCacheStorage {
+
+    private final int maxUpdateRetries;
+    private final HttpCacheEntrySerializer<T> serializer;
+
+    /**
+     * @param maxUpdateRetries number of failed CAS attempts after which
+     *                         {@link #updateEntry} gives up; must not be negative
+     * @param serializer cache entry serializer; must not be {@code null}
+     */
+    public AbstractSerializingAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer<T> serializer) {
+        this.maxUpdateRetries = Args.notNegative(maxUpdateRetries, "Max retries");
+        this.serializer = Args.notNull(serializer, "Cache entry serializer");
+    }
+
+    /** Maps the logical cache key onto the key used with the storage backend. */
+    protected abstract String digestToStorageKey(String key);
+
+    /** Extracts the serialized storage object from a CAS handle. */
+    protected abstract T getStorageObject(CAS cas) throws ResourceIOException;
+
+    /** Stores the serialized object under the given storage key. */
+    protected abstract Cancellable store(String storageKey, T storageObject, FutureCallback<Boolean> callback);
+
+    /** Reads the serialized object stored under the given storage key; {@code null} is treated as a miss. */
+    protected abstract Cancellable restore(String storageKey, FutureCallback<T> callback);
+
+    /** Reads the CAS handle for the given storage key; {@code null} is treated as "no existing entry". */
+    protected abstract Cancellable getForUpdateCAS(String storageKey, FutureCallback<CAS> callback);
+
+    /**
+     * Replaces the stored object if the CAS handle is still current. A {@code false}
+     * result makes {@link #updateEntry} retry the whole read-modify-write cycle.
+     */
+    protected abstract Cancellable updateCAS(String storageKey, CAS cas, T storageObject, FutureCallback<Boolean> callback);
+
+    /** Deletes the object stored under the given storage key. */
+    protected abstract Cancellable delete(String storageKey, FutureCallback<Boolean> callback);
+
+    /**
+     * Serializes the entry together with its logical key and stores it in the backend.
+     * Failures while hashing the key or serializing are reported via {@code callback.failed}.
+     */
+    @Override
+    public final Cancellable putEntry(
+            final String key, final HttpCacheEntry entry, final FutureCallback<Boolean> callback) {
+        Args.notNull(key, "Storage key");
+        Args.notNull(callback, "Callback");
+        try {
+            final String storageKey = digestToStorageKey(key);
+            final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, entry));
+            return store(storageKey, storageObject, callback);
+        } catch (final Exception ex) {
+            callback.failed(ex);
+            return NOOP_CANCELLABLE;
+        }
+    }
+
+    /**
+     * Restores and deserializes the entry stored for the given key. The callback is
+     * completed with {@code null} when nothing is stored or when the stored entry
+     * was saved under a different logical key.
+     */
+    @Override
+    public final Cancellable getEntry(final String key, final FutureCallback<HttpCacheEntry> callback) {
+        Args.notNull(key, "Storage key");
+        Args.notNull(callback, "Callback");
+        try {
+            final String storageKey = digestToStorageKey(key);
+            return restore(storageKey, new FutureCallback<T>() {
+
+                @Override
+                public void completed(final T storageObject) {
+                    try {
+                        if (storageObject != null) {
+                            final HttpCacheStorageEntry entry = serializer.deserialize(storageObject);
+                            if (key.equals(entry.getKey())) {
+                                callback.completed(entry.getContent());
+                            } else {
+                                // different logical key under the same storage key: report a miss
+                                callback.completed(null);
+                            }
+                        } else {
+                            callback.completed(null);
+                        }
+                    } catch (final Exception ex) {
+                        callback.failed(ex);
+                    }
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    callback.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    callback.cancelled();
+                }
+
+            });
+        } catch (final Exception ex) {
+            callback.failed(ex);
+            return NOOP_CANCELLABLE;
+        }
+    }
+
+    /**
+     * Deletes whatever is stored in the backend for the given key.
+     */
+    @Override
+    public final Cancellable removeEntry(final String key, final FutureCallback<Boolean> callback) {
+        Args.notNull(key, "Storage key");
+        Args.notNull(callback, "Callback");
+        try {
+            final String storageKey = digestToStorageKey(key);
+            return delete(storageKey, callback);
+        } catch (final Exception ex) {
+            callback.failed(ex);
+            return NOOP_CANCELLABLE;
+        }
+    }
+
+    /**
+     * Performs a read-modify-write cycle on the entry stored for the given key,
+     * repeating it when the CAS update is rejected. The callback fails with
+     * {@link HttpCacheUpdateException} once the number of rejected attempts reaches
+     * the configured maximum. The returned handle cancels whichever backend
+     * operation is currently pending.
+     */
+    @Override
+    public final Cancellable updateEntry(
+            final String key, final HttpCacheCASOperation casOperation, final FutureCallback<Boolean> callback) {
+        Args.notNull(key, "Storage key");
+        Args.notNull(casOperation, "CAS operation");
+        Args.notNull(callback, "Callback");
+        final ComplexCancellable complexCancellable = new ComplexCancellable();
+        final AtomicInteger count = new AtomicInteger(0);
+        attemptUpdateEntry(key, casOperation, complexCancellable, count, callback);
+        return complexCancellable;
+    }
+
+    /**
+     * Runs a single update attempt: fetches the CAS handle, applies the CAS operation
+     * to the existing entry (or {@code null}), then either stores a brand new entry
+     * or issues a CAS update and re-invokes itself if that update is rejected.
+     */
+    private void attemptUpdateEntry(
+            final String key,
+            final HttpCacheCASOperation casOperation,
+            final ComplexCancellable complexCancellable,
+            final AtomicInteger count,
+            final FutureCallback<Boolean> callback) {
+        try {
+            final String storageKey = digestToStorageKey(key);
+            complexCancellable.setDependency(getForUpdateCAS(storageKey, new FutureCallback<CAS>() {
+
+                @Override
+                public void completed(final CAS cas) {
+                    try {
+                        HttpCacheStorageEntry storageEntry = cas != null ? serializer.deserialize(getStorageObject(cas)) : null;
+                        if (storageEntry != null && !key.equals(storageEntry.getKey())) {
+                            // entry belongs to a different logical key: treat as absent
+                            storageEntry = null;
+                        }
+                        final HttpCacheEntry existingEntry = storageEntry != null ? storageEntry.getContent() : null;
+                        final HttpCacheEntry updatedEntry = casOperation.execute(existingEntry);
+                        if (existingEntry == null) {
+                            // Nothing to compare against: plain store. Track the pending
+                            // store so that cancelling the update reaches it as well.
+                            final Cancellable pendingStore = putEntry(key, updatedEntry, callback);
+                            if (pendingStore != null) {
+                                complexCancellable.setDependency(pendingStore);
+                            }
+                        } else {
+                            final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, updatedEntry));
+                            complexCancellable.setDependency(updateCAS(storageKey, cas, storageObject, new FutureCallback<Boolean>() {
+
+                                @Override
+                                public void completed(final Boolean result) {
+                                    if (result) {
+                                        callback.completed(result);
+                                    } else if (complexCancellable.isCancelled()) {
+                                        // Cancelled between attempts: no further backend call
+                                        // will be made, so the caller must be notified here.
+                                        callback.cancelled();
+                                    } else {
+                                        final int numRetries = count.incrementAndGet();
+                                        if (numRetries >= maxUpdateRetries) {
+                                            callback.failed(new HttpCacheUpdateException("Cache update failed after " + numRetries + " retries"));
+                                        } else {
+                                            attemptUpdateEntry(key, casOperation, complexCancellable, count, callback);
+                                        }
+                                    }
+                                }
+
+                                @Override
+                                public void failed(final Exception ex) {
+                                    callback.failed(ex);
+                                }
+
+                                @Override
+                                public void cancelled() {
+                                    callback.cancelled();
+                                }
+
+                            }));
+                        }
+                    } catch (final Exception ex) {
+                        callback.failed(ex);
+                    }
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    callback.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    callback.cancelled();
+                }
+
+            }));
+        } catch (final Exception ex) {
+            callback.failed(ex);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/ComplexCancellable.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/ComplexCancellable.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/ComplexCancellable.java
new file mode 100644
index 0000000..55273ef
--- /dev/null
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/ComplexCancellable.java
@@ -0,0 +1,74 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.cache;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link Cancellable} that forwards cancellation to a replaceable dependency,
+ * so that one handle can represent a chain of consecutive operations.
+ * <p>
+ * TODO: replace with ComplexCancellable from HttpCore 5.0b2
+ * </p>
+ */
+final class ComplexCancellable implements Cancellable {
+
+    private final AtomicReference<Cancellable> dependencyRef;
+    private final AtomicBoolean cancelled;
+
+    public ComplexCancellable() {
+        this.dependencyRef = new AtomicReference<>(null);
+        this.cancelled = new AtomicBoolean(false);
+    }
+
+    /**
+     * @return {@code true} once {@link #cancel()} has been called
+     */
+    public boolean isCancelled() {
+        return cancelled.get();
+    }
+
+    /**
+     * Makes the given operation the current cancellation target. If this handle
+     * has already been cancelled the operation is cancelled immediately instead.
+     *
+     * @param dependency operation to cancel when this handle is cancelled;
+     *                   must not be {@code null}
+     */
+    public void setDependency(final Cancellable dependency) {
+        Args.notNull(dependency, "dependency");
+        if (cancelled.get()) {
+            dependency.cancel();
+            return;
+        }
+        dependencyRef.set(dependency);
+        // cancel() may have run between the check above and the set, in which case
+        // it did not see this dependency. Re-check and cancel it ourselves; the
+        // getAndSet guarantees that only one of the two threads cancels it.
+        if (cancelled.get()) {
+            final Cancellable pending = dependencyRef.getAndSet(null);
+            if (pending != null) {
+                pending.cancel();
+            }
+        }
+    }
+
+    /**
+     * Marks this handle as cancelled and cancels the current dependency, if any.
+     *
+     * @return {@code true} for the first call, {@code false} for any subsequent one
+     */
+    @Override
+    public boolean cancel() {
+        if (cancelled.compareAndSet(false, true)) {
+            final Cancellable dependency = dependencyRef.getAndSet(null);
+            if (dependency != null) {
+                dependency.cancel();
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpAsyncCacheStorage.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpAsyncCacheStorage.java
new file mode 100644
index 0000000..478c578
--- /dev/null
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpAsyncCacheStorage.java
@@ -0,0 +1,250 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.cache.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
+import org.apache.hc.client5.http.cache.ResourceIOException;
+import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
+import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
+import org.apache.hc.client5.http.impl.cache.CacheConfig;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.util.Args;
+
+import net.spy.memcached.CASResponse;
+import net.spy.memcached.CASValue;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.GetCompletionListener;
+import net.spy.memcached.internal.GetFuture;
+import net.spy.memcached.internal.OperationCompletionListener;
+import net.spy.memcached.internal.OperationFuture;
+
+/**
+ * <p>
+ * This class is a storage backend that uses an external <i>memcached</i>
+ * for storing cached origin responses. This storage option provides a
+ * couple of interesting advantages over the default in-memory storage
+ * backend:
+ * </p>
+ * <ol>
+ * <li>in-memory cached objects can survive an application restart since
+ * they are held in a separate process</li>
+ * <li>it becomes possible for several cooperating applications to share
+ * a large <i>memcached</i> farm together</li>
+ * </ol>
+ * <p>
+ * Note that in a shared memcached pool setting you may wish to make use
+ * of the Ketama consistent hashing algorithm to reduce the number of
+ * cache misses that might result if one of the memcached cluster members
+ * fails (see the <a href="http://dustin.github.com/java-memcached-client/apidocs/net/spy/memcached/KetamaConnectionFactory.html">
+ * KetamaConnectionFactory</a>).
+ * </p>
+ * <p>
+ * Because memcached places limits on the size of its keys, we need to
+ * introduce a key hashing scheme to map the annotated URLs the higher-level
+ * caching HTTP client wants to use as keys onto ones that are suitable
+ * for use with memcached. Please see {@link KeyHashingScheme} if you would
+ * like to use something other than the provided {@link SHA256KeyHashingScheme}.
+ * </p>
+ *
+ * <p>
+ * Please refer to the <a href="http://code.google.com/p/memcached/wiki/NewStart">
+ * memcached documentation</a> and in particular to the
+ * <a href="http://code.google.com/p/spymemcached/">spymemcached
+ * documentation</a> for details about how to set up and configure memcached
+ * and the Java client used here, respectively.
+ * </p>
+ *
+ * @since 5.0
+ */
+public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
+
+    private final MemcachedClient client;
+    private final KeyHashingScheme keyHashingScheme;
+
+    /**
+     * Create a storage backend talking to a <i>memcached</i> instance
+     * listening on the specified host and port. This is useful if you
+     * just have a single local memcached instance running on the same
+     * machine as your application, for example.
+     * @param address where the <i>memcached</i> daemon is running
+     * @throws IOException in case of an error
+     */
+    public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
+        this(new MemcachedClient(address));
+    }
+
+    /**
+     * Create a storage backend using the pre-configured given
+     * <i>memcached</i> client.
+     * @param cache client to use for communicating with <i>memcached</i>
+     */
+    public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
+        this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
+    }
+
+    /**
+     * Create a storage backend using the given <i>memcached</i> client and
+     * applying the given cache configuration, serialization, and hashing
+     * mechanisms.
+     * @param client how to talk to <i>memcached</i>; must not be {@code null}
+     * @param config apply HTTP cache-related options; {@link CacheConfig#DEFAULT}
+     *   is used if {@code null}
+     * @param serializer alternative serialization mechanism;
+     *   {@link ByteArrayCacheEntrySerializer#INSTANCE} is used if {@code null}
+     * @param keyHashingScheme how to map higher-level logical "storage keys"
+     *   onto "cache keys" suitable for use with memcached;
+     *   {@link SHA256KeyHashingScheme#INSTANCE} is used if {@code null}
+     */
+    public MemcachedHttpAsyncCacheStorage(
+            final MemcachedClient client,
+            final CacheConfig config,
+            final HttpCacheEntrySerializer<byte[]> serializer,
+            final KeyHashingScheme keyHashingScheme) {
+        super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
+                serializer != null ? serializer : ByteArrayCacheEntrySerializer.INSTANCE);
+        this.client = Args.notNull(client, "Memcached client");
+        // Default like the other optional arguments; a null scheme would otherwise
+        // make every operation fail in digestToStorageKey.
+        this.keyHashingScheme = keyHashingScheme != null ? keyHashingScheme : SHA256KeyHashingScheme.INSTANCE;
+    }
+
+    @Override
+    protected String digestToStorageKey(final String key) {
+        return keyHashingScheme.hash(key);
+    }
+
+    /**
+     * Returns the given cache content as a byte array, passing {@code null} through.
+     *
+     * @throws ResourceIOException if the content is neither {@code null} nor a byte array
+     */
+    private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
+        if (storageObject == null) {
+            return null;
+        }
+        if (storageObject instanceof byte[]) {
+            return (byte[]) storageObject;
+        } else {
+            throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
+        }
+    }
+
+    @Override
+    protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
+        return castAsByteArray(casValue.getValue());
+    }
+
+    /**
+     * Picks the exception to report for a failed future: the cause when it is an
+     * {@link Exception}, otherwise the {@link ExecutionException} itself.
+     */
+    private static Exception failureOf(final ExecutionException ex) {
+        final Throwable cause = ex.getCause();
+        return cause instanceof Exception ? (Exception) cause : ex;
+    }
+
+    /**
+     * Bridges a memcached operation future to a {@link FutureCallback} and exposes
+     * cancellation of the future as a {@link Cancellable}.
+     */
+    private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
+        operationFuture.addListener(new OperationCompletionListener() {
+
+            @Override
+            public void onComplete(final OperationFuture<?> future) throws Exception {
+                try {
+                    callback.completed(operationFuture.get());
+                } catch (final ExecutionException ex) {
+                    callback.failed(failureOf(ex));
+                }
+            }
+
+        });
+        return new Cancellable() {
+
+            @Override
+            public boolean cancel() {
+                return operationFuture.cancel();
+            }
+
+        };
+    }
+
+    @Override
+    protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
+        return operation(client.set(storageKey, 0, storageObject), callback);
+    }
+
+    @Override
+    protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
+        final GetFuture<Object> getFuture = client.asyncGet(storageKey);
+        getFuture.addListener(new GetCompletionListener() {
+
+            @Override
+            public void onComplete(final GetFuture<?> future) throws Exception {
+                try {
+                    callback.completed(castAsByteArray(getFuture.get()));
+                } catch (final ExecutionException ex) {
+                    callback.failed(failureOf(ex));
+                }
+            }
+
+        });
+        return new Cancellable() {
+
+            @Override
+            public boolean cancel() {
+                return getFuture.cancel(true);
+            }
+
+        };
+    }
+
+    @Override
+    protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
+        return operation(client.asyncGets(storageKey), callback);
+    }
+
+    /**
+     * Issues a CAS update and reports {@code true} only for {@link CASResponse#OK}.
+     */
+    @Override
+    protected Cancellable updateCAS(
+            final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
+        return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
+
+            @Override
+            public void completed(final CASResponse result) {
+                callback.completed(result == CASResponse.OK);
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                callback.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                callback.cancelled();
+            }
+
+        });
+    }
+
+    @Override
+    protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
+        return operation(client.delete(storageKey), callback);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java
index 39d209e..6a6195c 100644
--- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java
@@ -32,14 +32,13 @@ import java.net.InetSocketAddress;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.client5.http.impl.cache.AbstractBinaryCacheStorage;
-import org.apache.hc.client5.http.impl.cache.CacheConfig;
import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
+import org.apache.hc.client5.http.impl.cache.CacheConfig;
import org.apache.hc.core5.util.Args;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
-import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.OperationTimeoutException;
/**
@@ -82,7 +81,7 @@ import net.spy.memcached.OperationTimeoutException;
*/
public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASValue<Object>> {
- private final MemcachedClientIF client;
+ private final MemcachedClient client;
private final KeyHashingScheme keyHashingScheme;
/**
@@ -102,7 +101,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
* <i>memcached</i> client.
* @param cache client to use for communicating with <i>memcached</i>
*/
- public MemcachedHttpCacheStorage(final MemcachedClientIF cache) {
+ public MemcachedHttpCacheStorage(final MemcachedClient cache) {
this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
}
@@ -117,7 +116,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
* onto "cache keys" suitable for use with memcached
*/
public MemcachedHttpCacheStorage(
- final MemcachedClientIF client,
+ final MemcachedClient client,
final CacheConfig config,
final HttpCacheEntrySerializer<byte[]> serializer,
final KeyHashingScheme keyHashingScheme) {
@@ -134,11 +133,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
@Override
protected void store(final String storageKey, final byte[] storageObject) throws ResourceIOException {
- try {
- client.set(storageKey, 0, storageObject);
- } catch (final OperationTimeoutException ex) {
- throw new MemcachedOperationTimeoutException(ex);
- }
+ client.set(storageKey, 0, storageObject);
}
private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
@@ -184,11 +179,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
@Override
protected void delete(final String storageKey) throws ResourceIOException {
- try {
- client.delete(storageKey);
- } catch (final OperationTimeoutException ex) {
- throw new MemcachedOperationTimeoutException(ex);
- }
+ client.delete(storageKey);
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/002f40f9/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAbstractSerializingAsyncCacheStorage.java
----------------------------------------------------------------------
diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAbstractSerializingAsyncCacheStorage.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAbstractSerializingAsyncCacheStorage.java
new file mode 100644
index 0000000..7070c89
--- /dev/null
+++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAbstractSerializingAsyncCacheStorage.java
@@ -0,0 +1,450 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.cache;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
+import org.apache.hc.client5.http.cache.HttpCacheEntry;
+import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
+import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
+import org.apache.hc.client5.http.cache.ResourceIOException;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestAbstractSerializingAsyncCacheStorage {
+
+ @Mock
+ private Cancellable cancellable;
+ @Mock
+ private FutureCallback<Boolean> operationCallback;
+ @Mock
+ private FutureCallback<HttpCacheEntry> cacheEntryCallback;
+
+ private AbstractBinaryAsyncCacheStorage<String> impl;
+
+ /**
+ * Test helper: wraps the key and entry in an {@link HttpCacheStorageEntry} and
+ * serializes it with {@link ByteArrayCacheEntrySerializer#INSTANCE}.
+ */
+ public static byte[] serialize(final String key, final HttpCacheEntry value) throws ResourceIOException {
+ final HttpCacheStorageEntry storageEntry = new HttpCacheStorageEntry(key, value);
+ return ByteArrayCacheEntrySerializer.INSTANCE.serialize(storageEntry);
+ }
+ /** Builds a mock of the abstract storage that runs real methods by default and is constructed with argument 3 (presumably the update retry limit - confirm against the constructor). */
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ impl = Mockito.mock(AbstractBinaryAsyncCacheStorage.class,
+ Mockito.withSettings().defaultAnswer(Answers.CALLS_REAL_METHODS).useConstructor(3));
+ }
+ /** Checks that putEntry passes the serialized entry to store under the digested storage key and that the caller's callback is completed with TRUE. */
+ @Test
+ public void testCachePut() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry value = HttpTestUtils.makeCacheEntry();
+ // Stub key digestion and make store(...) complete its callback immediately with true.
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.store(
+ Mockito.eq("bar"),
+ Mockito.<byte[]>any(),
+ Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<Boolean> callback = invocation.getArgument(2);
+ callback.completed(true);
+ return cancellable;
+ }
+
+ });
+
+ impl.putEntry(key, value, operationCallback);
+ // The bytes handed to store(...) must equal the serialized (key, value) pair.
+ final ArgumentCaptor<byte[]> argumentCaptor = ArgumentCaptor.forClass(byte[].class);
+ Mockito.verify(impl).store(Mockito.eq("bar"), argumentCaptor.capture(), Mockito.<FutureCallback<Boolean>>any());
+ Assert.assertArrayEquals(serialize(key, value), argumentCaptor.getValue());
+ Mockito.verify(operationCallback).completed(Boolean.TRUE);
+ }
+ /** Checks that getEntry completes the caller's callback with null when restore yields no stored bytes. */
+ @Test
+ public void testCacheGetNullEntry() throws Exception {
+ final String key = "foo";
+ // restore(...) completes immediately with null, i.e. nothing is stored under "bar".
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<byte[]> callback = invocation.getArgument(1);
+ callback.completed(null);
+ return cancellable;
+ }
+
+ });
+
+ impl.getEntry(key, cacheEntryCallback);
+ final ArgumentCaptor<HttpCacheEntry> argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class);
+ Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture());
+ Assert.assertThat(argumentCaptor.getValue(), CoreMatchers.nullValue());
+ Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any());
+ }
+ /** Checks that getEntry completes with an entry equivalent to the one whose serialized form restore supplies. */
+ @Test
+ public void testCacheGet() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry value = HttpTestUtils.makeCacheEntry();
+ // restore(...) completes immediately with the bytes of the entry serialized under the requested key.
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<byte[]> callback = invocation.getArgument(1);
+ callback.completed(serialize(key, value));
+ return cancellable;
+ }
+
+ });
+
+ impl.getEntry(key, cacheEntryCallback);
+ final ArgumentCaptor<HttpCacheEntry> argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class);
+ Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture());
+ final HttpCacheEntry resultingEntry = argumentCaptor.getValue();
+ Assert.assertThat(resultingEntry, HttpCacheEntryMatcher.equivalent(value));
+ Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any());
+ }
+ /** Checks that getEntry completes with null when the restored bytes were serialized under a different key than the one requested. */
+ @Test
+ public void testCacheGetKeyMismatch() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry value = HttpTestUtils.makeCacheEntry();
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<byte[]> callback = invocation.getArgument(1);
+ callback.completed(serialize("not-foo", value));
+ return cancellable;
+ }
+
+ });
+ // The restored entry carries key "not-foo", so the lookup for "foo" is expected to yield null.
+ impl.getEntry(key, cacheEntryCallback);
+ final ArgumentCaptor<HttpCacheEntry> argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class);
+ Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture());
+ Assert.assertThat(argumentCaptor.getValue(), CoreMatchers.nullValue());
+ Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any());
+ }
+ /** Checks that removeEntry calls delete with the digested storage key and the caller's own callback. */
+ @Test
+ public void testCacheRemove() throws Exception{
+ final String key = "foo";
+
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.delete(
+ Mockito.eq("bar"),
+ Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<Boolean> callback = invocation.getArgument(1);
+ callback.completed(true);
+ return cancellable;
+ }
+
+ });
+ impl.removeEntry(key, operationCallback);
+ // delete(...) must receive operationCallback itself, which the stub above completes with true.
+ Mockito.verify(impl).delete("bar", operationCallback);
+ Mockito.verify(operationCallback).completed(Boolean.TRUE);
+ }
+ /** Checks that updateEntry hands a null existing entry to the operation and writes the result via store when getForUpdateCAS yields no CAS object. */
+ @Test
+ public void testCacheUpdateNullEntry() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
+
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(new Answer<Cancellable>() {
+ // No CAS object exists: complete the String-typed CAS callback (matching the stubbed signature) with null.
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<String> callback = invocation.getArgument(1);
+ callback.completed(null);
+ return cancellable;
+ }
+
+ });
+ Mockito.when(impl.store(
+ Mockito.eq("bar"),
+ Mockito.<byte[]>any(),
+ Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<Boolean> callback = invocation.getArgument(2);
+ callback.completed(true);
+ return cancellable;
+ }
+
+ });
+
+ impl.updateEntry(key, new HttpCacheCASOperation() {
+
+ @Override
+ public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
+ Assert.assertThat(existing, CoreMatchers.nullValue());
+ return updatedValue;
+ }
+
+ }, operationCallback);
+ // With nothing to compare against, the update is expected to go through store(...).
+ Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
+ Mockito.verify(impl).store(Mockito.eq("bar"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
+ Mockito.verify(operationCallback).completed(Boolean.TRUE);
+ }
+ /** Checks that updateEntry writes through updateCAS, passing the CAS object obtained from getForUpdateCAS, when the existing entry was stored under the requested key. */
+ @Test
+ public void testCacheCASUpdate() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
+ final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
+ // getForUpdateCAS yields the CAS object "stuff"; getStorageObject maps it to an entry serialized under the requested key.
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<String> callback = invocation.getArgument(1);
+ callback.completed("stuff");
+ return cancellable;
+ }
+
+ });
+ Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue));
+ Mockito.when(impl.updateCAS(
+ Mockito.eq("bar"),
+ Mockito.eq("stuff"),
+ Mockito.<byte[]>any(),
+ Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<Boolean> callback = invocation.getArgument(3);
+ callback.completed(true);
+ return cancellable;
+ }
+
+ });
+
+ impl.updateEntry(key, new HttpCacheCASOperation() {
+
+ @Override
+ public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
+ return updatedValue;
+ }
+
+ }, operationCallback);
+ // The CAS succeeds on the first attempt, so each step is expected exactly once.
+ Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
+ Mockito.verify(impl).getStorageObject("stuff");
+ Mockito.verify(impl).updateCAS(Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
+ Mockito.verify(operationCallback).completed(Boolean.TRUE);
+ }
+ /** Checks that updateEntry treats an entry stored under a different key as absent: the operation sees null and the result is written via store, never updateCAS. */
+ @Test
+ public void testCacheCASUpdateKeyMismatch() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
+ final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
+ // The CAS object "stuff" resolves to an entry serialized under "not-foo" rather than the requested key.
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(
+ new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<String> callback = invocation.getArgument(1);
+ callback.completed("stuff");
+ return cancellable;
+ }
+
+ });
+ Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize("not-foo", existingValue));
+ Mockito.when(impl.store(
+ Mockito.eq("bar"),
+ Mockito.<byte[]>any(),
+ Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<Boolean> callback = invocation.getArgument(2);
+ callback.completed(true);
+ return cancellable;
+ }
+
+ });
+
+ impl.updateEntry(key, new HttpCacheCASOperation() {
+
+ @Override
+ public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
+ Assert.assertThat(existing, CoreMatchers.nullValue());
+ return updatedValue;
+ }
+
+ }, operationCallback);
+ // updateCAS must never be attempted; the update is expected to be written via store(...).
+ Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
+ Mockito.verify(impl).getStorageObject("stuff");
+ Mockito.verify(impl, Mockito.never()).updateCAS(
+ Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
+ Mockito.verify(impl).store(Mockito.eq("bar"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
+ Mockito.verify(operationCallback).completed(Boolean.TRUE);
+ }
+ /** Checks that updateEntry repeats the getForUpdateCAS / getStorageObject / updateCAS cycle once after a rejected CAS and then reports success. */
+ @Test
+ public void testSingleCacheUpdateRetry() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
+ final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
+
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(
+ new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<String> callback = invocation.getArgument(1);
+ callback.completed("stuff");
+ return cancellable;
+ }
+
+ });
+ Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue));
+ final AtomicInteger count = new AtomicInteger(0);
+ Mockito.when(impl.updateCAS(
+ Mockito.eq("bar"),
+ Mockito.eq("stuff"),
+ Mockito.<byte[]>any(),
+ Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
+ // Reject the first CAS attempt and accept every later one.
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<Boolean> callback = invocation.getArgument(3);
+ if (count.incrementAndGet() == 1) {
+ callback.completed(false);
+ } else {
+ callback.completed(true);
+ }
+ return cancellable;
+ }
+
+ });
+
+ impl.updateEntry(key, new HttpCacheCASOperation() {
+
+ @Override
+ public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
+ return updatedValue;
+ }
+
+ }, operationCallback);
+ // One rejection followed by one success: every step of the cycle is expected twice.
+ Mockito.verify(impl, Mockito.times(2)).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
+ Mockito.verify(impl, Mockito.times(2)).getStorageObject("stuff");
+ Mockito.verify(impl, Mockito.times(2)).updateCAS(
+ Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
+ Mockito.verify(operationCallback).completed(Boolean.TRUE);
+ }
+ /** Checks that updateEntry stops after three rejected CAS attempts and reports the failure through the callback's failed(...) method. */
+ @Test
+ public void testCacheUpdateFail() throws Exception {
+ final String key = "foo";
+ final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
+ final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
+
+ Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
+ Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(
+ new Answer<Cancellable>() {
+
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<String> callback = invocation.getArgument(1);
+ callback.completed("stuff");
+ return cancellable;
+ }
+
+ });
+ Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue));
+ final AtomicInteger count = new AtomicInteger(0);
+ Mockito.when(impl.updateCAS(
+ Mockito.eq("bar"),
+ Mockito.eq("stuff"),
+ Mockito.<byte[]>any(),
+ Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
+ // Reject the first three CAS attempts; a fourth would succeed, but the test expects it never to be made.
+ @Override
+ public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
+ final FutureCallback<Boolean> callback = invocation.getArgument(3);
+ if (count.incrementAndGet() <= 3) {
+ callback.completed(false);
+ } else {
+ callback.completed(true);
+ }
+ return cancellable;
+ }
+
+ });
+
+ impl.updateEntry(key, new HttpCacheCASOperation() {
+
+ @Override
+ public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
+ return updatedValue;
+ }
+
+ }, operationCallback);
+ // Exactly three full cycles are expected (the same number passed to the constructor in setUp) before failed(...) is reported.
+ Mockito.verify(impl, Mockito.times(3)).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
+ Mockito.verify(impl, Mockito.times(3)).getStorageObject("stuff");
+ Mockito.verify(impl, Mockito.times(3)).updateCAS(
+ Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
+ Mockito.verify(operationCallback).failed(Mockito.<HttpCacheUpdateException>any());
+ }
+
+}