You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/10/09 13:24:03 UTC
[1/3] ignite git commit: ignite-1526: full support of IBM JDK by
Ignite
Repository: ignite
Updated Branches:
refs/heads/ignite-1.4.2 4f95be256 -> 6afc2fc3c
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index dfbb0ae..04d7893 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.locks.Lock;
import javax.cache.CacheException;
import javax.cache.CacheManager;
@@ -35,7 +34,6 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
@@ -47,8 +45,8 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -62,9 +60,6 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/** Cache name. */
private final String cacheName;
- /** Grid id. */
- private final UUID gridId;
-
/** With async. */
private final boolean isAsync;
@@ -81,31 +76,16 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/**
* @param name Name.
- * @param async
+ * @param async Async flag.
* @param proxy Ignite Process Proxy.
*/
public IgniteCacheProcessProxy(String name, boolean async, IgniteProcessProxy proxy) {
cacheName = name;
isAsync = async;
- gridId = proxy.getId();
igniteProxy = proxy;
compute = proxy.remoteCompute();
}
- /**
- * Returns cache instance. Method to be called from closure at another JVM.
- *
- * @return Cache.
- */
- private IgniteCache<Object, Object> cache() {
- IgniteCache cache = Ignition.ignite(gridId).cache(cacheName);
-
- if (isAsync)
- cache = cache.withAsync();
-
- return cache;
- }
-
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withAsync() {
return new IgniteCacheProcessProxy<>(cacheName, true, igniteProxy);
@@ -123,14 +103,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public <C extends Configuration<K, V>> C getConfiguration(final Class<C> clazz) {
- final Class cl = clazz;
-
- return (C)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getConfiguration(cl);
- }
- });
+ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
+ return compute.call(new GetConfigurationTask<>(cacheName, isAsync, clazz));
}
/** {@inheritDoc} */
@@ -148,33 +122,26 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
throw new UnsupportedOperationException("Method should be supported.");
}
+ /** {@inheritDoc} */
@Override public IgniteCache<K, V> withNoRetries() {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
+ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+ throws CacheException {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public void localLoadCache(@Nullable final IgniteBiPredicate<K, V> p, @Nullable final Object... args) throws CacheException {
- final IgniteBiPredicate pCopy = p;
-
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localLoadCache(pCopy, args);
- }
- });
+ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+ throws CacheException {
+ compute.call(new LocalLoadCacheTask<>(cacheName, isAsync, p, args));
}
/** {@inheritDoc} */
- @Override public V getAndPutIfAbsent(final K key, final V val) throws CacheException {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndPutIfAbsent(key, val);
- }
- });
+ @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
+ return compute.call(new GetAndPutIfAbsentTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
@@ -188,12 +155,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public boolean isLocalLocked(final K key, final boolean byCurrThread) {
- return compute.call(new IgniteCallable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return cache().isLocalLocked(key, byCurrThread);
- }
- });
+ @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
+ return compute.call(new IsLocalLockedTask<>(cacheName, isAsync, key, byCurrThread));
}
/** {@inheritDoc} */
@@ -202,18 +165,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public Iterable<Entry<K, V>> localEntries(final CachePeekMode... peekModes) throws CacheException {
- return (Iterable<Entry<K, V>>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- Collection<Entry> res = new ArrayList<>();
-
- for (Entry e : cache().localEntries(peekModes))
- res.add(e);
-
- return res;
- }
- });
+ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
+ return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes));
}
/** {@inheritDoc} */
@@ -222,21 +175,13 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public void localEvict(final Collection<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localEvict(keys);
- }
- });
+ @Override public void localEvict(Collection<? extends K> keys) {
+ compute.call(new LocalEvictTask<>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
- @Override public V localPeek(final K key, final CachePeekMode... peekModes) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().localPeek(key, peekModes);
- }
- });
+ @Override public V localPeek(K key, CachePeekMode... peekModes) {
+ return compute.call(new LocalPeekTask<K, V>(cacheName, isAsync, key, peekModes));
}
/** {@inheritDoc} */
@@ -245,274 +190,160 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public int size(final CachePeekMode... peekModes) throws CacheException {
- return (int)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().size(peekModes);
- }
- });
+ @Override public int size(CachePeekMode... peekModes) throws CacheException {
+ return compute.call(new SizeTask(cacheName, isAsync, peekModes, false));
}
/** {@inheritDoc} */
- @Override public int localSize(final CachePeekMode... peekModes) {
- return (int)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().localSize(peekModes);
- }
- });
+ @Override public int localSize(CachePeekMode... peekModes) {
+ return compute.call(new SizeTask(cacheName, isAsync, peekModes, true));
}
/** {@inheritDoc} */
- @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
- Object... args) {
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+ Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+ Object... args)
+ {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public V get(final K key) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().get(key);
- }
- });
+ @Override public V get(K key) {
+ return compute.call(new GetTask<K, V>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public Map<K, V> getAll(final Set<? extends K> keys) {
- return (Map<K, V>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAll(keys);
- }
- });
+ @Override public Map<K, V> getAll(Set<? extends K> keys) {
+ return compute.call(new GetAllTask<K, V>(cacheName, isAsync, keys));
}
- @Override public Map<K, V> getAllOutTx(final Set<? extends K> keys) {
- return (Map<K, V>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAllOutTx(keys);
- }
- });
+ /** {@inheritDoc} */
+ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+ return compute.call(new GetAllOutTxTask<K, V>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
- @Override public boolean containsKey(final K key) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().containsKey(key);
- }
- });
+ @Override public boolean containsKey(K key) {
+ return compute.call(new ContainsKeyTask<>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) {
+ @Override public void loadAll(Set<? extends K> keys, boolean replaceExistVals, CompletionListener completionLsnr) {
throw new UnsupportedOperationException("Oparetion can't be supported automatically.");
}
/** {@inheritDoc} */
- @Override public boolean containsKeys(final Set<? extends K> keys) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().containsKeys(keys);
- }
- });
+ @Override public boolean containsKeys(Set<? extends K> keys) {
+ return compute.call(new ContainsKeysTask<>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
- @Override public void put(final K key, final V val) {;
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().put(key, val);
- }
- });
+ @Override public void put(K key, V val) {
+ compute.call(new PutTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public V getAndPut(final K key, final V val) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndPut(key, val);
- }
- });
+ @Override public V getAndPut(K key, V val) {
+ return compute.call(new GetAndPutTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public void putAll(final Map<? extends K, ? extends V> map) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().putAll(map);
- }
- });
+ @Override public void putAll(Map<? extends K, ? extends V> map) {
+ compute.call(new PutAllTask<>(cacheName, isAsync, map));
}
/** {@inheritDoc} */
- @Override public boolean putIfAbsent(final K key, final V val) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().putIfAbsent(key, val);
- }
- });
+ @Override public boolean putIfAbsent(K key, V val) {
+ return compute.call(new PutIfAbsentTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public boolean remove(final K key) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().remove(key);
- }
- });
+ @Override public boolean remove(K key) {
+ return compute.call(new RemoveTask<>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public boolean remove(final K key, final V oldVal) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().remove(key, oldVal);
- }
- });
+ @Override public boolean remove(K key, V oldVal) {
+ return compute.call(new RemoveIfExistsTask<>(cacheName, isAsync, key, oldVal));
}
/** {@inheritDoc} */
- @Override public V getAndRemove(final K key) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndRemove(key);
- }
- });
+ @Override public V getAndRemove(K key) {
+ return compute.call(new GetAndRemoveTask<K, V>(cacheName, isAsync, key));
}
/** {@inheritDoc} */
- @Override public boolean replace(final K key, final V oldVal, final V newVal) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().replace(key, oldVal, newVal);
- }
- });
+ @Override public boolean replace(K key, V oldVal, V newVal) {
+ return compute.call(new ReplaceIfExistsTask<>(cacheName, isAsync, key, oldVal, newVal));
}
/** {@inheritDoc} */
- @Override public boolean replace(final K key, final V val) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().replace(key, val);
- }
- });
+ @Override public boolean replace(K key, V val) {
+ return compute.call(new ReplaceTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public V getAndReplace(final K key, final V val) {
- return (V)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getAndReplace(key, val);
- }
- });
+ @Override public V getAndReplace(K key, V val) {
+ return compute.call(new GetAndReplaceTask<>(cacheName, isAsync, key, val));
}
/** {@inheritDoc} */
- @Override public void removeAll(final Set<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().removeAll(keys);
- }
- });
+ @Override public void removeAll(Set<? extends K> keys) {
+ compute.call(new RemoveAllKeysTask<>(cacheName, isAsync, keys));
}
/** {@inheritDoc} */
@Override public void removeAll() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- IgniteCache<Object, Object> cache = cache();
-
- cache.removeAll();
-
- if (isAsync)
- cache.future().get();
- }
- });
+ compute.call(new RemoveAllTask<K, V>(cacheName, isAsync));
}
/** {@inheritDoc} */
@Override public void clear() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().clear();
- }
- });
+ compute.call(new ClearTask(cacheName, isAsync));
}
/** {@inheritDoc} */
- @Override public void clear(final K key) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().clear(key);
- }
- });
+ @Override public void clear(K key) {
+ compute.call(new ClearKeyTask<>(cacheName, isAsync, false, key));
}
/** {@inheritDoc} */
- @Override public void clearAll(final Set<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().clearAll(keys);
- }
- });
+ @Override public void clearAll(Set<? extends K> keys) {
+ compute.call(new ClearAllKeys<>(cacheName, isAsync, false, keys));
}
/** {@inheritDoc} */
- @Override public void localClear(final K key) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localClear(key);
- }
- });
+ @Override public void localClear(K key) {
+ compute.call(new ClearKeyTask<>(cacheName, isAsync, true, key));
}
/** {@inheritDoc} */
- @Override public void localClearAll(final Set<? extends K> keys) {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().localClearAll(keys);
- }
- });
+ @Override public void localClearAll(Set<? extends K> keys) {
+ compute.call(new ClearAllKeys<>(cacheName, isAsync, true, keys));
}
/** {@inheritDoc} */
- @Override public <T> T invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... arguments) {
- return (T)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().invoke(key,
- (EntryProcessor<Object, Object, Object>)entryProcessor, arguments);
- }
- });
+ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> processor, Object... args) {
+ return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args));
}
/** {@inheritDoc} */
- @Override public <T> T invoke(final K key, final CacheEntryProcessor<K, V, T> entryProcessor, final Object... arguments) {
- return (T)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().invoke(key,
- (CacheEntryProcessor<Object, Object, Object>)entryProcessor, arguments);
- }
- });
+ @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> processor, Object... args) {
+ return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args));
}
/** {@inheritDoc} */
- @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor,
- final Object... args) {
- return (Map<K, EntryProcessorResult<T>>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().invokeAll(keys,
- (EntryProcessor<Object, Object, Object>)entryProcessor, args);
- }
- });
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+ Set<? extends K> keys,
+ EntryProcessor<K, V, T> processor,
+ Object... args)
+ {
+ return compute.call(new InvokeAllTask<>(cacheName, isAsync, keys, processor, args));
}
/** {@inheritDoc} */
@Override public String getName() {
- return (String)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().getName();
- }
- });
+ return compute.call(new GetNameTask(cacheName, isAsync));
}
/** {@inheritDoc} */
@@ -522,72 +353,47 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/** {@inheritDoc} */
@Override public void close() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().close();
- }
- });
+ compute.call(new CloseTask(cacheName, isAsync));
}
/** {@inheritDoc} */
@Override public void destroy() {
- compute.run(new IgniteRunnable() {
- @Override public void run() {
- cache().destroy();
- }
- });
+ compute.call(new DestroyTask(cacheName, isAsync));
}
/** {@inheritDoc} */
@Override public boolean isClosed() {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().isClosed();
- }
- });
+ return compute.call(new IsClosedTask(cacheName, isAsync));
}
/** {@inheritDoc} */
- @Override public <T> T unwrap(final Class<T> clazz) {
+ @SuppressWarnings("unchecked")
+ @Override public <T> T unwrap(Class<T> clazz) {
if (Ignite.class.equals(clazz))
return (T)igniteProxy;
try {
- return (T)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return cache().unwrap(clazz);
- }
- });
+ return compute.call(new UnwrapTask<>(cacheName, isAsync, clazz));
}
catch (Exception e) {
- throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e);
+ throw new IllegalArgumentException("Looks like class " + clazz +
+ " is unmarshallable. Exception type:" + e.getClass(), e);
}
}
/** {@inheritDoc} */
- @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
- @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
throw new UnsupportedOperationException("Method should be supported.");
}
/** {@inheritDoc} */
@Override public Iterator<Entry<K, V>> iterator() {
- final Collection<Entry<K, V>> col = (Collection<Entry<K, V>>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- Collection res = new ArrayList();
-
- for (Object o : cache())
- res.add(o);
-
- return res;
- }
- });
-
- return col.iterator();
+ return compute.call(new IteratorTask<K, V>(cacheName, isAsync)).iterator();
}
/** {@inheritDoc} */
@@ -615,4 +421,968 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
@Override public CacheMetricsMXBean mxBean() {
throw new UnsupportedOperationException("Method should be supported.");
}
-}
\ No newline at end of file
+
+ /**
+ * Task whose {@code call()} returns {@code cache().getConfiguration(clazz)}.
+ */
+ private static class GetConfigurationTask<K, V, C extends Configuration<K, V>> extends CacheTaskAdapter<K, V, C> {
+ /** Configuration class passed to {@code getConfiguration}. */
+ private final Class<C> clazz;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param clazz Configuration class passed to {@code getConfiguration}.
+ */
+ public GetConfigurationTask(String cacheName, boolean async, Class<C> clazz) {
+ super(cacheName, async);
+ this.clazz = clazz;
+ }
+
+ /** {@inheritDoc} */
+ @Override public C call() throws Exception {
+ return cache().getConfiguration(clazz);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} invokes {@code cache().localLoadCache(p, args)} and returns {@code null}.
+ */
+ private static class LocalLoadCacheTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /** Predicate. */
+ private final IgniteBiPredicate<K, V> p;
+
+ /** Args. */
+ private final Object[] args;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param p Predicate passed to {@code localLoadCache}.
+ * @param args Arguments passed to {@code localLoadCache}.
+ */
+ public LocalLoadCacheTask(String cacheName, boolean async, IgniteBiPredicate<K, V> p, Object[] args) {
+ super(cacheName, async);
+ this.p = p;
+ this.args = args;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().localLoadCache(p, args);
+
+ return null;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().getAndPutIfAbsent(key, val)}.
+ */
+ private static class GetAndPutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public GetAndPutIfAbsentTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndPutIfAbsent(key, val);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().isLocalLocked(key, byCurrThread)}.
+ */
+ private static class IsLocalLockedTask<K> extends CacheTaskAdapter<K, Void, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** By current thread. */
+ private final boolean byCurrThread;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param byCurrThread By current thread.
+ */
+ public IsLocalLockedTask(String cacheName, boolean async, K key, boolean byCurrThread) {
+ super(cacheName, async);
+ this.key = key;
+ this.byCurrThread = byCurrThread;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().isLocalLocked(key, byCurrThread);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} copies each entry of {@code cache().localEntries(peekModes)} into a new {@code CacheEntryImpl} and returns the list.
+ */
+ private static class LocalEntriesTask<K, V> extends CacheTaskAdapter<K, V, Iterable<Entry<K, V>>> {
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param peekModes Peek modes.
+ */
+ public LocalEntriesTask(String cacheName, boolean async, CachePeekMode[] peekModes) {
+ super(cacheName, async);
+ this.peekModes = peekModes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterable<Entry<K, V>> call() throws Exception {
+ Collection<Entry<K, V>> res = new ArrayList<>();
+
+ for (Entry<K, V> e : cache().localEntries(peekModes))
+ res.add(new CacheEntryImpl<>(e.getKey(), e.getValue()));
+
+ return res;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} invokes {@code cache().localEvict(keys)} and returns {@code null}.
+ */
+ private static class LocalEvictTask<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Keys. */
+ private final Collection<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public LocalEvictTask(String cacheName, boolean async, Collection<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().localEvict(keys);
+
+ return null;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().localPeek(key, peekModes)}.
+ */
+ private static class LocalPeekTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param peekModes Peek modes.
+ */
+ public LocalPeekTask(String cacheName, boolean async, K key, CachePeekMode[] peekModes) {
+ super(cacheName, async);
+ this.key = key;
+ this.peekModes = peekModes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().localPeek(key, peekModes);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().localSize(peekModes)} when {@code loc} is set, otherwise {@code cache().size(peekModes)}.
+ */
+ private static class SizeTask extends CacheTaskAdapter<Void, Void, Integer> {
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /** Local. */
+ private final boolean loc;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param peekModes Peek modes.
+ * @param loc {@code true} to call {@code localSize}, {@code false} to call {@code size}.
+ */
+ public SizeTask(String cacheName, boolean async, CachePeekMode[] peekModes, boolean loc) {
+ super(cacheName, async);
+ this.loc = loc;
+ this.peekModes = peekModes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ return loc ? cache().localSize(peekModes) : cache().size(peekModes);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().get(key)}.
+ */
+ private static class GetTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public GetTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().get(key);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} invokes {@code removeAll()} on {@code cache()} and, when {@code async} is set, also waits on {@code cache.future().get()}.
+ */
+ private static class RemoveAllTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public RemoveAllTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ IgniteCache<K, V> cache = cache();
+
+ cache.removeAll();
+
+ if (async)
+ cache.future().get();
+
+ return null;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} invokes {@code cache().put(key, val)} and returns {@code null}.
+ */
+ private static class PutTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public PutTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().put(key, val);
+
+ return null;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().containsKey(key)}.
+ */
+ private static class ContainsKeyTask<K> extends CacheTaskAdapter<K, Object, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public ContainsKeyTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().containsKey(key);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} invokes {@code cache().clear()} and returns {@code null}.
+ */
+ private static class ClearTask extends CacheTaskAdapter<Object, Object, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public ClearTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().clear();
+
+ return null;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} iterates {@code cache()} and returns every entry collected into a new {@code ArrayList}.
+ */
+ private static class IteratorTask<K, V> extends CacheTaskAdapter<K, V, Collection<Entry<K, V>>> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public IteratorTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Entry<K, V>> call() throws Exception {
+ Collection<Entry<K, V>> res = new ArrayList<>();
+
+ for (Entry<K, V> o : cache())
+ res.add(o);
+
+ return res;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().replace(key, val)}.
+ */
+ private static class ReplaceTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ * @param val Value.
+ */
+ public ReplaceTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().replace(key, val);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().getName()}.
+ */
+ private static class GetNameTask extends CacheTaskAdapter<Void, Void, String> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ */
+ public GetNameTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String call() throws Exception {
+ return cache().getName();
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().remove(key)}.
+ */
+ private static class RemoveTask<K> extends CacheTaskAdapter<K, Void, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param key Key.
+ */
+ public RemoveTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().remove(key);
+ }
+ }
+
+ /**
+ * Task whose {@code call()} invokes {@code cache().putAll(map)} and returns {@code null}.
+ */
+ private static class PutAllTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+ /** Map. */
+ private final Map<? extends K, ? extends V> map;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param map Map.
+ */
+ public PutAllTask(String cacheName, boolean async, Map<? extends K, ? extends V> map) {
+ super(cacheName, async);
+ this.map = map;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().putAll(map);
+
+ return null;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} invokes {@code cache().removeAll(keys)} and returns {@code null}.
+ */
+ private static class RemoveAllKeysTask<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public RemoveAllKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().removeAll(keys);
+
+ return null;
+ }
+ }
+
+ /**
+ * Task whose {@code call()} returns {@code cache().getAll(keys)}.
+ */
+ private static class GetAllTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param keys Keys.
+ */
+ public GetAllTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> call() throws Exception {
+ return cache().getAll(keys);
+ }
+ }
+
+ /**
+ * Task that calls {@code getAllOutTx(keys)} on the cache returned by {@code cache()}
+ * and returns the resulting map.
+ */
+ private static class GetAllOutTxTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param keys Keys passed to {@code getAllOutTx}.
+ */
+ public GetAllOutTxTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> call() throws Exception {
+ return cache().getAllOutTx(keys);
+ }
+ }
+
+ /**
+ * Task that calls {@code containsKeys(keys)} on the cache returned by {@code cache()}
+ * and returns that call's result.
+ */
+ private static class ContainsKeysTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param keys Keys passed to {@code containsKeys}.
+ */
+ public ContainsKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().containsKeys(keys);
+ }
+ }
+
+ /**
+ * Task that calls {@code getAndPut(key, val)} on the cache returned by {@code cache()}
+ * and returns that call's result.
+ */
+ private static class GetAndPutTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param key Key passed to {@code getAndPut}.
+ * @param val Value passed to {@code getAndPut}.
+ */
+ public GetAndPutTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndPut(key, val);
+ }
+ }
+
+ /**
+ * Task that calls {@code putIfAbsent(key, val)} on the cache returned by {@code cache()}
+ * and returns that call's result.
+ */
+ private static class PutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param key Key passed to {@code putIfAbsent}.
+ * @param val Value passed to {@code putIfAbsent}.
+ */
+ public PutIfAbsentTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().putIfAbsent(key, val);
+ }
+ }
+
+ /**
+ * Task that calls the two-argument {@code remove(key, oldVal)} on the cache returned
+ * by {@code cache()} and returns that call's result.
+ */
+ private static class RemoveIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Old value. */
+ private final V oldVal;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param key Key passed to {@code remove}.
+ * @param oldVal Old value passed to {@code remove}.
+ */
+ public RemoveIfExistsTask(String cacheName, boolean async, K key, V oldVal) {
+ super(cacheName, async);
+ this.key = key;
+ this.oldVal = oldVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().remove(key, oldVal);
+ }
+ }
+
+ /**
+ * Task that calls {@code getAndRemove(key)} on the cache returned by {@code cache()}
+ * and returns that call's result.
+ */
+ private static class GetAndRemoveTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param key Key passed to {@code getAndRemove}.
+ */
+ public GetAndRemoveTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndRemove(key);
+ }
+ }
+
+ /**
+ * Task that calls the three-argument {@code replace(key, oldVal, newVal)} on the cache
+ * returned by {@code cache()} and returns that call's result.
+ */
+ private static class ReplaceIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Old value. */
+ private final V oldVal;
+
+ /** New value. */
+ private final V newVal;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param key Key passed to {@code replace}.
+ * @param oldVal Old value passed to {@code replace}.
+ * @param newVal New value passed to {@code replace}.
+ */
+ public ReplaceIfExistsTask(String cacheName, boolean async, K key, V oldVal, V newVal) {
+ super(cacheName, async);
+ this.key = key;
+ this.oldVal = oldVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().replace(key, oldVal, newVal);
+ }
+ }
+
+ /**
+ * Task that calls {@code getAndReplace(key, val)} on the cache returned by {@code cache()}
+ * and returns that call's result.
+ */
+ private static class GetAndReplaceTask<K, V> extends CacheTaskAdapter<K, V, V> {
+ /** Key. */
+ private final K key;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param key Key passed to {@code getAndReplace}.
+ * @param val Value passed to {@code getAndReplace}.
+ */
+ public GetAndReplaceTask(String cacheName, boolean async, K key, V val) {
+ super(cacheName, async);
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V call() throws Exception {
+ return cache().getAndReplace(key, val);
+ }
+ }
+
+ /**
+ * Task that clears a single key on the cache returned by {@code cache()}:
+ * {@code localClear(key)} when the local flag is set, {@code clear(key)} otherwise.
+ * Always returns {@code null}.
+ */
+ private static class ClearKeyTask<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Key. */
+ private final K key;
+
+ /** Local. */
+ private final boolean loc;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param loc If {@code true}, {@code localClear} is used instead of {@code clear}.
+ * @param key Key.
+ */
+ public ClearKeyTask(String cacheName, boolean async, boolean loc, K key) {
+ super(cacheName, async);
+ this.key = key;
+ this.loc = loc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ if (loc)
+ cache().localClear(key);
+ else
+ cache().clear(key);
+
+ return null;
+ }
+ }
+
+ /**
+ * Task that clears a set of keys on the cache returned by {@code cache()}:
+ * {@code localClearAll(keys)} when the local flag is set, {@code clearAll(keys)} otherwise.
+ * Always returns {@code null}.
+ */
+ private static class ClearAllKeys<K> extends CacheTaskAdapter<K, Void, Void> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /** Local. */
+ private final boolean loc;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param loc If {@code true}, {@code localClearAll} is used instead of {@code clearAll}.
+ * @param keys Keys.
+ */
+ public ClearAllKeys(String cacheName, boolean async, boolean loc, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ this.loc = loc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ if (loc)
+ cache().localClearAll(keys);
+ else
+ cache().clearAll(keys);
+
+ return null;
+ }
+ }
+
+ /**
+ * Task that calls {@code invoke(key, processor, args)} on the cache returned by
+ * {@code cache()} and returns that call's result.
+ */
+ private static class InvokeTask<K, V, R> extends CacheTaskAdapter<K, V, R> {
+ /** Key. */
+ private final K key;
+
+ /** Processor. */
+ private final EntryProcessor<K, V, R> processor;
+
+ /** Args. */
+ private final Object[] args;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param key Key passed to {@code invoke}.
+ * @param processor Entry processor passed to {@code invoke}.
+ * @param args Arguments array passed to {@code invoke}.
+ */
+ public InvokeTask(String cacheName, boolean async, K key, EntryProcessor<K, V, R> processor,
+ Object[] args) {
+ super(cacheName, async);
+ this.args = args;
+ this.key = key;
+ this.processor = processor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public R call() throws Exception {
+ return cache().invoke(key, processor, args);
+ }
+ }
+
+ /**
+ * Task that calls {@code invokeAll(keys, processor, args)} on the cache returned by
+ * {@code cache()} and returns the resulting per-key map.
+ */
+ private static class InvokeAllTask<K, V, T> extends CacheTaskAdapter<K, V, Map<K, EntryProcessorResult<T>>> {
+ /** Keys. */
+ private final Set<? extends K> keys;
+
+ /** Processor. */
+ private final EntryProcessor<K, V, T> processor;
+
+ /** Args. */
+ private final Object[] args;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param keys Keys passed to {@code invokeAll}.
+ * @param processor Entry processor passed to {@code invokeAll}.
+ * @param args Arguments array passed to {@code invokeAll}.
+ */
+ public InvokeAllTask(String cacheName, boolean async, Set<? extends K> keys,
+ EntryProcessor<K, V, T> processor, Object[] args) {
+ super(cacheName, async);
+ this.args = args;
+ this.keys = keys;
+ this.processor = processor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, EntryProcessorResult<T>> call() throws Exception {
+ return cache().invokeAll(keys, processor, args);
+ }
+ }
+
+ /**
+ * Task that calls {@code close()} on the cache returned by {@code cache()}.
+ * Always returns {@code null}.
+ */
+ private static class CloseTask extends CacheTaskAdapter<Void, Void, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ */
+ public CloseTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().close();
+
+ return null;
+ }
+ }
+
+ /**
+ * Task that calls {@code destroy()} on the cache returned by {@code cache()}.
+ * Always returns {@code null}.
+ */
+ private static class DestroyTask extends CacheTaskAdapter<Void, Void, Void> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ */
+ public DestroyTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() {
+ cache().destroy();
+
+ return null;
+ }
+ }
+
+ /**
+ * Task that returns the result of {@code isClosed()} on the cache returned by {@code cache()}.
+ */
+ private static class IsClosedTask extends CacheTaskAdapter<Void, Void, Boolean> {
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ */
+ public IsClosedTask(String cacheName, boolean async) {
+ super(cacheName, async);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ return cache().isClosed();
+ }
+ }
+
+ /**
+ * Task that returns the result of {@code unwrap(clazz)} on the cache returned by {@code cache()}.
+ */
+ private static class UnwrapTask<R> extends CacheTaskAdapter<Void, Void, R> {
+ /** Clazz. */
+ private final Class<R> clazz;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Whether the cache is accessed through its {@code withAsync()} view.
+ * @param clazz Class passed to {@code unwrap}.
+ */
+ public UnwrapTask(String cacheName, boolean async, Class<R> clazz) {
+ super(cacheName, async);
+ this.clazz = clazz;
+ }
+
+ /** {@inheritDoc} */
+ @Override public R call() throws Exception {
+ return cache().unwrap(clazz);
+ }
+ }
+
+ /**
+ * Base class for cache tasks. Holds the cache name and the async flag, and gives
+ * subclasses access to the cache through {@link #cache()}.
+ */
+ private static abstract class CacheTaskAdapter<K, V, R> implements IgniteCallable<R> {
+ /** Ignite instance; the field is annotated with {@link IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Cache name. */
+ protected final String cacheName;
+
+ /** Async. */
+ protected final boolean async;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async If {@code true}, {@link #cache()} returns the {@code withAsync()} view of the cache.
+ */
+ public CacheTaskAdapter(String cacheName, boolean async) {
+ this.async = async;
+ this.cacheName = cacheName;
+ }
+
+ /**
+ * Returns cache instance.
+ *
+ * @return Cache looked up by name on the {@code ignite} field, or its {@code withAsync()}
+ * view when the async flag is set.
+ */
+ protected IgniteCache<K, V> cache() {
+ IgniteCache<K, V> cache = ignite.cache(cacheName);
+
+ return async ? cache.withAsync() : cache;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 05d6533..633e9d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -26,7 +26,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -43,9 +43,6 @@ import org.jetbrains.annotations.Nullable;
*/
@SuppressWarnings("TransientFieldInNonSerializableClass")
public class IgniteClusterProcessProxy implements IgniteClusterEx {
- /** Grid id. */
- private final UUID gridId;
-
/** Compute. */
private final transient IgniteCompute compute;
@@ -57,21 +54,11 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
*/
public IgniteClusterProcessProxy(IgniteProcessProxy proxy) {
this.proxy = proxy;
- gridId = proxy.getId();
compute = proxy.remoteCompute();
}
- /**
- * Returns cluster instance. Method to be called from closure at another JVM.
- *
- * @return Cache.
- */
- private IgniteClusterEx cluster() {
- return (IgniteClusterEx)Ignition.ignite(gridId).cluster();
- }
-
/** {@inheritDoc} */
- @Override public ClusterGroupEx forSubjectId(final UUID subjId) {
+ @Override public ClusterGroupEx forSubjectId(UUID subjId) {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
@@ -83,11 +70,7 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
/** {@inheritDoc} */
@Override public ClusterNode localNode() {
- return compute.call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return cluster().localNode();
- }
- });
+ return compute.call(new LocalNodeTask());
}
/** {@inheritDoc} */
@@ -285,38 +268,22 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
/** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes() {
- return compute.call(new IgniteCallable<Collection<ClusterNode>>() {
- @Override public Collection<ClusterNode> call() throws Exception {
- return cluster().nodes();
- }
- });
+ return compute.call(new NodesTask());
}
/** {@inheritDoc} */
- @Override public ClusterNode node(final UUID nid) {
- return compute.call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return cluster().node(nid);
- }
- });
+ @Override public ClusterNode node(UUID nid) {
+ return compute.call(new NodeTask(nid));
}
/** {@inheritDoc} */
@Override public ClusterNode node() {
- return compute.call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return cluster().node();
- }
- });
+ return compute.call(new NodeTask(null));
}
/** {@inheritDoc} */
@Override public Collection<String> hostNames() {
- return compute.call(new IgniteCallable<Collection<String>>() {
- @Override public Collection<String> call() throws Exception {
- return cluster().hostNames();
- }
- });
+ return compute.call(new HostNamesTask());
}
/** {@inheritDoc} */
@@ -333,4 +300,70 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
@Nullable @Override public IgniteFuture<?> clientReconnectFuture() {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
+
+ /**
+ * Task that returns the result of {@code cluster().localNode()}.
+ */
+ private static class LocalNodeTask extends ClusterTaskAdapter<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return cluster().localNode();
+ }
+ }
+
+ /**
+ * Task that returns the result of {@code cluster().nodes()}.
+ */
+ private static class NodesTask extends ClusterTaskAdapter<Collection<ClusterNode>> {
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> call() throws Exception {
+ return cluster().nodes();
+ }
+ }
+
+ /**
+ * Task that returns a single cluster node: {@code cluster().node(nodeId)} when a node id
+ * was given, or the no-argument {@code cluster().node()} when the id is {@code null}.
+ */
+ private static class NodeTask extends ClusterTaskAdapter<ClusterNode> {
+ /** Node id. */
+ private final UUID nodeId;
+
+ /**
+ * @param nodeId Node id, or {@code null} to select the no-argument {@code node()} call.
+ */
+ public NodeTask(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return nodeId == null ? cluster().node() : cluster().node(nodeId);
+ }
+ }
+
+ /**
+ * Task that returns the result of {@code cluster().hostNames()}.
+ */
+ private static class HostNamesTask extends ClusterTaskAdapter<Collection<String>> {
+ /** {@inheritDoc} */
+ @Override public Collection<String> call() throws Exception {
+ return cluster().hostNames();
+ }
+ }
+
+ /**
+ * Base class for cluster tasks. Gives subclasses access to the cluster of the
+ * {@code ignite} field through {@link #cluster()}.
+ */
+ private abstract static class ClusterTaskAdapter<R> implements IgniteCallable<R> {
+ /** Ignite instance; the field is annotated with {@link IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /**
+ * @return Result of {@code ignite.cluster()}.
+ */
+ protected IgniteCluster cluster() {
+ return ignite.cluster();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
index 860f889..d5af81e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
@@ -20,15 +20,16 @@ package org.apache.ignite.testframework.junits.multijvm;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -39,23 +40,11 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
/** Ignite proxy. */
private final transient IgniteProcessProxy igniteProxy;
- /** Grid id. */
- private final UUID gridId;
-
/**
* @param igniteProxy Ignite proxy.
*/
public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) {
this.igniteProxy = igniteProxy;
-
- gridId = igniteProxy.getId();
- }
-
- /**
- * @return Events instance.
- */
- private IgniteEvents events() {
- return Ignition.ignite(gridId).events();
}
/** {@inheritDoc} */
@@ -105,11 +94,7 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
/** {@inheritDoc} */
@Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) {
- igniteProxy.remoteCompute().run(new IgniteRunnable() {
- @Override public void run() {
- events().localListen(lsnr, types);
- }
- });
+ igniteProxy.remoteCompute().run(new LocalListenTask(lsnr, types));
}
/** {@inheritDoc} */
@@ -151,4 +136,33 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
@Override public <R> IgniteFuture<R> future() {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
+
+ /**
+ * Task that registers a listener by calling {@code ignite.events().localListen(lsnr, types)}.
+ */
+ private static class LocalListenTask implements IgniteRunnable {
+ /** Ignite instance; the field is annotated with {@link IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Listener. */
+ private final IgnitePredicate<? extends Event> lsnr;
+
+ /** Types. */
+ private final int[] types;
+
+ /**
+ * @param lsnr Listener passed to {@code localListen}.
+ * @param types Event types passed to {@code localListen}.
+ */
+ public LocalListenTask(IgnitePredicate<? extends Event> lsnr, int[] types) {
+ this.lsnr = lsnr;
+ this.types = types;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ignite.events().localListen(lsnr, types);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
index f46e8e9..0597eda 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
@@ -25,13 +25,12 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
@@ -39,8 +38,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfT
import org.apache.ignite.internal.util.GridJavaProcess;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
import sun.jvmstat.monitor.HostIdentifier;
import sun.jvmstat.monitor.MonitoredHost;
import sun.jvmstat.monitor.MonitoredVm;
@@ -99,30 +99,6 @@ public class IgniteNodeRunner {
public static String storeToFile(IgniteConfiguration cfg) throws IOException {
String fileName = IGNITE_CONFIGURATION_FILE + cfg.getNodeId();
- // Check marshaller configuration, because read configuration method expect specific marshaller.
- if (cfg.getMarshaller() instanceof OptimizedMarshaller){
- OptimizedMarshaller marsh = (OptimizedMarshaller)cfg.getMarshaller();
-
- try {
- Field isRequireFiled = marsh.getClass().getDeclaredField("requireSer");
-
- isRequireFiled.setAccessible(true);
-
- boolean isRequireSer = isRequireFiled.getBoolean(marsh);
-
- if (isRequireSer)
- throw new UnsupportedOperationException("Unsupported marshaller configuration. " +
- "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName() +
- "with requireSerializeble flag in 'false'.");
- }
- catch (NoSuchFieldException|IllegalAccessException e) {
- throw new IgniteException("Failed to check filed of " + OptimizedMarshaller.class.getSimpleName(), e);
- }
- }
- else
- throw new UnsupportedOperationException("Unsupported marshaller. " +
- "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName());
-
try(OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) {
cfg.setMBeanServer(null);
cfg.setMarshaller(null);
@@ -143,11 +119,16 @@ public class IgniteNodeRunner {
* @throws IOException If failed.
* @see #storeToFile(IgniteConfiguration)
*/
- private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName) throws IOException {
+ private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName)
+ throws IOException, IgniteCheckedException {
try(BufferedReader cfgReader = new BufferedReader(new FileReader(fileName))) {
IgniteConfiguration cfg = (IgniteConfiguration)new XStream().fromXML(cfgReader);
- cfg.setMarshaller(new OptimizedMarshaller(false));
+ Marshaller marsh = IgniteTestResources.getMarshaller();
+
+ cfg.setMarshaller(marsh);
+
+ X.println("Configured marshaller class: " + marsh.getClass().getName());
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index ec7dab7..aa1d470 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -41,13 +41,11 @@ import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.portable.api.IgnitePortables;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
@@ -62,6 +60,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.portable.api.IgnitePortables;
import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
@@ -78,6 +77,8 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.IgnitePlugin;
import org.apache.ignite.plugin.PluginNotFoundException;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.jetbrains.annotations.Nullable;
/**
@@ -88,6 +89,9 @@ public class IgniteProcessProxy implements IgniteEx {
/** Grid proxies. */
private static final transient ConcurrentMap<String, IgniteProcessProxy> gridProxies = new ConcurrentHashMap<>();
+ /** Property that specifies an alternative {@code JAVA_HOME}. */
+ private static final String TEST_MULTIJVM_JAVA_HOME = "test.multijvm.java.home";
+
/** Jvm process with ignite instance. */
private final transient GridJavaProcess proc;
@@ -108,7 +112,7 @@ public class IgniteProcessProxy implements IgniteEx {
* @param log Logger.
* @param locJvmGrid Local JVM grid.
*/
- public IgniteProcessProxy(final IgniteConfiguration cfg, final IgniteLogger log, final Ignite locJvmGrid)
+ public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid)
throws Exception {
this.cfg = cfg;
this.locJvmGrid = locJvmGrid;
@@ -121,7 +125,9 @@ public class IgniteProcessProxy implements IgniteEx {
Collection<String> filteredJvmArgs = new ArrayList<>();
for (String arg : jvmArgs) {
- if(!arg.toLowerCase().startsWith("-agentlib"))
+ if(arg.startsWith("-Xmx") || arg.startsWith("-Xms") ||
+ arg.startsWith("-cp") || arg.startsWith("-classpath") ||
+ arg.startsWith("-D" + IgniteTestResources.MARSH_CLASS_NAME))
filteredJvmArgs.add(arg);
}
@@ -130,7 +136,7 @@ public class IgniteProcessProxy implements IgniteEx {
locJvmGrid.events().localListen(new NodeStartedListener(id, rmtNodeStartedLatch), EventType.EVT_NODE_JOINED);
proc = GridJavaProcess.exec(
- IgniteNodeRunner.class,
+ IgniteNodeRunner.class.getCanonicalName(),
cfgFileName, // Params.
this.log,
// Optional closure to be called each time wrapped process prints line to system.out or system.err.
@@ -140,6 +146,7 @@ public class IgniteProcessProxy implements IgniteEx {
}
},
null,
+ System.getProperty(TEST_MULTIJVM_JAVA_HOME),
filteredJvmArgs, // JVM Args.
System.getProperty("surefire.test.class.path")
);
@@ -149,11 +156,7 @@ public class IgniteProcessProxy implements IgniteEx {
IgniteProcessProxy prevVal = gridProxies.putIfAbsent(cfg.getGridName(), this);
if (prevVal != null) {
- remoteCompute().run(new IgniteRunnable() {
- @Override public void run() {
- G.stop(cfg.getGridName(), true);
- }
- });
+ remoteCompute().run(new StopGridTask(cfg.getGridName(), true));
throw new IllegalStateException("There was found instance assotiated with " + cfg.getGridName() +
", instance= " + prevVal + ". New started node was stopped.");
@@ -208,30 +211,17 @@ public class IgniteProcessProxy implements IgniteEx {
* @param gridName Grid name.
* @param cancel Cancel flag.
*/
- public static void stop(final String gridName, final boolean cancel) {
+ public static void stop(String gridName, boolean cancel) {
IgniteProcessProxy proxy = gridProxies.get(gridName);
if (proxy != null) {
- proxy.remoteCompute().run(new IgniteRunnable() {
- @Override public void run() {
- G.stop(gridName, cancel);
- }
- });
+ proxy.remoteCompute().run(new StopGridTask(gridName, cancel));
gridProxies.remove(gridName, proxy);
}
}
/**
- * For usage in closures.
- *
- * @return Ignite instance.
- */
- private Ignite igniteById() {
- return Ignition.ignite(id);
- }
-
- /**
* @param locNodeId ID of local node the requested grid instance is managing.
* @return An instance of named grid. This method never returns {@code null}.
* @throws IgniteIllegalStateException Thrown if grid was not properly initialized or grid instance was stopped or
@@ -357,11 +347,7 @@ public class IgniteProcessProxy implements IgniteEx {
/** {@inheritDoc} */
@Override public ClusterNode localNode() {
- return remoteCompute().call(new IgniteCallable<ClusterNode>() {
- @Override public ClusterNode call() throws Exception {
- return ((IgniteEx)Ignition.ignite(id)).localNode();
- }
- });
+ return remoteCompute().call(new NodeTask());
}
/** {@inheritDoc} */
@@ -467,7 +453,10 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
- @Override public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName, NearCacheConfiguration<K, V> nearCfg) {
+ @Override public <K, V> IgniteCache<K, V> createNearCache(
+ @Nullable String cacheName,
+ NearCacheConfiguration<K, V> nearCfg)
+ {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
@@ -508,7 +497,8 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
- @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) throws IgniteException {
+ @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create)
+ throws IgniteException {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
@@ -524,8 +514,12 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
- @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal, @Nullable S initStamp,
- boolean create) throws IgniteException {
+ @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(
+ String name,
+ @Nullable T initVal,
+ @Nullable S initStamp,
+ boolean create) throws IgniteException
+ {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
@@ -572,11 +566,7 @@ public class IgniteProcessProxy implements IgniteEx {
}
}, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED);
- compute().run(new IgniteRunnable() {
- @Override public void run() {
- igniteById().close();
- }
- });
+ compute().run(new StopGridTask(localJvmGrid().name(), true));
try {
assert U.await(rmtNodeStoppedLatch, 15, TimeUnit.SECONDS) : "NodeId=" + id;
@@ -616,4 +606,43 @@ public class IgniteProcessProxy implements IgniteEx {
return locJvmGrid.compute(grp);
}
+
+ /**
+ * Task that stops a grid by calling {@code G.stop(gridName, cancel)}.
+ */
+ private static class StopGridTask implements IgniteRunnable {
+ /** Grid name. */
+ private final String gridName;
+
+ /** Cancel. */
+ private final boolean cancel;
+
+ /**
+ * @param gridName Grid name passed to {@code G.stop}.
+ * @param cancel Cancel flag passed to {@code G.stop}.
+ */
+ public StopGridTask(String gridName, boolean cancel) {
+ this.gridName = gridName;
+ this.cancel = cancel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ G.stop(gridName, cancel);
+ }
+ }
+
+ /**
+ * Task that casts the {@code ignite} field to {@code IgniteEx} and returns its
+ * {@code localNode()}.
+ */
+ private static class NodeTask implements IgniteCallable<ClusterNode> {
+ /** Ignite instance; the field is annotated with {@link IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return ((IgniteEx)ignite).localNode();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
index 5298dd4..3777154 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
@@ -103,6 +103,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
}
},
null,
+ null,
jvmArgs,
null
);
@@ -119,6 +120,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
}
},
null,
+ null,
jvmArgs,
null
);
@@ -139,6 +141,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
}
},
null,
+ null,
jvmArgs,
cp
);
[3/3] ignite git commit: ignite-1526: fixed merging conflicts
Posted by dm...@apache.org.
ignite-1526: fixed merging conflicts
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6afc2fc3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6afc2fc3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6afc2fc3
Branch: refs/heads/ignite-1.4.2
Commit: 6afc2fc3ca390dad7834105b53804b89f50aecf2
Parents: 5fc682f
Author: Andrey Gura <ag...@gridgain.com>
Authored: Fri Oct 9 14:23:34 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Oct 9 14:23:34 2015 +0300
----------------------------------------------------------------------
.../testframework/junits/multijvm/IgniteCacheProcessProxy.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6afc2fc3/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 04d7893..02c6a26 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -553,7 +553,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
Collection<Entry<K, V>> res = new ArrayList<>();
for (Entry<K, V> e : cache().localEntries(peekModes))
- res.add(new CacheEntryImpl<>(e.getKey(), e.getValue()));
+ res.add(e);
return res;
}
[2/3] ignite git commit: ignite-1526: full support of IBM JDK by
Ignite
Posted by dm...@apache.org.
ignite-1526: full support of IBM JDK by Ignite
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fc682f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fc682f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fc682f1
Branch: refs/heads/ignite-1.4.2
Commit: 5fc682f11f43f61d14d6b70be5ccf949a9ae05ac
Parents: 4f95be2
Author: Andrey Gura <ag...@gridgain.com>
Authored: Fri Oct 9 13:54:56 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Oct 9 14:22:34 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 17 +-
.../internal/portable/PortableContext.java | 7 +
.../portable/api/PortableMarshaller.java | 14 +-
.../ignite/internal/util/GridJavaProcess.java | 12 +-
.../ignite/internal/util/lang/GridFunc.java | 12 +
.../apache/ignite/marshaller/Marshaller.java | 2 +-
.../optimized/OptimizedMarshallerUtils.java | 6 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 486 ++++---
.../CacheNoValueClassOnServerNodeTest.java | 1 +
...tomicClientOnlyMultiNodeFullApiSelfTest.java | 71 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 129 +-
.../testframework/junits/GridAbstractTest.java | 116 +-
.../junits/IgniteTestResources.java | 8 +-
.../junits/common/GridCommonAbstractTest.java | 15 +-
.../junits/multijvm/AffinityProcessProxy.java | 440 ++++--
.../multijvm/IgniteCacheProcessProxy.java | 1348 ++++++++++++++----
.../multijvm/IgniteClusterProcessProxy.java | 115 +-
.../multijvm/IgniteEventsProcessProxy.java | 50 +-
.../junits/multijvm/IgniteNodeRunner.java | 39 +-
.../junits/multijvm/IgniteProcessProxy.java | 107 +-
.../cache/CacheConfigurationP2PTest.java | 3 +
21 files changed, 2186 insertions(+), 812 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 1801b9c..cba06de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -987,19 +987,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
boolean locP2pEnabled = locNode.attribute(ATTR_PEER_CLASSLOADING);
- boolean warned = false;
+ boolean ipV4Warned = false;
+
+ boolean jvmMajVerWarned = false;
for (ClusterNode n : nodes) {
int rmtJvmMajVer = nodeJavaMajorVersion(n);
- if (locJvmMajVer != rmtJvmMajVer)
- throw new IgniteCheckedException("Local node's java major version is different from remote node's one" +
- " [locJvmMajVer=" + locJvmMajVer + ", rmtJvmMajVer=" + rmtJvmMajVer + "]");
+ if (locJvmMajVer != rmtJvmMajVer && !jvmMajVerWarned) {
+ U.warn(log, "Local java version is different from remote [loc=" +
+ locJvmMajVer + ", rmt=" + rmtJvmMajVer + "]");
+
+ jvmMajVerWarned = true;
+ }
String rmtPreferIpV4 = n.attribute("java.net.preferIPv4Stack");
if (!F.eq(rmtPreferIpV4, locPreferIpV4)) {
- if (!warned)
+ if (!ipV4Warned)
U.warn(log, "Local node's value of 'java.net.preferIPv4Stack' " +
"system property differs from remote node's " +
"(all nodes in topology should have identical value) " +
@@ -1008,7 +1013,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
", rmtAddrs=" + U.addressesAsString(n) + ']',
"Local and remote 'java.net.preferIPv4Stack' system properties do not match.");
- warned = true;
+ ipV4Warned = true;
}
// Daemon nodes are allowed to have any deployment they need.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 2ee96b7..1ad42ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -967,6 +967,9 @@ public class PortableContext implements Externalizable {
}
}
+ /**
+ * Basic class ID mapper.
+ */
private static class BasicClassIdMapper implements PortableIdMapper {
/** {@inheritDoc} */
@Override public int typeId(String clsName) {
@@ -1121,6 +1124,10 @@ public class PortableContext implements Externalizable {
/** Whether the following type is registered in a cache or not */
private final boolean registered;
+ /**
+ * @param id Id.
+ * @param registered Registered.
+ */
public Type(int id, boolean registered) {
this.id = id;
this.registered = registered;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
index de0df8d..3dfbdf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
@@ -29,12 +29,6 @@ import org.apache.ignite.internal.portable.GridPortableMarshaller;
import org.apache.ignite.internal.portable.PortableContext;
import org.apache.ignite.marshaller.AbstractMarshaller;
import org.apache.ignite.marshaller.MarshallerContext;
-import org.apache.ignite.internal.portable.api.PortableException;
-import org.apache.ignite.internal.portable.api.PortableIdMapper;
-import org.apache.ignite.internal.portable.api.PortableObject;
-import org.apache.ignite.internal.portable.api.PortableProtocolVersion;
-import org.apache.ignite.internal.portable.api.PortableSerializer;
-import org.apache.ignite.internal.portable.api.PortableTypeConfiguration;
import org.jetbrains.annotations.Nullable;
/**
@@ -336,7 +330,7 @@ public class PortableMarshaller extends AbstractMarshaller {
/** {@inheritDoc} */
@Override public <T> T unmarshal(InputStream in, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
byte[] arr = new byte[4096];
int cnt;
@@ -345,11 +339,11 @@ public class PortableMarshaller extends AbstractMarshaller {
// returns number of bytes remaining.
try {
while ((cnt = in.read(arr)) != -1)
- buffer.write(arr, 0, cnt);
+ buf.write(arr, 0, cnt);
- buffer.flush();
+ buf.flush();
- return impl.deserialize(buffer.toByteArray(), clsLdr);
+ return impl.deserialize(buf.toByteArray(), clsLdr);
}
catch (IOException e) {
throw new PortableException("Failed to unmarshal the object from InputStream", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index 92c20fe..3371eb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -89,7 +89,7 @@ public final class GridJavaProcess {
*/
public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log,
@Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC) throws Exception {
- return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null);
+ return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null, null);
}
/**
@@ -108,7 +108,7 @@ public final class GridJavaProcess {
public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log,
@Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC,
@Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
- return exec(cls.getCanonicalName(), params, log, printC, procKilledC, jvmArgs, cp);
+ return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, jvmArgs, cp);
}
/**
@@ -116,9 +116,10 @@ public final class GridJavaProcess {
*
* @param clsName Class with main() method to be run.
* @param params main() method parameters.
+ * @param log Log to use.
* @param printC Optional closure to be called each time wrapped process prints line to system.out or system.err.
* @param procKilledC Optional closure to be called when process termination is detected.
- * @param log Log to use.
+ * @param javaHome Java home location. The process will be started under given JVM.
* @param jvmArgs JVM arguments to use.
* @param cp Additional classpath.
* @return Wrapper around {@link Process}
@@ -126,7 +127,7 @@ public final class GridJavaProcess {
*/
public static GridJavaProcess exec(String clsName, String params, @Nullable IgniteLogger log,
@Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC,
- @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
+ @Nullable String javaHome, @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
if (!(U.isLinux() || U.isMacOs() || U.isWindows()))
throw new Exception("Your OS is not supported.");
@@ -140,7 +141,8 @@ public final class GridJavaProcess {
List<String> procCommands = new ArrayList<>();
- String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+ String javaBin = (javaHome == null ? System.getProperty("java.home") : javaHome) +
+ File.separator + "bin" + File.separator + "java";
procCommands.add(javaBin);
procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index ffeeca0..43bc5f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -103,6 +103,9 @@ public class GridFunc {
/** */
private static final IgniteClosure IDENTITY = new C1() {
+ /** */
+ private static final long serialVersionUID = -6338573080046225172L;
+
@Override public Object apply(Object o) {
return o;
}
@@ -1196,6 +1199,9 @@ public class GridFunc {
A.notNull(nodeId, "nodeId");
return new P1<T>() {
+ /** */
+ private static final long serialVersionUID = -7082730222779476623L;
+
@Override public boolean apply(ClusterNode e) {
return e.id().equals(nodeId);
}
@@ -1705,6 +1711,9 @@ public class GridFunc {
assert c != null;
return new GridSerializableList<T2>() {
+ /** */
+ private static final long serialVersionUID = 3126625219739967068L;
+
@Override public T2 get(int idx) {
return trans.apply(c.get(idx));
}
@@ -1766,6 +1775,9 @@ public class GridFunc {
assert m != null;
return isEmpty(p) || isAlwaysTrue(p) ? m : new GridSerializableMap<K, V>() {
+ /** */
+ private static final long serialVersionUID = 5531745605372387948L;
+
/** Entry predicate. */
private IgnitePredicate<Entry<K, V>> ep = new P1<Map.Entry<K, V>>() {
@Override public boolean apply(Entry<K, V> e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
index 3e815fd..a76daa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
@@ -92,7 +92,7 @@ public interface Marshaller {
public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException;
/**
- * Unmarshals object from the output stream using given class loader.
+ * Unmarshals object from the input stream using given class loader.
* This method should not close given input stream.
*
* @param <T> Type of unmarshalled object.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
index 4abbd04..584083c 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -42,6 +42,10 @@ class OptimizedMarshallerUtils {
/** */
private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+ /** Use default {@code serialVersionUid} for {@link Serializable} classes. */
+ private static final boolean USE_DFLT_SUID =
+ Boolean.valueOf(System.getProperty("ignite.marsh.optimized.useDefaultSUID", Boolean.TRUE.toString()));
+
/** */
static final long HASH_SET_MAP_OFF;
@@ -283,7 +287,7 @@ class OptimizedMarshallerUtils {
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
static short computeSerialVersionUid(Class cls, List<Field> fields) throws IOException {
- if (Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls))
+ if (USE_DFLT_SUID && Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls))
return (short)ObjectStreamClass.lookup(cls).getSerialVersionUID();
MessageDigest md;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 2a64963..ec3ea0c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -49,6 +50,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMemoryMode;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.testframework.GridTestUtils;
@@ -117,48 +120,37 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/** */
public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
+ /** */
+ private static final long serialVersionUID = 0L;
+
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
throw new RuntimeException("Failed!");
}
};
/** Increment processor for invoke operations. */
- public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new EntryProcessor<String, Integer, String>() {
- @Override public String process(MutableEntry<String, Integer> e, Object... args) {
- assertNotNull(e.getKey());
-
- Integer old = e.getValue();
-
- e.setValue(old == null ? 1 : old + 1);
-
- return String.valueOf(old);
- }
- };
+ public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new IncrementEntryProcessor();
/** Increment processor for invoke operations with IgniteEntryProcessor. */
public static final CacheEntryProcessor<String, Integer, String> INCR_IGNITE_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
+ /** */
+ private static final long serialVersionUID = 0L;
+
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
return INCR_PROCESSOR.process(e, args);
}
};
/** Increment processor for invoke operations. */
- public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new EntryProcessor<String, Integer, String>() {
- @Override public String process(MutableEntry<String, Integer> e, Object... args) {
- assertNotNull(e.getKey());
-
- Integer old = e.getValue();
-
- e.remove();
-
- return String.valueOf(old);
- }
- };
+ public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new RemoveEntryProcessor();
/** Increment processor for invoke operations with IgniteEntryProcessor. */
public static final CacheEntryProcessor<String, Integer, String> RMV_IGNITE_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
+ /** */
+ private static final long serialVersionUID = 0L;
+
@Override public String process(MutableEntry<String, Integer> e, Object... args) {
return RMV_PROCESSOR.process(e, args);
}
@@ -346,21 +338,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assert jcache(i).localSize() != 0 || F.isEmpty(keysCol);
}
- for (int i = 0; i < gridCount(); i++) {
- executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
- @Override public void run(int idx) throws Exception {
- GridCacheContext<String, Integer> ctx = context(idx);
-
- int sum = 0;
-
- for (String key : map.keySet())
- if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
- sum++;
-
- assertEquals("Incorrect key size on cache #" + idx, sum, jcache(idx).localSize(ALL));
- }
- });
- }
+ for (int i = 0; i < gridCount(); i++)
+ executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map));
for (int i = 0; i < gridCount(); i++) {
Collection<String> keysCol = mapped.get(grid(i).localNode());
@@ -1350,13 +1329,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals((Integer)3, cache.get("k1"));
- EntryProcessor<String, Integer, Integer> c = new EntryProcessor<String, Integer, Integer>() {
- @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
- e.remove();
-
- return null;
- }
- };
+ EntryProcessor<String, Integer, Integer> c = new RemoveAndReturnNullEntryProcessor();
assertNull(cache.invoke("k1", c));
assertNull(cache.get("k1"));
@@ -1364,11 +1337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < gridCount(); i++)
assertNull(jcache(i).localPeek("k1", ONHEAP));
- final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
- @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
- throw new EntryProcessorException("Test entry processor exception.");
- }
- };
+ final EntryProcessor<String, Integer, Integer> errProcessor = new FailedEntryProcessor();
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -2001,7 +1970,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertFalse(cacheAsync.<Boolean>future().get());
}
- cache.localEvict(Arrays.asList("key2"));
+ cache.localEvict(Collections.singletonList("key2"));
// Same checks inside tx.
Transaction tx = inTx ? transactions().txStart() : null;
@@ -2357,27 +2326,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < cnt; i++)
cache.remove(String.valueOf(i));
- for (int g = 0; g < gridCount(); g++) {
- executeOnLocalOrRemoteJvm(g, new TestIgniteIdxRunnable() {
- @Override public void run(int idx) throws Exception {
- for (int i = 0; i < cnt; i++) {
- String key = String.valueOf(i);
-
- GridCacheContext<String, Integer> cctx = context(idx);
-
- GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(key) :
- cctx.cache().peekEx(key);
-
- if (grid(idx).affinity(null).mapKeyToPrimaryAndBackups(key).contains(grid(idx).localNode())) {
- assertNotNull(entry);
- assertTrue(entry.deleted());
- }
- else
- assertNull(entry);
- }
- }
- });
- }
+ for (int g = 0; g < gridCount(); g++)
+ executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt));
}
}
@@ -2587,8 +2537,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
c.add(null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ @Override public Void call() throws Exception {
cache.removeAll(c);
return null;
@@ -2725,7 +2674,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
public void testRemoveAfterClear() throws Exception {
IgniteEx ignite = grid(0);
- boolean affNode = ((IgniteKernal)ignite).context().cache().internalCache(null).context().affinityNode();
+ boolean affNode = ignite.context().cache().internalCache(null).context().affinityNode();
if (!affNode) {
if (gridCount() < 2)
@@ -2766,13 +2715,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/**
- *
- */
- private void xxx() {
- System.out.printf("");
- }
-
- /**
* @throws Exception In case of error.
*/
public void testClear() throws Exception {
@@ -3597,26 +3539,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
info("Local keys (primary + backup): " + locKeys);
}
- for (int i = 0; i < gridCount(); i++) {
- grid(i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Received event: " + evt);
-
- switch (evt.type()) {
- case EVT_CACHE_OBJECT_SWAPPED:
- swapEvts.incrementAndGet();
-
- break;
- case EVT_CACHE_OBJECT_UNSWAPPED:
- unswapEvts.incrementAndGet();
-
- break;
- }
-
- return true;
- }
- }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
- }
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).events().localListen(
+ new SwapEvtsLocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
cache.localEvict(F.asList(k2, k3));
@@ -3934,7 +3859,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
map.put(entry.getKey(), entry.getValue());
}
- assert map != null;
assert map.size() == 2;
assert map.get("key1") == 1;
assert map.get("key2") == 2;
@@ -3951,32 +3875,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (nearEnabled())
assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
else {
- for (int i = 0; i < gridCount(); i++) {
- executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
- @Override public void run(int idx) throws Exception {
- GridCacheContext<String, Integer> ctx = context(idx);
-
- if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
- return;
-
- int size = 0;
-
- for (String key : keys) {
- if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
- GridCacheEntryEx e =
- ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
-
- assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
- assert !e.deleted() : "Entry is deleted: " + e;
-
- size++;
- }
- }
-
- assertEquals("Incorrect size on cache #" + idx, size, jcache(idx).localSize(ALL));
- }
- });
- }
+ for (int i = 0; i < gridCount(); i++)
+ executeOnLocalOrRemoteJvm(i, new CheckEntriesTask(keys));
}
}
@@ -3989,21 +3889,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals("Invalid key size: " + jcache().localSize(ALL),
keys.size(), jcache().localSize(ALL));
else {
- for (int i = 0; i < gridCount(); i++) {
- executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
- @Override public void run(int idx) throws Exception {
- GridCacheContext<String, Integer> ctx = context(idx);
-
- int size = 0;
-
- for (String key : keys)
- if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
- size++;
-
- assertEquals("Incorrect key size on cache #" + idx, size, jcache(idx).localSize(ALL));
- }
- });
- }
+ for (int i = 0; i < gridCount(); i++)
+ executeOnLocalOrRemoteJvm(i, new CheckKeySizeTask(keys));
}
}
@@ -4061,27 +3948,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @param cnt Keys count.
* @return Collection of keys for which given cache is primary.
*/
- protected List<String> primaryKeysForCache(final IgniteCache<String, Integer> cache, final int cnt, final int startFrom) {
- return executeOnLocalOrRemoteJvm(cache, new TestCacheCallable<String, Integer, List<String>>() {
- @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
- List<String> found = new ArrayList<>();
-
- Affinity<Object> affinity = ignite.affinity(cache.getName());
-
- for (int i = startFrom; i < startFrom + 100_000; i++) {
- String key = "key" + i;
-
- if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
- found.add(key);
-
- if (found.size() == cnt)
- return found;
- }
- }
-
- throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
- }
- });
+ protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt, int startFrom) {
+ return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryKeysTask(startFrom, cnt));
}
/**
@@ -4272,18 +4140,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* Checks iterators are cleared.
*/
private void checkIteratorsCleared() {
- for (int j = 0; j < gridCount(); j++) {
- executeOnLocalOrRemoteJvm(j, new TestIgniteIdxRunnable() {
- @Override public void run(int idx) throws Exception {
- GridCacheQueryManager queries = context(idx).queries();
-
- Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
-
- for (Object obj : map.values())
- assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size());
- }
- });
- }
+ for (int j = 0; j < gridCount(); j++)
+ executeOnLocalOrRemoteJvm(j, new CheckIteratorTask());
}
/**
@@ -5226,4 +5084,280 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/** */
ONE_BY_ONE
}
+
+ /**
+ * Entry processor that removes the entry and returns its previous value as a string.
+ * <p>
+ * The previous value is rendered with {@link String#valueOf(Object)}, so a {@code null} previous value
+ * is returned as the string {@code "null"}. The entry key is asserted to be non-null before removal.
+ */
+ private static class RemoveEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
+ /** {@inheritDoc} */
+ @Override public String process(MutableEntry<String, Integer> e, Object... args) {
+ assertNotNull(e.getKey());
+
+ Integer old = e.getValue();
+
+ e.remove();
+
+ return String.valueOf(old);
+ }
+ }
+
+ /**
+ * Entry processor that increments the entry value and returns the previous value as a string.
+ * <p>
+ * When the previous value is {@code null} the entry is set to {@code 1}; otherwise it is set to the
+ * previous value plus one. The previous value is rendered with {@link String#valueOf(Object)}, so a
+ * {@code null} previous value is returned as the string {@code "null"}.
+ */
+ private static class IncrementEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
+ /** {@inheritDoc} */
+ @Override public String process(MutableEntry<String, Integer> e, Object... args) {
+ assertNotNull(e.getKey());
+
+ Integer old = e.getValue();
+
+ e.setValue(old == null ? 1 : old + 1);
+
+ return String.valueOf(old);
+ }
+ }
+
+ /**
+ * Task that checks the entries for the given keys on the node referenced by the {@code ignite} field.
+ * <p>
+ * For every key that the cache affinity maps to the local node at the current topology version, the
+ * entry is peeked (through {@code ctx.near().dht()} when {@code ctx.isNear()} is true, otherwise through
+ * {@code ctx.cache()}) and must be non-null and not deleted. The number of such keys must then equal
+ * {@code localSize(ALL)} of the cache. Nothing is checked when the cache memory mode is
+ * {@code OFFHEAP_TIERED}.
+ */
+ private static class CheckEntriesTask extends TestIgniteIdxRunnable {
+ /** Keys. */
+ private final Collection<String> keys;
+
+ /**
+ * @param keys Keys to check.
+ */
+ public CheckEntriesTask(Collection<String> keys) {
+ this.keys = keys;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The index is used only in assertion messages.
+ */
+ @Override public void run(int idx) throws Exception {
+ GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+ if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
+ return;
+
+ int size = 0;
+
+ for (String key : keys) {
+ if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+ GridCacheEntryEx e =
+ ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+
+ assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
+ assert !e.deleted() : "Entry is deleted: " + e;
+
+ size++;
+ }
+ }
+
+ assertEquals("Incorrect size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
+ }
+ }
+
+ /**
+ * Task that checks the local cache size against the keys of the given map.
+ * <p>
+ * Counts the map keys that the cache affinity maps to the local node and asserts that this count equals
+ * {@code localSize(ALL)} of the cache. The topology version is built here from
+ * {@code ctx.discovery().topologyVersion()} wrapped in an {@code AffinityTopologyVersion}.
+ */
+ private static class CheckCacheSizeTask extends TestIgniteIdxRunnable {
+ private final Map<String, Integer> map;
+
+ /**
+ * @param map Map whose key set is checked; the values are not read by this task.
+ */
+ public CheckCacheSizeTask(Map<String, Integer> map) {
+ this.map = map;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The index is used only in the assertion message.
+ */
+ @Override public void run(int idx) throws Exception {
+ GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+ int size = 0;
+
+ for (String key : map.keySet())
+ if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
+ size++;
+
+ assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
+ }
+ }
+
+ /**
+ * Callable that collects keys for which the local node of the given Ignite instance is primary.
+ * <p>
+ * Candidate keys have the form {@code "key" + i} for {@code i} from {@code startFrom} (inclusive) to
+ * {@code startFrom + 100_000} (exclusive). The search stops as soon as {@code cnt} keys are found.
+ */
+ private static class CheckPrimaryKeysTask implements TestCacheCallable<String, Integer, List<String>> {
+ /** Start from. */
+ private final int startFrom;
+
+ /** Count. */
+ private final int cnt;
+
+ /**
+ * @param startFrom Index of the first candidate key.
+ * @param cnt Number of primary keys to find.
+ */
+ public CheckPrimaryKeysTask(int startFrom, int cnt) {
+ this.startFrom = startFrom;
+ this.cnt = cnt;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the first {@code cnt} candidate keys that are primary on the local node, in ascending
+ * index order. Throws {@code IgniteException} when the candidate range is exhausted first.
+ */
+ @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
+ List<String> found = new ArrayList<>();
+
+ Affinity<Object> affinity = ignite.affinity(cache.getName());
+
+ for (int i = startFrom; i < startFrom + 100_000; i++) {
+ String key = "key" + i;
+
+ if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
+ found.add(key);
+
+ if (found.size() == cnt)
+ return found;
+ }
+ }
+
+ throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
+ }
+ }
+
+ /**
+ * Task that checks that no query iterators remain registered in the cache query manager.
+ * <p>
+ * Reads the {@code qryIters} field of {@code GridCacheQueryManager} through
+ * {@code GridTestUtils.getFieldValue} and asserts that every map stored as a value in it is empty.
+ */
+ private static class CheckIteratorTask extends TestIgniteIdxCallable<Void> {
+ /**
+ * @param idx Index; used only in the assertion message.
+ * @return Always {@code null}.
+ */
+ @Override public Void call(int idx) throws Exception {
+ GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+ GridCacheQueryManager queries = ctx.queries();
+
+ Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
+
+ for (Object obj : map.values())
+ assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size());
+
+ return null;
+ }
+ }
+
+ /**
+ * Entry processor that removes the entry and always returns {@code null}.
+ */
+ private static class RemoveAndReturnNullEntryProcessor implements
+ EntryProcessor<String, Integer, Integer>, Serializable {
+
+ /** {@inheritDoc} */
+ @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+ e.remove();
+
+ return null;
+ }
+ }
+
+ /**
+ * Event listener that counts swap and unswap events.
+ * <p>
+ * Each received event is logged. {@code EVT_CACHE_OBJECT_SWAPPED} increments the swap counter and
+ * {@code EVT_CACHE_OBJECT_UNSWAPPED} increments the unswap counter; other event types change neither.
+ * The logger field is annotated with {@code LoggerResource} and is never assigned in this class,
+ * so it is presumably injected by Ignite before {@code apply} is called.
+ */
+ private static class SwapEvtsLocalListener implements IgnitePredicate<Event> {
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Swap events. */
+ private final AtomicInteger swapEvts;
+
+ /** Unswap events. */
+ private final AtomicInteger unswapEvts;
+
+ /**
+ * @param swapEvts Counter incremented on each swap event.
+ * @param unswapEvts Counter incremented on each unswap event.
+ */
+ public SwapEvtsLocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) {
+ this.swapEvts = swapEvts;
+ this.unswapEvts = unswapEvts;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always returns {@code true}.
+ */
+ @Override public boolean apply(Event evt) {
+ log.info("Received event: " + evt);
+
+ switch (evt.type()) {
+ case EVT_CACHE_OBJECT_SWAPPED:
+ swapEvts.incrementAndGet();
+
+ break;
+ case EVT_CACHE_OBJECT_UNSWAPPED:
+ unswapEvts.incrementAndGet();
+
+ break;
+ }
+
+ return true;
+ }
+ }
+ /**
+ * Task that checks the state of entries for the keys {@code "0"} to {@code String.valueOf(cnt - 1)}.
+ * <p>
+ * For each key the entry is peeked (through {@code ctx.near().dht()} when {@code ctx.isNear()} is true,
+ * otherwise through {@code ctx.cache()}). When the local node is among the primary and backup nodes
+ * for the key, the entry must be non-null and marked deleted; otherwise the entry must be {@code null}.
+ */
+ private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable {
+ private final int cnt;
+
+ public CheckEntriesDeletedTask(int cnt) {
+ this.cnt = cnt;
+ }
+
+ @Override public void run(int idx) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ String key = String.valueOf(i);
+
+ GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+ GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+
+ if (ignite.affinity(null).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
+ assertNotNull(entry);
+ assertTrue(entry.deleted());
+ }
+ else
+ assertNull(entry);
+ }
+ }
+ }
+
+ /**
+ * Task that checks the local cache size against the given keys.
+ * <p>
+ * Counts the keys that the cache affinity maps to the local node at the current topology version and
+ * asserts that this count equals {@code localSize(ALL)} of the cache obtained with
+ * {@code ignite.cache(null)}.
+ */
+ private static class CheckKeySizeTask extends TestIgniteIdxRunnable {
+ /** Keys. */
+ private final Collection<String> keys;
+
+ /**
+ * @param keys Keys to count.
+ */
+ public CheckKeySizeTask(Collection<String> keys) {
+ this.keys = keys;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The index is used only in the assertion message.
+ */
+ @Override public void run(int idx) throws Exception {
+ GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+ int size = 0;
+
+ for (String key : keys)
+ if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
+ size++;
+
+ assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(null).localSize(ALL));
+ }
+ }
+
+ /**
+ * Entry processor that always fails by throwing {@code EntryProcessorException}.
+ * <p>
+ * The entry and the arguments are never read.
+ */
+ private static class FailedEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
+ /** {@inheritDoc} */
+ @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+ throw new EntryProcessorException("Test entry processor exception.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
index da694b5..c6ce81e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
@@ -111,6 +111,7 @@ public class CacheNoValueClassOnServerNodeTest extends GridCommonAbstractTest {
}
},
null,
+ null,
jvmArgs,
cp
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
index 1511c45..927ee62 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.expiry.Duration;
import javax.cache.expiry.TouchedExpiryPolicy;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -37,6 +39,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -106,9 +109,11 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
affinityNodes(); // Just to ack cache configuration to log..
- checkKeySize(map.keySet());
+ Set<String> keys = new LinkedHashSet<>(map.keySet());
- checkSize(map.keySet());
+ checkKeySize(keys);
+
+ checkSize(keys);
int fullCacheSize = 0;
@@ -317,24 +322,8 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
Collection<String> locKeys = new HashSet<>();
for (int i = 0; i < gridCount(); i++) {
- grid(i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Received event: " + evt);
-
- switch (evt.type()) {
- case EVT_CACHE_OBJECT_SWAPPED:
- swapEvts.incrementAndGet();
-
- break;
- case EVT_CACHE_OBJECT_UNSWAPPED:
- unswapEvts.incrementAndGet();
-
- break;
- }
-
- return true;
- }
- }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
+ grid(i).events().localListen(
+ new LocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
}
cache.localEvict(Collections.singleton(k2));
@@ -416,4 +405,46 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
assertEquals(cnt, swapEvts.get());
assertEquals(cnt, unswapEvts.get());
}
+
+ /**
+ * Event listener that counts swap and unswap events.
+ * <p>
+ * Each received event is logged. {@code EVT_CACHE_OBJECT_SWAPPED} increments the swap counter and
+ * {@code EVT_CACHE_OBJECT_UNSWAPPED} increments the unswap counter; other event types change neither.
+ */
+ private static class LocalListener implements IgnitePredicate<Event> {
+ /** Logger; never assigned in this class, presumably injected through {@code LoggerResource}. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Swap events. */
+ private final AtomicInteger swapEvts;
+
+ /** Unswap events. */
+ private final AtomicInteger unswapEvts;
+
+ /**
+ * @param swapEvts Counter incremented on each swap event.
+ * @param unswapEvts Counter incremented on each unswap event.
+ */
+ public LocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) {
+ this.swapEvts = swapEvts;
+ this.unswapEvts = unswapEvts;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always returns {@code true}.
+ */
+ @Override public boolean apply(Event evt) {
+ log.info("Received event: " + evt);
+
+ switch (evt.type()) {
+ case EVT_CACHE_OBJECT_SWAPPED:
+ swapEvts.incrementAndGet();
+
+ break;
+ case EVT_CACHE_OBJECT_UNSWAPPED:
+ unswapEvts.incrementAndGet();
+
+ break;
+ }
+
+ return true;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
index c04bf2e..a2440e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -36,6 +39,8 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -93,7 +98,7 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
atomicClockModeDelay(c0);
- c1.removeAll(putMap.keySet());
+ c1.removeAll(new HashSet<>(putMap.keySet()));
for (int i = 0; i < size; i++) {
assertNull(c0.get(i));
@@ -159,22 +164,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
info("Finished putting value [i=" + i + ']');
}
- for (int i = 0; i < gridCount(); i++) {
- executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
- @Override public void run(int idx) throws Exception {
- assertEquals(0, context(idx).tm().idMapSize());
-
- IgniteCache<Object, Object> cache = grid(idx).cache(null);
- ClusterNode node = grid(idx).localNode();
-
- for (int k = 0; k < size; k++) {
- if (affinity(cache).isPrimaryOrBackup(node, k))
- assertEquals("Check failed for node: " + node.id(), k,
- cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP));
- }
- }
- });
- }
+ for (int i = 0; i < gridCount(); i++)
+ executeOnLocalOrRemoteJvm(i, new CheckAffinityTask(size));
for (int i = 0; i < size; i++) {
info("Putting value 2 [i=" + i + ']');
@@ -199,28 +190,9 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
final IgniteAtomicLong unswapEvts = grid(0).atomicLong("unswapEvts", 0, true);
- for (int i = 0; i < gridCount(); i++) {
- final int iCopy = i;
-
- grid(i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Received event: " + evt);
-
- switch (evt.type()) {
- case EVT_CACHE_OBJECT_SWAPPED:
- grid(iCopy).atomicLong("swapEvts", 0, false).incrementAndGet();
-
- break;
- case EVT_CACHE_OBJECT_UNSWAPPED:
- grid(iCopy).atomicLong("unswapEvts", 0, false).incrementAndGet();
-
- break;
- }
-
- return true;
- }
- }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
- }
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).events().localListen(
+ new SwapUnswapLocalListener(), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
jcache().put("key", 1);
@@ -254,13 +226,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
boolean nearEnabled = nearEnabled(c);
- if (nearEnabled) {
- executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
- @Override public void run(int idx) throws Exception {
- assertTrue(((IgniteKernal)ignite(idx)).internalCache().context().isNear());
- }
- });
- }
+ if (nearEnabled)
+ executeOnLocalOrRemoteJvm(i, new IsNearTask());
Integer nearPeekVal = nearEnabled ? 1 : null;
@@ -476,4 +443,74 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
assertFalse(affinity(cache).isPrimaryOrBackup(other, key));
}
}
+
+ /**
+ * Event listener that counts swap and unswap events in Ignite atomic longs.
+ * <p>
+ * Each received event is logged. {@code EVT_CACHE_OBJECT_SWAPPED} increments the atomic long named
+ * {@code "swapEvts"} and {@code EVT_CACHE_OBJECT_UNSWAPPED} increments the one named
+ * {@code "unswapEvts"}; both are looked up on the {@code ignite} field with the last argument
+ * {@code false}. Other event types change neither.
+ */
+ private static class SwapUnswapLocalListener implements IgnitePredicate<Event> {
+ /** Logger; never assigned in this class, presumably injected through {@code LoggerResource}. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Ignite; never assigned in this class, presumably injected through {@code IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always returns {@code true}.
+ */
+ @Override public boolean apply(Event evt) {
+ log.info("Received event: " + evt);
+
+ switch (evt.type()) {
+ case EVT_CACHE_OBJECT_SWAPPED:
+ ignite.atomicLong("swapEvts", 0, false).incrementAndGet();
+
+ break;
+ case EVT_CACHE_OBJECT_UNSWAPPED:
+ ignite.atomicLong("unswapEvts", 0, false).incrementAndGet();
+
+ break;
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Task that checks locally stored values for the integer keys {@code 0} to {@code size - 1}.
+ * <p>
+ * First asserts that {@code tm().idMapSize()} of the internal cache context is zero. Then, for every
+ * key for which the local node is primary or backup, asserts that the value peeked locally with the
+ * {@code ONHEAP} and {@code OFFHEAP} peek modes equals the key itself.
+ */
+ private static class CheckAffinityTask extends TestIgniteIdxRunnable {
+ /** Size. */
+ private final int size;
+
+ /**
+ * @param size Number of keys to check, starting from zero.
+ */
+ public CheckAffinityTask(int size) {
+ this.size = size;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The index is not read by this implementation.
+ */
+ @Override public void run(int idx) throws Exception {
+ assertEquals(0, ((IgniteKernal)ignite).<String, Integer>internalCache().context().tm().idMapSize());
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+ ClusterNode node = ((IgniteKernal)ignite).localNode();
+
+ for (int k = 0; k < size; k++) {
+ if (affinity(cache).isPrimaryOrBackup(node, k))
+ assertEquals("Check failed for node: " + node.id(), k,
+ cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP));
+ }
+ }
+ }
+
+ /**
+ * Task that asserts that {@code isNear()} is true for the internal cache context of the
+ * Ignite instance held in the {@code ignite} field.
+ */
+ private static class IsNearTask extends TestIgniteIdxRunnable {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The index is not read by this implementation.
+ */
+ @Override public void run(int idx) throws Exception {
+ assertTrue(((IgniteKernal)ignite).internalCache().context().isNear());
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f54fe06..d133a84 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerExclusions;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -1471,6 +1472,8 @@ public abstract class GridAbstractTest extends TestCase {
if (!isMultiJvmObject(ignite))
try {
+ job.setIgnite(ignite);
+
return job.call(idx);
}
catch (Exception e) {
@@ -1532,11 +1535,7 @@ public abstract class GridAbstractTest extends TestCase {
IgniteProcessProxy proxy = (IgniteProcessProxy)ignite;
- return proxy.remoteCompute().call(new IgniteCallable<R>() {
- @Override public R call() throws Exception {
- return job.call(idx);
- }
- });
+ return proxy.remoteCompute().call(new ExecuteRemotelyTask<>(job, idx));
}
/**
@@ -1546,15 +1545,7 @@ public abstract class GridAbstractTest extends TestCase {
* @param job Job.
*/
public static <R> R executeRemotely(IgniteProcessProxy proxy, final TestIgniteCallable<R> job) {
- final UUID id = proxy.getId();
-
- return proxy.remoteCompute().call(new IgniteCallable<R>() {
- @Override public R call() throws Exception {
- Ignite ignite = Ignition.ignite(id);
-
- return job.call(ignite);
- }
- });
+ return proxy.remoteCompute().call(new TestRemoteTask<>(proxy.getId(), job));
}
/**
@@ -1571,6 +1562,8 @@ public abstract class GridAbstractTest extends TestCase {
final String cacheName = cache.getName();
return proxy.remoteCompute().call(new IgniteCallable<R>() {
+ private static final long serialVersionUID = -3868429485920845137L;
+
@Override public R call() throws Exception {
Ignite ignite = Ignition.ignite(id);
IgniteCache<K,V> cache = ignite.cache(cacheName);
@@ -1745,6 +1738,22 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
+ * @param name Name.
+ * @param remote Remote.
+ * @param thisRemote This remote.
+ */
+ public static IgniteEx grid(String name, boolean remote, boolean thisRemote) {
+ if (!remote)
+ return (IgniteEx)G.ignite(name);
+ else {
+ if (thisRemote)
+ return IgniteNodeRunner.startedInstance();
+ else
+ return IgniteProcessProxy.ignite(name);
+ }
+ }
+
+ /**
*
*/
private static interface WriteReplaceOwner {
@@ -1781,6 +1790,67 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
+ * Remote computation task.
+ */
+ private static class TestRemoteTask<R> implements IgniteCallable<R> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Node ID. */
+ private final UUID id;
+
+ /** Job. */
+ private final TestIgniteCallable<R> job;
+
+ /**
+ * @param id Id passed to {@code Ignition.ignite(UUID)} when the task is called.
+ * @param job Job to run against the resolved Ignite instance.
+ */
+ public TestRemoteTask(UUID id, TestIgniteCallable<R> job) {
+ this.id = id;
+ this.job = job;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Resolves the Ignite instance with {@code Ignition.ignite(id)} and returns the result of
+ * calling the job with it.
+ */
+ @Override public R call() throws Exception {
+ Ignite ignite = Ignition.ignite(id);
+
+ return job.call(ignite);
+ }
+ }
+
+ /**
+ * Callable that runs an index-based test job with an Ignite instance set on it.
+ * <p>
+ * On call, the {@code ignite} field is passed to the job through {@code setIgnite} and the job is
+ * then invoked with the stored index. The field is annotated with {@code IgniteInstanceResource} and
+ * is never assigned in this class, so it is presumably injected by Ignite before {@code call} runs.
+ * <p>
+ * NOTE(review): unlike {@code TestRemoteTask}, this class declares no {@code serialVersionUID}.
+ */
+ private static class ExecuteRemotelyTask<R> implements IgniteCallable<R> {
+ /** Ignite. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Job. */
+ private final TestIgniteIdxCallable<R> job;
+
+ /** Index. */
+ private final int idx;
+
+ /**
+ * @param job Job to invoke.
+ * @param idx Index passed to the job.
+ */
+ public ExecuteRemotelyTask(TestIgniteIdxCallable<R> job, int idx) {
+ this.job = job;
+ this.idx = idx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public R call() throws Exception {
+ job.setIgnite(ignite);
+
+ return job.call(idx);
+ }
+ }
+
+ /**
* Test counters.
*/
protected class TestCounters {
@@ -1923,17 +1993,27 @@ public abstract class GridAbstractTest extends TestCase {
}
/** */
- public static interface TestIgniteIdxCallable<R> extends Serializable {
+ public static abstract class TestIgniteIdxCallable<R> implements Serializable {
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /**
+ * @param ignite Ignite.
+ */
+ public void setIgnite(Ignite ignite) {
+ this.ignite = ignite;
+ }
+
/**
* @param idx Grid index.
*/
- R call(int idx) throws Exception;
+ protected abstract R call(int idx) throws Exception;
}
/** */
- public abstract static class TestIgniteIdxRunnable implements TestIgniteIdxCallable<Object> {
+ public abstract static class TestIgniteIdxRunnable extends TestIgniteIdxCallable<Void> {
/** {@inheritDoc} */
- @Override public Object call(int idx) throws Exception {
+ @Override public Void call(int idx) throws Exception {
run(idx);
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index eb72252..406318f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -42,6 +42,9 @@ import org.jetbrains.annotations.Nullable;
* Test resources for injection.
*/
public class IgniteTestResources {
+ /** Marshaller class name. */
+ public static final String MARSH_CLASS_NAME = "test.marshaller.class";
+
/** */
private static final IgniteLogger rootLog = new GridTestLog4jLogger(false);
@@ -230,8 +233,9 @@ public class IgniteTestResources {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- public synchronized Marshaller getMarshaller() throws IgniteCheckedException {
- String marshallerName = GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME);
+ public static synchronized Marshaller getMarshaller() throws IgniteCheckedException {
+ String marshallerName =
+ System.getProperty(MARSH_CLASS_NAME, GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME));
Marshaller marsh;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 4bcf51e..e4c2129 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -246,10 +246,12 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
protected static <K, V> boolean nearEnabled(final IgniteCache<K,V> cache) {
CacheConfiguration cfg = GridAbstractTest.executeOnLocalOrRemoteJvm(cache,
new TestCacheCallable<K, V, CacheConfiguration>() {
- @Override public CacheConfiguration call(Ignite ignite, IgniteCache<K, V> cache) throws Exception {
- return ((IgniteKernal)ignite).<K, V>internalCache(cache.getName()).context().config();
- }
- });
+ private static final long serialVersionUID = 0L;
+
+ @Override public CacheConfiguration call(Ignite ignite, IgniteCache<K, V> cache) throws Exception {
+ return ((IgniteKernal)ignite).<K, V>internalCache(cache.getName()).context().config();
+ }
+ });
return isNearEnabled(cfg);
}
@@ -285,10 +287,13 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues) throws Exception {
+ protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues)
+ throws Exception {
IgniteCache<K, Object> cacheCp = (IgniteCache<K, Object>)cache;
GridAbstractTest.executeOnLocalOrRemoteJvm(cacheCp, new TestCacheRunnable<K, Object>() {
+ private static final long serialVersionUID = -3030833765012500545L;
+
@Override public void run(Ignite ignite, IgniteCache<K, Object> cache) throws Exception {
final AtomicReference<Exception> ex = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
index e1959e5..57fbcfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
@@ -19,12 +19,12 @@ package org.apache.ignite.testframework.junits.multijvm;
import java.util.Collection;
import java.util.Map;
-import java.util.UUID;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -38,160 +38,388 @@ public class AffinityProcessProxy<K> implements Affinity<K> {
/** Cache name. */
private final String cacheName;
- /** Grid id. */
- private final UUID gridId;
-
/**
* @param cacheName Cache name.
- * @param proxy Ignite ptocess proxy.
+ * @param proxy Ignite process proxy.
*/
public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) {
this.cacheName = cacheName;
- gridId = proxy.getId();
- compute = proxy.remoteCompute();
- }
-
- /**
- * Returns cache instance. Method to be called from closure at another JVM.
- *
- * @return Cache.
- */
- private Affinity<Object> affinity() {
- return Ignition.ignite(gridId).affinity(cacheName);
+ this.compute = proxy.remoteCompute();
}
/** {@inheritDoc} */
@Override public int partitions() {
- return (int)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().partitions();
- }
- });
+ return compute.call(new PartitionsTask(cacheName));
}
/** {@inheritDoc} */
- @Override public int partition(final K key) {
- return (int)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().partition(key);
- }
- });
+ @Override public int partition(K key) {
+ return compute.call(new PartitionTask<>(cacheName, key));
}
/** {@inheritDoc} */
- @Override public boolean isPrimary(final ClusterNode n, final K key) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().isPrimary(n, key);
- }
- });
+ @Override public boolean isPrimary(ClusterNode n, K key) {
+ return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, false));
}
/** {@inheritDoc} */
- @Override public boolean isBackup(final ClusterNode n, final K key) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().isBackup(n, key);
- }
- });
+ @Override public boolean isBackup(ClusterNode n, K key) {
+ return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, false, true));
}
/** {@inheritDoc} */
- @Override public boolean isPrimaryOrBackup(final ClusterNode n, final K key) {
- return (boolean)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().isPrimaryOrBackup(n, key);
- }
- });
+ @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
+ return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, true));
}
/** {@inheritDoc} */
- @Override public int[] primaryPartitions(final ClusterNode n) {
- return (int[])compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().primaryPartitions(n);
- }
- });
+ @Override public int[] primaryPartitions(ClusterNode n) {
+ return compute.call(new GetPartitionsTask(cacheName, n, true, false));
}
/** {@inheritDoc} */
- @Override public int[] backupPartitions(final ClusterNode n) {
- return (int[])compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().backupPartitions(n);
- }
- });
+ @Override public int[] backupPartitions(ClusterNode n) {
+ return compute.call(new GetPartitionsTask(cacheName, n, false, true));
}
/** {@inheritDoc} */
- @Override public int[] allPartitions(final ClusterNode n) {
- return (int[])compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().allPartitions(n);
- }
- });
+ @Override public int[] allPartitions(ClusterNode n) {
+ return compute.call(new GetPartitionsTask(cacheName, n, true, true));
}
/** {@inheritDoc} */
- @Override public Object affinityKey(final K key) {
- return compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().affinityKey(key);
- }
- });
+ @Override public Object affinityKey(K key) {
+ return compute.call(new AffinityKeyTask<>(cacheName, key));
}
/** {@inheritDoc} */
- @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(final Collection<? extends K> keys) {
- return (Map<ClusterNode, Collection<K>>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().mapKeysToNodes(keys);
- }
- });
+ @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(Collection<? extends K> keys) {
+ return compute.call(new MapKeysToNodesTask<>(cacheName, keys));
}
/** {@inheritDoc} */
- @Nullable @Override public ClusterNode mapKeyToNode(final K key) {
- return (ClusterNode)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().mapKeyToNode(key);
- }
- });
+ @Nullable @Override public ClusterNode mapKeyToNode(K key) {
+ return compute.call(new MapKeyToNodeTask<>(cacheName, key));
}
/** {@inheritDoc} */
- @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(final K key) {
- return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().mapKeyToPrimaryAndBackups(key);
- }
- });
+ @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
+ return compute.call(new MapKeyToPrimaryAndBackupsTask<>(cacheName, key));
}
/** {@inheritDoc} */
- @Override public ClusterNode mapPartitionToNode(final int part) {
- return (ClusterNode)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().mapPartitionToNode(part);
- }
- });
+ @Override public ClusterNode mapPartitionToNode(int part) {
+ return compute.call(new MapPartitionToNode<>(cacheName, part));
}
/** {@inheritDoc} */
- @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(final Collection<Integer> parts) {
- return (Map<Integer, ClusterNode>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().mapPartitionsToNodes(parts);
- }
- });
+ @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
+ return compute.call(new MapPartitionsToNodes<>(cacheName, parts));
}
/** {@inheritDoc} */
- @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(final int part) {
- return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() {
- @Override public Object call() throws Exception {
- return affinity().mapPartitionToPrimaryAndBackups(part);
- }
- });
+ @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
+ return compute.call(new MapPartitionsToPrimaryAndBackupsTask<>(cacheName, part));
+ }
+
+ /**
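+ * Checks whether a node is primary, backup, or either of the two for a key, as selected by the flags.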
+ */
+ private static class PrimaryOrBackupNodeTask<K> extends AffinityTaskAdapter<K, Boolean> {
+ /** Key. */
+ private final K key;
+
+ /** Node. */
+ private final ClusterNode n;
+
+ /** Primary. */
+ private final boolean primary;
+
+ /** Backup. */
+ private final boolean backup;
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ * @param n Node.
+ */
+ public PrimaryOrBackupNodeTask(String cacheName, K key, ClusterNode n,
+ boolean primary, boolean backup) {
+ super(cacheName);
+ this.key = key;
+ this.n = n;
+ this.primary = primary;
+ this.backup = backup;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ if (primary && backup)
+ return affinity().isPrimaryOrBackup(n, key);
+ else if (primary)
+ return affinity().isPrimary(n, key);
+ else if (backup)
+ return affinity().isBackup(n, key);
+ else
+ throw new IllegalStateException("primary or backup or both flags should be switched on");
+ }
+ }
+
+ /**
+ * Maps a key to its primary and backup nodes.
+ */
+ private static class MapKeyToPrimaryAndBackupsTask<K> extends AffinityTaskAdapter<K, Collection<ClusterNode>> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ */
+ public MapKeyToPrimaryAndBackupsTask(String cacheName, K key) {
+ super(cacheName);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> call() throws Exception {
+ return affinity().mapKeyToPrimaryAndBackups(key);
+ }
+ }
+
+ /**
+ * Gets the number of partitions from the cache affinity.
+ */
+ private static class PartitionsTask extends AffinityTaskAdapter<Void, Integer> {
+ /**
+ * @param cacheName Cache name.
+ */
+ public PartitionsTask(String cacheName) {
+ super(cacheName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ return affinity().partitions();
+ }
+ }
+
+ /**
+ * Gets the partition a key belongs to.
+ */
+ private static class PartitionTask<K> extends AffinityTaskAdapter<K, Integer> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ */
+ public PartitionTask(String cacheName, K key) {
+ super(cacheName);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ return affinity().partition(key);
+ }
+ }
+
+ /**
+ * Gets the primary, backup, or all partitions of a node, as selected by the flags.
+ */
+ private static class GetPartitionsTask extends AffinityTaskAdapter<Void, int[]> {
+ /** Node. */
+ private final ClusterNode n;
+
+ /** Primary. */
+ private final boolean primary;
+
+ /** Backup. */
+ private final boolean backup;
+
+ /**
+ * @param cacheName Cache name.
+ * @param n Node.
+ * @param primary Primary.
+ * @param backup Backup.
+ */
+ public GetPartitionsTask(String cacheName, ClusterNode n, boolean primary, boolean backup) {
+ super(cacheName);
+ this.n = n;
+ this.primary = primary;
+ this.backup = backup;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] call() throws Exception {
+ if (primary && backup)
+ return affinity().allPartitions(n);
+ else if (primary)
+ return affinity().primaryPartitions(n);
+ else if (backup)
+ return affinity().backupPartitions(n);
+ else
+ throw new IllegalStateException("primary or backup or both flags should be switched on");
+ }
+ }
+
+ /**
+ * Gets the affinity key for a cache key.
+ */
+ private static class AffinityKeyTask<K> extends AffinityTaskAdapter<K, Object> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ */
+ public AffinityKeyTask(String cacheName, K key) {
+ super(cacheName);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ return affinity().affinityKey(key);
+ }
+ }
+
+ /**
+ * @param <K> Key type.
+ */
+ private static class MapKeysToNodesTask<K> extends AffinityTaskAdapter<K, Map<ClusterNode, Collection<K>>> {
+ /** Keys. */
+ private final Collection<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name.
+ * @param keys Keys.
+ */
+ public MapKeysToNodesTask(String cacheName, Collection<? extends K> keys) {
+ super(cacheName);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<ClusterNode, Collection<K>> call() throws Exception {
+ return affinity().mapKeysToNodes(keys);
+ }
+ }
+
+ /**
+ * Maps a key to a node.
+ */
+ private static class MapKeyToNodeTask<K> extends AffinityTaskAdapter<K, ClusterNode> {
+ /** Key. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ */
+ public MapKeyToNodeTask(String cacheName, K key) {
+ super(cacheName);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return affinity().mapKeyToNode(key);
+ }
+ }
+
+ /**
+ * Maps a partition to a node.
+ */
+ private static class MapPartitionToNode<K> extends AffinityTaskAdapter<K, ClusterNode> {
+ /** Partition. */
+ private final int part;
+
+ /**
+ * @param cacheName Cache name.
+ * @param part Partition.
+ */
+ public MapPartitionToNode(String cacheName, int part) {
+ super(cacheName);
+ this.part = part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode call() throws Exception {
+ return affinity().mapPartitionToNode(part);
+ }
+ }
+
+ /**
+ * Maps a collection of partitions to nodes.
+ */
+ private static class MapPartitionsToNodes<K> extends AffinityTaskAdapter<K, Map<Integer, ClusterNode>> {
+ /** Parts. */
+ private final Collection<Integer> parts;
+
+ /**
+ * @param cacheName Cache name.
+ * @param parts Parts.
+ */
+ public MapPartitionsToNodes(String cacheName, Collection<Integer> parts) {
+ super(cacheName);
+ this.parts = parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Integer, ClusterNode> call() throws Exception {
+ return affinity().mapPartitionsToNodes(parts);
+ }
+ }
+
+ /**
+ * Maps a partition to its primary and backup nodes.
+ */
+ private static class MapPartitionsToPrimaryAndBackupsTask<K> extends AffinityTaskAdapter<K, Collection<ClusterNode>> {
+ /** Partition. */
+ private final int part;
+
+ /**
+ * @param cacheName Cache name.
+ * @param part Partition.
+ */
+ public MapPartitionsToPrimaryAndBackupsTask(String cacheName, int part) {
+ super(cacheName);
+ this.part = part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> call() throws Exception {
+ return affinity().mapPartitionToPrimaryAndBackups(part);
+ }
+ }
+
+ /**
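+ * Base class for affinity tasks; resolves the affinity by cache name through the {@code ignite} field.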
+ */
+ private abstract static class AffinityTaskAdapter<K, R> implements IgniteCallable<R> {
+ /** Ignite. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Cache name. */
+ protected final String cacheName;
+
+ /**
+ * @param cacheName Cache name.
+ */
+ public AffinityTaskAdapter(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ /**
+ * @return Affinity.
+ */
+ protected Affinity<K> affinity() {
+ return ignite.affinity(cacheName);
+ }
}
}
\ No newline at end of file