You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/10/12 19:53:37 UTC
[12/25] ignite git commit: ignite-1526: full support of IBM JDK by
Ignite
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index ac8c5af..d89e397 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.locks.Lock;
import javax.cache.CacheException;
import javax.cache.CacheManager;
@@ -35,7 +34,6 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
@@ -48,8 +46,8 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -63,9 +61,6 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/** Cache name. */
private final String cacheName;
- /** Grid id. */
- private final UUID gridId;
-
/** With async. */
private final boolean isAsync;
@@ -82,31 +77,16 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/**
* @param name Name.
- * @param async
+ * @param async Async flag.
* @param proxy Ignite Process Proxy.
*/
public IgniteCacheProcessProxy(String name, boolean async, IgniteProcessProxy proxy) {
cacheName = name;
isAsync = async;
- gridId = proxy.getId();
igniteProxy = proxy;
compute = proxy.remoteCompute();
}
- /**
- * Returns cache instance. Method to be called from closure at another JVM.
- *
- * @return Cache.
- */
- private IgniteCache<Object, Object> cache() {
- IgniteCache cache = Ignition.ignite(gridId).cache(cacheName);
-
- if (isAsync)
- cache = cache.withAsync();
-
- return cache;
- }
-
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withAsync() {
return new IgniteCacheProcessProxy<>(cacheName, true, igniteProxy);
@@ -124,14 +104,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public <C extends Configuration<K, V>> C getConfiguration(final Class<C> clazz) {
- final Class cl = clazz;
-
- return (C)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getConfiguration(cl);
- }
- });
+ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
+ return compute.call(new GetConfigurationTask<>(cacheName, isAsync, clazz));
}
/** {@inheritDoc} */
@@ -149,33 +123,26 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
throw new UnsupportedOperationException("Method should be supported.");
}
+ /** {@inheritDoc} */
@Override public IgniteCache<K, V> withNoRetries() {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
+ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+ throws CacheException {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public void localLoadCache(@Nullable final IgniteBiPredicate<K, V> p, @Nullable final Object... args) throws CacheException {
- final IgniteBiPredicate pCopy = p;
-
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localLoadCache(pCopy, args);
- }
- });
+ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+ throws CacheException {
+ compute.call(new LocalLoadCacheTask<>(cacheName, isAsync, p, args));
}
/** {@inheritDoc} */
- @Override public V getAndPutIfAbsent(final K key, final V val) throws CacheException {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndPutIfAbsent(key, val);
- }
- });
+ @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
+ return compute.call(new GetAndPutIfAbsentTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
@@ -189,12 +156,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public boolean isLocalLocked(final K key, final boolean byCurrThread) {
- return compute.call(new IgniteCallable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return cache().isLocalLocked(key, byCurrThread);
- }
- });
+ @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
+ return compute.call(new IsLocalLockedTask<>(cacheName, isAsync, key, byCurrThread));
}
/** {@inheritDoc} */
@@ -203,18 +166,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public Iterable<Entry<K, V>> localEntries(final CachePeekMode... peekModes) throws CacheException {
- return (Iterable<Entry<K, V>>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- Collection<Entry> res = new ArrayList<>();
-
- for (Entry e : cache().localEntries(peekModes))
- res.add(new CacheEntryImpl(e.getKey(), e.getValue()));
-
- return res;
- }
- });
+ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
+ return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes));
}
/** {@inheritDoc} */
@@ -223,21 +176,13 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public void localEvict(final Collection<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localEvict(keys);
- }
- });
+ @Override public void localEvict(Collection<? extends K> keys) {
+ compute.call(new LocalEvictTask<>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
- @Override public V localPeek(final K key, final CachePeekMode... peekModes) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().localPeek(key, peekModes);
- }
- });
+ @Override public V localPeek(K key, CachePeekMode... peekModes) {
+ return compute.call(new LocalPeekTask<K, V>(cacheName, isAsync, key, peekModes));
}
/** {@inheritDoc} */
@@ -246,274 +191,160 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public int size(final CachePeekMode... peekModes) throws CacheException {
- return (int)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().size(peekModes);
- }
- });
+ @Override public int size(CachePeekMode... peekModes) throws CacheException {
+ return compute.call(new SizeTask(cacheName, isAsync, peekModes, false));
}
/** {@inheritDoc} */
- @Override public int localSize(final CachePeekMode... peekModes) {
- return (int)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().localSize(peekModes);
- }
- });
+ @Override public int localSize(CachePeekMode... peekModes) {
+ return compute.call(new SizeTask(cacheName, isAsync, peekModes, true));
}
/** {@inheritDoc} */
- @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
- Object... args) {
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+ Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+ Object... args)
+ {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public V get(final K key) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().get(key);
- }
- });
+ @Override public V get(K key) {
+ return compute.call(new GetTask<K, V>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public Map<K, V> getAll(final Set<? extends K> keys) {
- return (Map<K, V>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAll(keys);
- }
- });
+ @Override public Map<K, V> getAll(Set<? extends K> keys) {
+ return compute.call(new GetAllTask<K, V>(cacheName, isAsync, keys));
}
- @Override public Map<K, V> getAllOutTx(final Set<? extends K> keys) {
- return (Map<K, V>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAllOutTx(keys);
- }
- });
+ /** {@inheritDoc} */
+ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+ return compute.call(new GetAllOutTxTask<K, V>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
- @Override public boolean containsKey(final K key) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().containsKey(key);
- }
- });
+ @Override public boolean containsKey(K key) {
+ return compute.call(new ContainsKeyTask<>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) {
+ @Override public void loadAll(Set<? extends K> keys, boolean replaceExistVals, CompletionListener completionLsnr) {
throw new UnsupportedOperationException("Oparetion can't be supported automatically.");
}
/** {@inheritDoc} */
- @Override public boolean containsKeys(final Set<? extends K> keys) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().containsKeys(keys);
- }
- });
+ @Override public boolean containsKeys(Set<? extends K> keys) {
+ return compute.call(new ContainsKeysTask<>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
- @Override public void put(final K key, final V val) {;
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().put(key, val);
- }
- });
+ @Override public void put(K key, V val) {
+ compute.call(new PutTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public V getAndPut(final K key, final V val) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndPut(key, val);
- }
- });
+ @Override public V getAndPut(K key, V val) {
+ return compute.call(new GetAndPutTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public void putAll(final Map<? extends K, ? extends V> map) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().putAll(map);
- }
- });
+ @Override public void putAll(Map<? extends K, ? extends V> map) {
+ compute.call(new PutAllTask<>(cacheName, isAsync, map));
}
/** {@inheritDoc} */
- @Override public boolean putIfAbsent(final K key, final V val) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().putIfAbsent(key, val);
- }
- });
+ @Override public boolean putIfAbsent(K key, V val) {
+ return compute.call(new PutIfAbsentTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public boolean remove(final K key) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().remove(key);
- }
- });
+ @Override public boolean remove(K key) {
+ return compute.call(new RemoveTask<>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public boolean remove(final K key, final V oldVal) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().remove(key, oldVal);
- }
- });
+ @Override public boolean remove(K key, V oldVal) {
+ return compute.call(new RemoveIfExistsTask<>(cacheName, isAsync, key, oldVal));
}
/** {@inheritDoc} */
- @Override public V getAndRemove(final K key) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndRemove(key);
- }
- });
+ @Override public V getAndRemove(K key) {
+ return compute.call(new GetAndRemoveTask<K, V>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public boolean replace(final K key, final V oldVal, final V newVal) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().replace(key, oldVal, newVal);
- }
- });
+ @Override public boolean replace(K key, V oldVal, V newVal) {
+ return compute.call(new ReplaceIfExistsTask<>(cacheName, isAsync, key, oldVal, newVal));
}
/** {@inheritDoc} */
- @Override public boolean replace(final K key, final V val) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().replace(key, val);
- }
- });
+ @Override public boolean replace(K key, V val) {
+ return compute.call(new ReplaceTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public V getAndReplace(final K key, final V val) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndReplace(key, val);
- }
- });
+ @Override public V getAndReplace(K key, V val) {
+ return compute.call(new GetAndReplaceTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public void removeAll(final Set<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().removeAll(keys);
- }
- });
+ @Override public void removeAll(Set<? extends K> keys) {
+ compute.call(new RemoveAllKeysTask<>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
@Override public void removeAll() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- IgniteCache<Object, Object> cache = cache();
-
- cache.removeAll();
-
- if (isAsync)
- cache.future().get();
- }
- });
+ compute.call(new RemoveAllTask<K, V>(cacheName, isAsync));
}
/** {@inheritDoc} */
@Override public void clear() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().clear();
- }
- });
+ compute.call(new ClearTask(cacheName, isAsync));
}
/** {@inheritDoc} */
- @Override public void clear(final K key) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().clear(key);
- }
- });
+ @Override public void clear(K key) {
+ compute.call(new ClearKeyTask<>(cacheName, isAsync, false, key));
}
/** {@inheritDoc} */
- @Override public void clearAll(final Set<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().clearAll(keys);
- }
- });
+ @Override public void clearAll(Set<? extends K> keys) {
+ compute.call(new ClearAllKeys<>(cacheName, isAsync, false, keys));
}
/** {@inheritDoc} */
- @Override public void localClear(final K key) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localClear(key);
- }
- });
+ @Override public void localClear(K key) {
+ compute.call(new ClearKeyTask<>(cacheName, isAsync, true, key));
}
/** {@inheritDoc} */
- @Override public void localClearAll(final Set<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localClearAll(keys);
- }
- });
+ @Override public void localClearAll(Set<? extends K> keys) {
+ compute.call(new ClearAllKeys<>(cacheName, isAsync, true, keys));
}
/** {@inheritDoc} */
- @Override public <T> T invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... arguments) {
- return (T)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().invoke(key,
- (EntryProcessor<Object, Object, Object>)entryProcessor, arguments);
- }
- });
+ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> processor, Object... args) {
+ return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args));
}
/** {@inheritDoc} */
- @Override public <T> T invoke(final K key, final CacheEntryProcessor<K, V, T> entryProcessor, final Object... arguments) {
- return (T)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().invoke(key,
- (CacheEntryProcessor<Object, Object, Object>)entryProcessor, arguments);
- }
- });
+ @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> processor, Object... args) {
+ return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args));
}
/** {@inheritDoc} */
- @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor,
- final Object... args) {
- return (Map<K, EntryProcessorResult<T>>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().invokeAll(keys,
- (EntryProcessor<Object, Object, Object>)entryProcessor, args);
- }
- });
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+ Set<? extends K> keys,
+ EntryProcessor<K, V, T> processor,
+ Object... args)
+ {
+ return compute.call(new InvokeAllTask<>(cacheName, isAsync, keys, processor, args));
}
/** {@inheritDoc} */
@Override public String getName() {
- return (String)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getName();
- }
- });
+ return compute.call(new GetNameTask(cacheName, isAsync));
}
/** {@inheritDoc} */
@@ -523,72 +354,47 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/** {@inheritDoc} */
@Override public void close() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().close();
- }
- });
+ compute.call(new CloseTask(cacheName, isAsync));
}
/** {@inheritDoc} */
@Override public void destroy() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().destroy();
- }
- });
+ compute.call(new DestroyTask(cacheName, isAsync));
}
/** {@inheritDoc} */
@Override public boolean isClosed() {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().isClosed();
- }
- });
+ return compute.call(new IsClosedTask(cacheName, isAsync));
}
/** {@inheritDoc} */
- @Override public <T> T unwrap(final Class<T> clazz) {
+ @SuppressWarnings("unchecked")
+ @Override public <T> T unwrap(Class<T> clazz) {
if (Ignite.class.equals(clazz))
return (T)igniteProxy;
try {
- return (T)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().unwrap(clazz);
- }
- });
+ return compute.call(new UnwrapTask<>(cacheName, isAsync, clazz));
}
catch (Exception e) {
- throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e);
+ throw new IllegalArgumentException("Looks like class " + clazz +
+ " is unmarshallable. Exception type:" + e.getClass(), e);
}
}
/** {@inheritDoc} */
- @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
@Override public Iterator<Entry<K, V>> iterator() {
- final Collection<Entry<K, V>> col = (Collection<Entry<K, V>>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- Collection res = new ArrayList();
-
- for (Object o : cache())
- res.add(o);
-
- return res;
- }
- });
-
- return col.iterator();
+ return compute.call(new IteratorTask<K, V>(cacheName, isAsync)).iterator();
}
/** {@inheritDoc} */
@@ -616,4 +422,968 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
@Override public CacheMetricsMXBean mxBean() {
throw new UnsupportedOperationException("Method should be supported.");
}
+
+ /**
+ *
+ */
+ private static class GetConfigurationTask<K, V, C extends Configuration<K, V>> extends CacheTaskAdapter<K, V, C> {
+ /** Clazz. */
+ private final Class<C> clazz;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param clazz Clazz.
+ */
+ public GetConfigurationTask(String cacheName, boolean async, Class<C> clazz) {
+ super(cacheName, async);
+ this.clazz = clazz;
+ }
+
+ /** {@inheritDoc} */
+ @Override public C call() throws Exception {
+ return cache().getConfiguration(clazz);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class LocalLoadCacheTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /** Predicate. */
+ private final IgniteBiPredicate<K, V> p;
+
+ /** Args. */
+ private final Object[] args;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param p P.
+ * @param args Args.
+ */
+ public LocalLoadCacheTask(String cacheName, boolean async, IgniteBiPredicate<K, V> p, Object[] args) {
+ super(cacheName, async);
+ this.p = p;
+ this.args = args;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().localLoadCache(p, args);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetAndPutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public GetAndPutIfAbsentTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndPutIfAbsent(key, val);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class IsLocalLockedTask<K> extends CacheTaskAdapter<K, Void, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** By current thread. */
+ private final boolean byCurrThread;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param byCurrThread By current thread.
+ */
+ public IsLocalLockedTask(String cacheName, boolean async, K key, boolean byCurrThread) {
+ super(cacheName, async);
+ this.key = key;
+ this.byCurrThread = byCurrThread;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().isLocalLocked(key, byCurrThread);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class LocalEntriesTask<K, V> extends CacheTaskAdapter<K, V, Iterable<Entry<K, V>>> {
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param peekModes Peek modes.
+ */
+ public LocalEntriesTask(String cacheName, boolean async, CachePeekMode[] peekModes) {
+ super(cacheName, async);
+ this.peekModes = peekModes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterable<Entry<K, V>> call() throws Exception {
+ Collection<Entry<K, V>> res = new ArrayList<>();
+
+ for (Entry<K, V> e : cache().localEntries(peekModes))
+ res.add(new CacheEntryImpl<>(e.getKey(), e.getValue()));
+
+ return res;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class LocalEvictTask<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Keys. */
+ private final Collection<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public LocalEvictTask(String cacheName, boolean async, Collection<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().localEvict(keys);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class LocalPeekTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param peekModes Peek modes.
+ */
+ public LocalPeekTask(String cacheName, boolean async, K key, CachePeekMode[] peekModes) {
+ super(cacheName, async);
+ this.key = key;
+ this.peekModes = peekModes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().localPeek(key, peekModes);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class SizeTask extends CacheTaskAdapter<Void, Void, Integer> {
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /** Local. */
+ private final boolean loc;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param peekModes Peek modes.
+ * @param loc Local.
+ */
+ public SizeTask(String cacheName, boolean async, CachePeekMode[] peekModes, boolean loc) {
+ super(cacheName, async);
+ this.loc = loc;
+ this.peekModes = peekModes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ return loc ? cache().localSize(peekModes) : cache().size(peekModes);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public GetTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().get(key);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RemoveAllTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public RemoveAllTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ IgniteCache<K, V> cache = cache();
+
+ cache.removeAll();
+
+ if (async)
+ cache.future().get();
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class PutTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public PutTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().put(key, val);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ContainsKeyTask<K> extends CacheTaskAdapter<K, Object, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public ContainsKeyTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().containsKey(key);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ClearTask extends CacheTaskAdapter<Object, Object, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public ClearTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().clear();
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class IteratorTask<K, V> extends CacheTaskAdapter<K, V, Collection<Entry<K, V>>> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public IteratorTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Entry<K, V>> call() throws Exception {
+ Collection<Entry<K, V>> res = new ArrayList<>();
+
+ for (Entry<K, V> o : cache())
+ res.add(o);
+
+ return res;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ReplaceTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public ReplaceTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().replace(key, val);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetNameTask extends CacheTaskAdapter<Void, Void, String> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public GetNameTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String call() throws Exception {
+ return cache().getName();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RemoveTask<K> extends CacheTaskAdapter<K, Void, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public RemoveTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().remove(key);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class PutAllTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /** Map. */
+ private final Map<? extends K, ? extends V> map;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param map Map.
+ */
+ public PutAllTask(String cacheName, boolean async, Map<? extends K, ? extends V> map) {
+ super(cacheName, async);
+ this.map = map;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().putAll(map);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RemoveAllKeysTask<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public RemoveAllKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().removeAll(keys);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetAllTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public GetAllTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> call() throws Exception {
+ return cache().getAll(keys);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetAllOutTxTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public GetAllOutTxTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> call() throws Exception {
+ return cache().getAllOutTx(keys);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ContainsKeysTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public ContainsKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().containsKeys(keys);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetAndPutTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public GetAndPutTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndPut(key, val);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class PutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public PutIfAbsentTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().putIfAbsent(key, val);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RemoveIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Old value. */
+ private final V oldVal;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param oldVal Old value.
+ */
+ public RemoveIfExistsTask(String cacheName, boolean async, K key, V oldVal) {
+ super(cacheName, async);
+ this.key = key;
+ this.oldVal = oldVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().remove(key, oldVal);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetAndRemoveTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public GetAndRemoveTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndRemove(key);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ReplaceIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Old value. */
+ private final V oldVal;
+
+ /** New value. */
+ private final V newVal;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param oldVal Old value.
+ * @param newVal New value.
+ */
+ public ReplaceIfExistsTask(String cacheName, boolean async, K key, V oldVal, V newVal) {
+ super(cacheName, async);
+ this.key = key;
+ this.oldVal = oldVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().replace(key, oldVal, newVal);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class GetAndReplaceTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public GetAndReplaceTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndReplace(key, val);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ClearKeyTask<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Key. */
+ private final K key;
+
+ /** Local. */
+ private final boolean loc;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public ClearKeyTask(String cacheName, boolean async, boolean loc, K key) {
+ super(cacheName, async);
+ this.key = key;
+ this.loc = loc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ if (loc)
+ cache().localClear(key);
+ else
+ cache().clear(key);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ClearAllKeys<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /** Local. */
+ private final boolean loc;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public ClearAllKeys(String cacheName, boolean async, boolean loc, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ this.loc = loc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ if (loc)
+ cache().localClearAll(keys);
+ else
+ cache().clearAll(keys);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class InvokeTask<K, V, R> extends CacheTaskAdapter<K, V, R> {
+ /** Key. */
+ private final K key;
+
+ /** Processor. */
+ private final EntryProcessor<K, V, R> processor;
+
+ /** Args. */
+ private final Object[] args;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param processor Processor.
+ * @param args Args.
+ */
+ public InvokeTask(String cacheName, boolean async, K key, EntryProcessor<K, V, R> processor,
+ Object[] args) {
+ super(cacheName, async);
+ this.args = args;
+ this.key = key;
+ this.processor = processor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public R call() throws Exception {
+ return cache().invoke(key, processor, args);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class InvokeAllTask<K, V, T> extends CacheTaskAdapter<K, V, Map<K, EntryProcessorResult<T>>> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /** Processor. */
+ private final EntryProcessor<K, V, T> processor;
+
+ /** Args. */
+ private final Object[] args;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ * @param processor Processor.
+ * @param args Args.
+ */
+ public InvokeAllTask(String cacheName, boolean async, Set<? extends K> keys,
+ EntryProcessor<K, V, T> processor, Object[] args) {
+ super(cacheName, async);
+ this.args = args;
+ this.keys = keys;
+ this.processor = processor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, EntryProcessorResult<T>> call() throws Exception {
+ return cache().invokeAll(keys, processor, args);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CloseTask extends CacheTaskAdapter<Void, Void, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public CloseTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().close();
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class DestroyTask extends CacheTaskAdapter<Void, Void, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public DestroyTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().destroy();
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class IsClosedTask extends CacheTaskAdapter<Void, Void, Boolean> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public IsClosedTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().isClosed();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class UnwrapTask<R> extends CacheTaskAdapter<Void, Void, R> {
+ /** Clazz. */
+ private final Class<R> clazz;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param clazz Clazz.
+ */
+ public UnwrapTask(String cacheName, boolean async, Class<R> clazz) {
+ super(cacheName, async);
+ this.clazz = clazz;
+ }
+
+ /** {@inheritDoc} */
+ @Override public R call() throws Exception {
+ return cache().unwrap(clazz);
+ }
+ }
+
+ /**
+ *
+ */
+ private static abstract class CacheTaskAdapter<K, V, R> implements IgniteCallable<R> {
+ /** Ignite. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Cache name. */
+ protected final String cacheName;
+
+ /** Async. */
+ protected final boolean async;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public CacheTaskAdapter(String cacheName, boolean async) {
+ this.async = async;
+ this.cacheName = cacheName;
+ }
+
+ /**
+ * Returns cache instance.
+ */
+ protected IgniteCache<K, V> cache() {
+ IgniteCache<K, V> cache = ignite.cache(cacheName);
+
+ return async ? cache.withAsync() : cache;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 05d6533..633e9d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -26,7 +26,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -43,9 +43,6 @@ import org.jetbrains.annotations.Nullable;
*/
@SuppressWarnings("TransientFieldInNonSerializableClass")
public class IgniteClusterProcessProxy implements IgniteClusterEx {
- /** Grid id. */
- private final UUID gridId;
-
/** Compute. */
private final transient IgniteCompute compute;
@@ -57,21 +54,11 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
*/
public IgniteClusterProcessProxy(IgniteProcessProxy proxy) {
this.proxy = proxy;
- gridId = proxy.getId();
compute = proxy.remoteCompute();
}
- /**
- * Returns cluster instance. Method to be called from closure at another JVM.
- *
- * @return Cache.
- */
- private IgniteClusterEx cluster() {
- return (IgniteClusterEx)Ignition.ignite(gridId).cluster();
- }
-
/** {@inheritDoc} */
- @Override public ClusterGroupEx forSubjectId(final UUID subjId) {
+ @Override public ClusterGroupEx forSubjectId(UUID subjId) {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
@@ -83,11 +70,7 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
/** {@inheritDoc} */
@Override public ClusterNode localNode() {
- return compute.call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return cluster().localNode();
- }
- });
+ return compute.call(new LocalNodeTask());
}
/** {@inheritDoc} */
@@ -285,38 +268,22 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
/** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes() {
- return compute.call(new IgniteCallable<Collection<ClusterNode>>() {
- @Override public Collection<ClusterNode> call() throws Exception {
- return cluster().nodes();
- }
- });
+ return compute.call(new NodesTask());
}
/** {@inheritDoc} */
- @Override public ClusterNode node(final UUID nid) {
- return compute.call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return cluster().node(nid);
- }
- });
+ @Override public ClusterNode node(UUID nid) {
+ return compute.call(new NodeTask(nid));
}
/** {@inheritDoc} */
@Override public ClusterNode node() {
- return compute.call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return cluster().node();
- }
- });
+ return compute.call(new NodeTask(null));
}
/** {@inheritDoc} */
@Override public Collection<String> hostNames() {
- return compute.call(new IgniteCallable<Collection<String>>() {
- @Override public Collection<String> call() throws Exception {
- return cluster().hostNames();
- }
- });
+ return compute.call(new HostNamesTask());
}
/** {@inheritDoc} */
@@ -333,4 +300,70 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
@Nullable @Override public IgniteFuture<?> clientReconnectFuture() {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
+
+ /**
+ *
+ */
+ private static class LocalNodeTask extends ClusterTaskAdapter<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return cluster().localNode();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class NodesTask extends ClusterTaskAdapter<Collection<ClusterNode>> {
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> call() throws Exception {
+ return cluster().nodes();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class NodeTask extends ClusterTaskAdapter<ClusterNode> {
+ /** Node id. */
+ private final UUID nodeId;
+
+ /**
+ * @param nodeId Node id.
+ */
+ public NodeTask(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return nodeId == null ? cluster().node() : cluster().node(nodeId);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class HostNamesTask extends ClusterTaskAdapter<Collection<String>> {
+ /** {@inheritDoc} */
+ @Override public Collection<String> call() throws Exception {
+ return cluster().hostNames();
+ }
+ }
+
+ /**
+ *
+ */
+ private abstract static class ClusterTaskAdapter<R> implements IgniteCallable<R> {
+ /** Ignite. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /**
+ *
+ */
+ protected IgniteCluster cluster() {
+ return ignite.cluster();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
index 860f889..d5af81e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
@@ -20,15 +20,16 @@ package org.apache.ignite.testframework.junits.multijvm;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -39,23 +40,11 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
/** Ignite proxy. */
private final transient IgniteProcessProxy igniteProxy;
- /** Grid id. */
- private final UUID gridId;
-
/**
* @param igniteProxy Ignite proxy.
*/
public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) {
this.igniteProxy = igniteProxy;
-
- gridId = igniteProxy.getId();
- }
-
- /**
- * @return Events instance.
- */
- private IgniteEvents events() {
- return Ignition.ignite(gridId).events();
}
/** {@inheritDoc} */
@@ -105,11 +94,7 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
/** {@inheritDoc} */
@Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) {
- igniteProxy.remoteCompute().run(new IgniteRunnable() {
- @Override public void run() {
- events().localListen(lsnr, types);
- }
- });
+ igniteProxy.remoteCompute().run(new LocalListenTask(lsnr, types));
}
/** {@inheritDoc} */
@@ -151,4 +136,33 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
@Override public <R> IgniteFuture<R> future() {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
+
+ /**
+ *
+ */
+ private static class LocalListenTask implements IgniteRunnable {
+ /** Ignite. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Listener. */
+ private final IgnitePredicate<? extends Event> lsnr;
+
+ /** Types. */
+ private final int[] types;
+
+ /**
+ * @param lsnr Listener.
+ * @param types Types.
+ */
+ public LocalListenTask(IgnitePredicate<? extends Event> lsnr, int[] types) {
+ this.lsnr = lsnr;
+ this.types = types;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ignite.events().localListen(lsnr, types);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
index f46e8e9..0597eda 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
@@ -25,13 +25,12 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
@@ -39,8 +38,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfT
import org.apache.ignite.internal.util.GridJavaProcess;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
import sun.jvmstat.monitor.HostIdentifier;
import sun.jvmstat.monitor.MonitoredHost;
import sun.jvmstat.monitor.MonitoredVm;
@@ -99,30 +99,6 @@ public class IgniteNodeRunner {
public static String storeToFile(IgniteConfiguration cfg) throws IOException {
String fileName = IGNITE_CONFIGURATION_FILE + cfg.getNodeId();
- // Check marshaller configuration, because read configuration method expect specific marshaller.
- if (cfg.getMarshaller() instanceof OptimizedMarshaller){
- OptimizedMarshaller marsh = (OptimizedMarshaller)cfg.getMarshaller();
-
- try {
- Field isRequireFiled = marsh.getClass().getDeclaredField("requireSer");
-
- isRequireFiled.setAccessible(true);
-
- boolean isRequireSer = isRequireFiled.getBoolean(marsh);
-
- if (isRequireSer)
- throw new UnsupportedOperationException("Unsupported marshaller configuration. " +
- "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName() +
- "with requireSerializeble flag in 'false'.");
- }
- catch (NoSuchFieldException|IllegalAccessException e) {
- throw new IgniteException("Failed to check filed of " + OptimizedMarshaller.class.getSimpleName(), e);
- }
- }
- else
- throw new UnsupportedOperationException("Unsupported marshaller. " +
- "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName());
-
try(OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) {
cfg.setMBeanServer(null);
cfg.setMarshaller(null);
@@ -143,11 +119,16 @@ public class IgniteNodeRunner {
* @throws IOException If failed.
* @see #storeToFile(IgniteConfiguration)
*/
- private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName) throws IOException {
+ private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName)
+ throws IOException, IgniteCheckedException {
try(BufferedReader cfgReader = new BufferedReader(new FileReader(fileName))) {
IgniteConfiguration cfg = (IgniteConfiguration)new XStream().fromXML(cfgReader);
- cfg.setMarshaller(new OptimizedMarshaller(false));
+ Marshaller marsh = IgniteTestResources.getMarshaller();
+
+ cfg.setMarshaller(marsh);
+
+ X.println("Configured marshaller class: " + marsh.getClass().getName());
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index ec7dab7..aa1d470 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -41,13 +41,11 @@ import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.portable.api.IgnitePortables;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
@@ -62,6 +60,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.portable.api.IgnitePortables;
import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
@@ -78,6 +77,8 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.IgnitePlugin;
import org.apache.ignite.plugin.PluginNotFoundException;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.jetbrains.annotations.Nullable;
/**
@@ -88,6 +89,9 @@ public class IgniteProcessProxy implements IgniteEx {
/** Grid proxies. */
private static final transient ConcurrentMap<String, IgniteProcessProxy> gridProxies = new ConcurrentHashMap<>();
+ /** Property that specify alternative {@code JAVA_HOME}. */
+ private static final String TEST_MULTIJVM_JAVA_HOME = "test.multijvm.java.home";
+
/** Jvm process with ignite instance. */
private final transient GridJavaProcess proc;
@@ -108,7 +112,7 @@ public class IgniteProcessProxy implements IgniteEx {
* @param log Logger.
* @param locJvmGrid Local JVM grid.
*/
- public IgniteProcessProxy(final IgniteConfiguration cfg, final IgniteLogger log, final Ignite locJvmGrid)
+ public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid)
throws Exception {
this.cfg = cfg;
this.locJvmGrid = locJvmGrid;
@@ -121,7 +125,9 @@ public class IgniteProcessProxy implements IgniteEx {
Collection<String> filteredJvmArgs = new ArrayList<>();
for (String arg : jvmArgs) {
- if(!arg.toLowerCase().startsWith("-agentlib"))
+ if(arg.startsWith("-Xmx") || arg.startsWith("-Xms") ||
+ arg.startsWith("-cp") || arg.startsWith("-classpath") ||
+ arg.startsWith("-D" + IgniteTestResources.MARSH_CLASS_NAME))
filteredJvmArgs.add(arg);
}
@@ -130,7 +136,7 @@ public class IgniteProcessProxy implements IgniteEx {
locJvmGrid.events().localListen(new NodeStartedListener(id, rmtNodeStartedLatch), EventType.EVT_NODE_JOINED);
proc = GridJavaProcess.exec(
- IgniteNodeRunner.class,
+ IgniteNodeRunner.class.getCanonicalName(),
cfgFileName, // Params.
this.log,
// Optional closure to be called each time wrapped process prints line to system.out or system.err.
@@ -140,6 +146,7 @@ public class IgniteProcessProxy implements IgniteEx {
}
},
null,
+ System.getProperty(TEST_MULTIJVM_JAVA_HOME),
filteredJvmArgs, // JVM Args.
System.getProperty("surefire.test.class.path")
);
@@ -149,11 +156,7 @@ public class IgniteProcessProxy implements IgniteEx {
IgniteProcessProxy prevVal = gridProxies.putIfAbsent(cfg.getGridName(), this);
if (prevVal != null) {
- remoteCompute().run(new IgniteRunnable() {
- @Override public void run() {
- G.stop(cfg.getGridName(), true);
- }
- });
+ remoteCompute().run(new StopGridTask(cfg.getGridName(), true));
throw new IllegalStateException("There was found instance assotiated with " + cfg.getGridName() +
", instance= " + prevVal + ". New started node was stopped.");
@@ -208,30 +211,17 @@ public class IgniteProcessProxy implements IgniteEx {
* @param gridName Grid name.
* @param cancel Cacnel flag.
*/
- public static void stop(final String gridName, final boolean cancel) {
+ public static void stop(String gridName, boolean cancel) {
IgniteProcessProxy proxy = gridProxies.get(gridName);
if (proxy != null) {
- proxy.remoteCompute().run(new IgniteRunnable() {
- @Override public void run() {
- G.stop(gridName, cancel);
- }
- });
+ proxy.remoteCompute().run(new StopGridTask(gridName, cancel));
gridProxies.remove(gridName, proxy);
}
}
/**
- * For usage in closures.
- *
- * @return Ignite instance.
- */
- private Ignite igniteById() {
- return Ignition.ignite(id);
- }
-
- /**
* @param locNodeId ID of local node the requested grid instance is managing.
* @return An instance of named grid. This method never returns {@code null}.
* @throws IgniteIllegalStateException Thrown if grid was not properly initialized or grid instance was stopped or
@@ -357,11 +347,7 @@ public class IgniteProcessProxy implements IgniteEx {
/** {@inheritDoc} */
@Override public ClusterNode localNode() {
- return remoteCompute().call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return ((IgniteEx)Ignition.ignite(id)).localNode();
- }
- });
+ return remoteCompute().call(new NodeTask());
}
/** {@inheritDoc} */
@@ -467,7 +453,10 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
- @Override public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName, NearCacheConfiguration<K, V> nearCfg) {
+ @Override public <K, V> IgniteCache<K, V> createNearCache(
+ @Nullable String cacheName,
+ NearCacheConfiguration<K, V> nearCfg)
+ {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
@@ -508,7 +497,8 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
- @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) throws IgniteException {
+ @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create)
+ throws IgniteException {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
@@ -524,8 +514,12 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
- @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal, @Nullable S initStamp,
- boolean create) throws IgniteException {
+ @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(
+ String name,
+ @Nullable T initVal,
+ @Nullable S initStamp,
+ boolean create) throws IgniteException
+ {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
@@ -572,11 +566,7 @@ public class IgniteProcessProxy implements IgniteEx {
}
}, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED);
- compute().run(new IgniteRunnable() {
- @Override public void run() {
- igniteById().close();
- }
- });
+ compute().run(new StopGridTask(localJvmGrid().name(), true));
try {
assert U.await(rmtNodeStoppedLatch, 15, TimeUnit.SECONDS) : "NodeId=" + id;
@@ -616,4 +606,43 @@ public class IgniteProcessProxy implements IgniteEx {
return locJvmGrid.compute(grp);
}
+
+ /**
+ *
+ */
+ private static class StopGridTask implements IgniteRunnable {
+ /** Grid name. */
+ private final String gridName;
+
+ /** Cancel. */
+ private final boolean cancel;
+
+ /**
+ * @param gridName Grid name.
+ * @param cancel Cancel.
+ */
+ public StopGridTask(String gridName, boolean cancel) {
+ this.gridName = gridName;
+ this.cancel = cancel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ G.stop(gridName, cancel);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class NodeTask implements IgniteCallable<ClusterNode> {
+ /** Ignite. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return ((IgniteEx)ignite).localNode();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
index 5298dd4..3777154 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
@@ -103,6 +103,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
}
},
null,
+ null,
jvmArgs,
null
);
@@ -119,6 +120,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
}
},
null,
+ null,
jvmArgs,
null
);
@@ -139,6 +141,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
}
},
null,
+ null,
jvmArgs,
cp
);