You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/04/18 10:00:50 UTC
[01/19] ignite git commit: IGNITE-3827: Removed double marshalling of
keys in DataStreamerImpl.addData(Map) method.
Repository: ignite
Updated Branches:
refs/heads/master b2fb9be1d -> ea9a9dda7
IGNITE-3827: Removed double marshalling of keys in DataStreamerImpl.addData(Map) method.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c56e451
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c56e451
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c56e451
Branch: refs/heads/master
Commit: 8c56e4516e55c7ed9c14779f0e77e00f055d9a81
Parents: 12fd497
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Sep 2 18:05:16 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 2 18:25:49 2016 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamerImpl.java | 23 ++++++++------------
1 file changed, 9 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c56e451/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index e565cba..a3bae24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -87,7 +87,6 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -513,23 +512,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
activeFuts.add(resFut);
- Collection<KeyCacheObject> keys = null;
+ Collection<KeyCacheObject> keys =
+ new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
- if (entries.size() > 1) {
- keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+ Collection<DataStreamerEntry> entries0 = new ArrayList<>(entries.size());
- for (Map.Entry<K, V> entry : entries)
- keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true));
- }
+ for (Map.Entry<K, V> entry : entries) {
+ KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true);
+ CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, entry.getValue(), true);
- Collection<? extends DataStreamerEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() {
- @Override public DataStreamerEntry apply(Entry<K, V> e) {
- KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, e.getKey(), true);
- CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true);
+ keys.add(key);
- return new DataStreamerEntry(key, val);
- }
- });
+ entries0.add(new DataStreamerEntry(key, val));
+ }
load0(entries0, resFut, keys, 0);
[15/19] ignite git commit: Fix missing test resource directory for org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest.testReconnect
Posted by nt...@apache.org.
Fix missing test resource directory for org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest.testReconnect
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02b19426
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02b19426
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02b19426
Branch: refs/heads/master
Commit: 02b194268071b179d291b28472cef5d587e7558a
Parents: 89e9dbe
Author: Alexander Fedotov <al...@gmail.com>
Authored: Tue Apr 11 12:00:59 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Tue Apr 11 12:00:59 2017 +0300
----------------------------------------------------------------------
modules/core/pom.xml | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/02b19426/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index f12cdfa..f786a27 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -234,6 +234,9 @@
<exclude>**/*.java</exclude>
</excludes>
</testResource>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
</testResources>
<plugins>
[14/19] ignite git commit: Fix org.apache.ignite.internal.processors.cache.expiry.IgniteCacheExpiryPolicyAbstractTest#testNearExpiresWithCacheStore
Posted by nt...@apache.org.
Fix org.apache.ignite.internal.processors.cache.expiry.IgniteCacheExpiryPolicyAbstractTest#testNearExpiresWithCacheStore
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89e9dbe4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89e9dbe4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89e9dbe4
Branch: refs/heads/master
Commit: 89e9dbe484312c251f02c9fbe9698c3ac2e03df8
Parents: 4fce280
Author: Alexander Fedotov <al...@gmail.com>
Authored: Mon Apr 10 16:36:33 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Mon Apr 10 16:36:33 2017 +0300
----------------------------------------------------------------------
.../cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/89e9dbe4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 3339f65..41c915b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -1061,6 +1061,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
* @throws Exception If failed.
*/
public void testNearExpiresWithCacheStore() throws Exception {
+ if(cacheMode() != PARTITIONED)
+ return;
+
factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1));
nearCache = true;
[07/19] ignite git commit: IGNITE-4200: Added copying of the C++
binaries.
Posted by nt...@apache.org.
IGNITE-4200: Added copying of the C++ binaries.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab3b2926
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab3b2926
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab3b2926
Branch: refs/heads/master
Commit: ab3b2926213291a45305c21e7efe066855c1342f
Parents: 1859857
Author: Igor Sapego <is...@gridgain.com>
Authored: Tue Mar 21 17:47:09 2017 +0300
Committer: Igor Sapego <is...@gridgain.com>
Committed: Tue Mar 21 17:47:09 2017 +0300
----------------------------------------------------------------------
assembly/release-fabric-base.xml | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab3b2926/assembly/release-fabric-base.xml
----------------------------------------------------------------------
diff --git a/assembly/release-fabric-base.xml b/assembly/release-fabric-base.xml
index 7b3d8cf..7f05c30 100644
--- a/assembly/release-fabric-base.xml
+++ b/assembly/release-fabric-base.xml
@@ -174,6 +174,12 @@
<outputDirectory>/platforms/cpp/docs</outputDirectory>
</fileSet>
+ <!-- Move CPP binaries. -->
+ <fileSet>
+ <directory>modules/platforms/cpp/bin</directory>
+ <outputDirectory>/platforms/cpp/bin</outputDirectory>
+ </fileSet>
+
<!-- Other files. -->
<fileSet>
<directory>bin</directory>
[05/19] ignite git commit: Minors
Posted by nt...@apache.org.
Minors
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe424591
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe424591
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe424591
Branch: refs/heads/master
Commit: fe424591161595de1151a29cf1cdeb50456c3e39
Parents: 09a3922
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Thu Jan 12 17:43:21 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Jan 12 17:43:21 2017 +0300
----------------------------------------------------------------------
.../IgniteCacheExpiryPolicyAbstractTest.java | 42 ++++++++++++++++++++
1 file changed, 42 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fe424591/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 794519a..4368a15 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -36,24 +36,30 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.ModifiedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -1003,6 +1009,42 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
+ * @throws Exception If failed.
+ */
+ public void testNearExpiresWithCacheStore() throws Exception {
+ factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1));
+
+ nearCache = true;
+
+ startGridsMultiThreaded(gridCount());
+
+ IgniteConfiguration clientCfg = getConfiguration("client").setClientMode(true);
+
+ ((TcpDiscoverySpi)clientCfg.getDiscoverySpi()).setForceServerMode(false);
+
+ Ignite client = startGrid("client", clientCfg);
+
+ CacheConfiguration ccfg = cacheConfiguration("testCache");
+
+// ccfg.setExpiryPolicyFactory( CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)));
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+ Integer key = 1;
+
+ cache.put(key, 1);
+
+ assertEquals(1, cache.localPeek(key, CachePeekMode.NEAR));
+ assertEquals(1, cache.get(key));
+
+ waitExpired(key);
+
+ for(int i = 0; i < gridCount(); i++)
+ assertNull(jcache(i).localPeek(key, CachePeekMode.BACKUP, CachePeekMode.PRIMARY));
+
+ assertEquals(null, cache.get(key));
+ }
+ /**
* @return Test keys.
* @throws Exception If failed.
*/
[16/19] ignite git commit: Fix org.apache.ignite.internal.processors.cache.expiry.IgniteCacheExpiryPolicyAbstractTest#testNearExpiresWithCacheStore
Posted by nt...@apache.org.
Fix org.apache.ignite.internal.processors.cache.expiry.IgniteCacheExpiryPolicyAbstractTest#testNearExpiresWithCacheStore
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/20016a20
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/20016a20
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/20016a20
Branch: refs/heads/master
Commit: 20016a20f780eb3c21f249d3cb74d08018c4eea5
Parents: 02b1942
Author: Alexander Fedotov <al...@gmail.com>
Authored: Tue Apr 11 14:54:06 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Tue Apr 11 14:54:06 2017 +0300
----------------------------------------------------------------------
.../cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/20016a20/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 41c915b..e6737e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -1087,9 +1087,11 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
cache.put(key, 1);
- assertEquals(1, cache.localPeek(key, CachePeekMode.NEAR));
+ // Make entry cached in client NearCache.
assertEquals(1, cache.get(key));
+ assertEquals(1, cache.localPeek(key, CachePeekMode.NEAR));
+
waitExpired(key);
for(int i = 0; i < gridCount(); i++)
[04/19] ignite git commit: IGNITE-4424 REPLICATED cache isn't synced
across nodes
Posted by nt...@apache.org.
IGNITE-4424 REPLICATED cache isn't synced across nodes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/71412cec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/71412cec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/71412cec
Branch: refs/heads/master
Commit: 71412cecd861119965a873520da96078f99c94e2
Parents: 8dd4ada
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Dec 30 13:41:34 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Dec 30 13:41:34 2016 +0300
----------------------------------------------------------------------
.../GridNearAtomicAbstractUpdateFuture.java | 34 ++-
.../GridNearAtomicSingleUpdateFuture.java | 44 ++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 57 ++---
.../AtomicPutAllChangingTopologyTest.java | 212 +++++++++++++++++++
.../IgniteCacheFailoverTestSuite.java | 3 +
5 files changed, 284 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/71412cec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 2fbabaa..c92e0f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -212,14 +212,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
// Cannot remap.
remapCnt = 1;
- map(topVer);
+ GridCacheVersion futVer = addAtomicFuture(topVer);
+
+ if (futVer != null)
+ map(topVer, futVer);
}
}
/**
* @param topVer Topology version.
+ * @param futVer Future version
*/
- protected abstract void map(AffinityTopologyVersion topVer);
+ protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer);
/**
* Maps future on ready topology.
@@ -302,7 +306,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param req Request.
* @param e Error.
*/
- protected void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
+ protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
synchronized (mux) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
@@ -314,4 +318,28 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
onResult(req.nodeId(), res, true);
}
}
+
+ /**
+ * Adds a future that prevents topology change before the operation completes.
+ * Should be invoked before the topology lock is released.
+ *
+ * @param topVer Topology version.
+ * @return Future version in case future added.
+ */
+ protected final GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) {
+ GridCacheVersion futVer = cctx.versions().next(topVer);
+
+ synchronized (mux) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.futVer = futVer;
+ }
+
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futVer, this))
+ return null;
+
+ return futVer;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71412cec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index bd231cf..7376aff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -348,14 +348,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
-
- map(topVer);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ mapOnTopology();
}
});
}
@@ -388,7 +381,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
@Override protected void mapOnTopology() {
cache.topology().readLock();
- AffinityTopologyVersion topVer = null;
+ AffinityTopologyVersion topVer;
+
+ GridCacheVersion futVer;
try {
if (cache.topology().stopping()) {
@@ -410,6 +405,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
topVer = fut.topologyVersion();
+
+ futVer = addAtomicFuture(topVer);
}
else {
if (waitTopFut) {
@@ -435,11 +432,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cache.topology().readUnlock();
}
- map(topVer);
+ if (futVer != null)
+ map(topVer, futVer);
}
/** {@inheritDoc} */
- protected void map(AffinityTopologyVersion topVer) {
+ @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -449,11 +447,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return;
}
- Exception err = null;
- GridNearAtomicAbstractUpdateRequest singleReq0 = null;
-
- GridCacheVersion futVer = cctx.versions().next(topVer);
-
GridCacheVersion updVer;
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
@@ -470,16 +463,17 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
else
updVer = null;
+ Exception err = null;
+ GridNearAtomicAbstractUpdateRequest singleReq0 = null;
+
try {
singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
synchronized (mux) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ assert this.futVer == futVer || (this.isDone() && this.error() != null);
+ assert this.topVer == topVer;
- this.topVer = topVer;
this.updVer = updVer;
- this.futVer = futVer;
resCnt = 0;
@@ -496,14 +490,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return;
}
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
- assert isDone() : this;
-
- return;
- }
- }
-
// Optimize mapping for single key.
mapSingle(singleReq0.nodeId(), singleReq0);
}
@@ -511,7 +497,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/**
* @return Future version.
*/
- GridCacheVersion onFutureDone() {
+ private GridCacheVersion onFutureDone() {
GridCacheVersion ver0;
GridFutureAdapter<Void> fut0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/71412cec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index cd64117..950e5bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -456,14 +456,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
-
- map(topVer, remapKeys);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ mapOnTopology();
}
});
}
@@ -497,7 +490,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@Override protected void mapOnTopology() {
cache.topology().readLock();
- AffinityTopologyVersion topVer = null;
+ AffinityTopologyVersion topVer;
+
+ GridCacheVersion futVer;
try {
if (cache.topology().stopping()) {
@@ -519,6 +514,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
topVer = fut.topologyVersion();
+
+ futVer = addAtomicFuture(topVer);
}
else {
if (waitTopFut) {
@@ -544,7 +541,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
cache.topology().readUnlock();
}
- map(topVer, null);
+ if (futVer != null)
+ map(topVer, futVer, remapKeys);
}
/**
@@ -602,15 +600,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- protected void map(AffinityTopologyVersion topVer) {
- map(topVer, null);
+ @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
+ map(topVer, futVer, null);
}
/**
* @param topVer Topology version.
+ * @param futVer Future ID.
* @param remapKeys Keys to remap.
*/
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ void map(AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -620,14 +621,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- Exception err = null;
- GridNearAtomicFullUpdateRequest singleReq0 = null;
- Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
-
- int size = keys.size();
-
- GridCacheVersion futVer = cctx.versions().next(topVer);
-
GridCacheVersion updVer;
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
@@ -644,6 +637,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
else
updVer = null;
+ Exception err = null;
+ GridNearAtomicFullUpdateRequest singleReq0 = null;
+ Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
+
+ int size = keys.size();
+
try {
if (size == 1 && !fastMap) {
assert remapKeys == null || remapKeys.size() == 1;
@@ -676,12 +675,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
synchronized (mux) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ assert this.futVer == futVer || (this.isDone() && this.error() != null);
+ assert this.topVer == topVer;
- this.topVer = topVer;
this.updVer = updVer;
- this.futVer = futVer;
resCnt = 0;
@@ -701,14 +698,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
- assert isDone() : this;
-
- return;
- }
- }
-
// Optimize mapping for single key.
if (singleReq0 != null)
mapSingle(singleReq0.nodeId(), singleReq0);
@@ -725,7 +714,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/**
* @return Future version.
*/
- GridCacheVersion onFutureDone() {
+ private GridCacheVersion onFutureDone() {
GridCacheVersion ver0;
GridFutureAdapter<Void> fut0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/71412cec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java
new file mode 100644
index 0000000..878cb17
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CachePeekMode.BACKUP;
+import static org.apache.ignite.cache.CachePeekMode.PRIMARY;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/** */
+public class AtomicPutAllChangingTopologyTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES_CNT = 3;
+
+ /** */
+ public static final String CACHE_NAME = "test-cache";
+
+ /** */
+ private static final int CACHE_SIZE = 20_000;
+
+ /** */
+ private static volatile CountDownLatch FILLED_LATCH;
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfig() {
+ return new CacheConfiguration<Integer, Integer>()
+ .setAtomicityMode(ATOMIC)
+ .setCacheMode(REPLICATED)
+ .setAffinity(new FairAffinityFunction(false, 1))
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setRebalanceMode(SYNC)
+ .setName(CACHE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllOnChangingTopology() throws Exception {
+ List<IgniteInternalFuture> futs = new LinkedList<>();
+
+ for (int i = 1; i < NODES_CNT; i++)
+ futs.add(startNodeAsync(i));
+
+ futs.add(startSeedNodeAsync());
+
+ boolean failed = false;
+
+ for (IgniteInternalFuture fut : futs) {
+ try {
+ fut.get();
+ }
+ catch (Throwable th) {
+ log.error("Check failed.", th);
+
+ failed = true;
+ }
+ }
+
+ if (failed)
+ throw new RuntimeException("Test Failed.");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ FILLED_LATCH = new CountDownLatch(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @return Future.
+ * @throws IgniteCheckedException If failed.
+ */
+ private IgniteInternalFuture startSeedNodeAsync() throws IgniteCheckedException {
+ return GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Boolean call() throws Exception {
+ Ignite node = startGrid(0);
+
+ log.info("Creating cache.");
+
+ IgniteCache<Integer, Integer> cache = node.getOrCreateCache(cacheConfig());
+
+ log.info("Created cache.");
+
+ Map<Integer, Integer> data = new HashMap<>(CACHE_SIZE);
+
+ for (int i = 0; i < CACHE_SIZE; i++)
+ data.put(i, i);
+
+ log.info("Filling.");
+
+ cache.putAll(data);
+
+ log.info("Filled.");
+
+ FILLED_LATCH.countDown();
+
+ checkCacheState(node, cache);
+
+ return true;
+ }
+ });
+ }
+
+ /**
+ * @param nodeId Node index.
+ * @return Future.
+ * @throws IgniteCheckedException If failed.
+ */
+ private IgniteInternalFuture startNodeAsync(final int nodeId) throws IgniteCheckedException {
+ return GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Boolean call() throws Exception {
+ Ignite node = startGrid(nodeId);
+
+ log.info("Getting cache.");
+
+ IgniteCache<Integer, Integer> cache = node.getOrCreateCache(cacheConfig());
+
+ log.info("Got cache.");
+
+ FILLED_LATCH.await();
+
+ log.info("Got Filled.");
+
+ cache.put(1, nodeId);
+
+ checkCacheState(node, cache);
+
+ return true;
+ }
+ });
+ }
+
+ /**
+ * @param node Node.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void checkCacheState(Ignite node, IgniteCache<Integer, Integer> cache) throws Exception {
+ int locSize = cache.localSize(PRIMARY, BACKUP);
+ int locSize2 = -1;
+
+ if (locSize != CACHE_SIZE) {
+ U.sleep(5000);
+
+ // Rechecking.
+ locSize2 = cache.localSize(PRIMARY, BACKUP);
+ }
+
+ assertEquals("Wrong cache size on node [node=" + node.configuration().getGridName() +
+ ", expected= " + CACHE_SIZE +
+ ", actual=" + locSize +
+ ", actual2=" + locSize2 + "]",
+ locSize, CACHE_SIZE);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/71412cec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index c9e507d..5bc6729 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtR
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheTxNodeFailureSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridNearCacheTxNodeFailureSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteAtomicLongChangingTopologySelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.AtomicPutAllChangingTopologyTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientInvalidPartitionHandlingSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientRemoveFailureTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicInvalidPartitionHandlingSelfTest;
@@ -95,6 +96,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(GridCacheTxNodeFailureSelfTest.class);
suite.addTestSuite(GridNearCacheTxNodeFailureSelfTest.class);
+ suite.addTestSuite(AtomicPutAllChangingTopologyTest.class);
+
return suite;
}
}
[11/19] ignite git commit: Merge branch ignite-1.6.12 into ignite-1.8.5-p1
Posted by nt...@apache.org.
Merge branch ignite-1.6.12 into ignite-1.8.5-p1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6954ff0c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6954ff0c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6954ff0c
Branch: refs/heads/master
Commit: 6954ff0c85f2f75507ee0bd306c879f490b4201a
Parents: d81548d ab3b292
Author: Alexander Fedotov <al...@gmail.com>
Authored: Fri Apr 7 15:44:48 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Fri Apr 7 15:44:48 2017 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------
[10/19] ignite git commit: Merge branch ignite-1.6.6-p1 into ignite-1.8.5-p1
Posted by nt...@apache.org.
Merge branch ignite-1.6.6-p1 into ignite-1.8.5-p1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d81548d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d81548d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d81548d3
Branch: refs/heads/master
Commit: d81548d3a4e384e1a9b4adacf1fb487444bbfd33
Parents: 3be4e00 7b0d232
Author: Alexander Fedotov <al...@gmail.com>
Authored: Fri Apr 7 15:33:08 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Fri Apr 7 15:33:08 2017 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------
[08/19] ignite git commit: ignite-2913 - SQL: EXISTS support added
Posted by nt...@apache.org.
ignite-2913 - SQL: EXISTS support added
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae435fb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae435fb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae435fb5
Branch: refs/heads/master
Commit: ae435fb5474429e7ac76faadd680c19a4c657371
Parents: 2e6bc44
Author: Sergi Vladykin <se...@gmail.com>
Authored: Sun Apr 2 15:35:04 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Wed Apr 5 21:15:24 2017 +0300
----------------------------------------------------------------------
.../query/h2/sql/GridSqlOperationType.java | 23 ++++-
.../query/h2/sql/GridSqlQueryParser.java | 19 +++-
.../query/IgniteSqlSplitterSelfTest.java | 94 +++++++++++++++-----
.../query/h2/sql/GridQueryParsingTest.java | 91 +++++++++++++++----
4 files changed, 183 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae435fb5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
index 8d31651..9ffabad 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
@@ -31,7 +31,7 @@ public enum GridSqlOperationType {
MULTIPLY(2, new BiExpressionSqlGenerator("*")),
DIVIDE(2, new BiExpressionSqlGenerator("/")),
MODULUS(2, new BiExpressionSqlGenerator("%")),
- NEGATE(1, new PrefixSqlGenerator("-")),
+ NEGATE(1, new PrefixSqlGenerator("-", true)),
// from org.h2.expression.Comparison
EQUAL(2, new BiExpressionSqlGenerator("=")),
@@ -47,7 +47,7 @@ public enum GridSqlOperationType {
IS_NULL(1, new SuffixSqlGenerator("IS NULL")),
IS_NOT_NULL(1, new SuffixSqlGenerator("IS NOT NULL")),
- NOT(1, new PrefixSqlGenerator("NOT")),
+ NOT(1, new PrefixSqlGenerator("NOT", true)),
// from org.h2.expression.ConditionAndOr
AND(2, new BiExpressionSqlGenerator("AND")),
@@ -58,6 +58,7 @@ public enum GridSqlOperationType {
LIKE(2, new BiExpressionSqlGenerator("LIKE")),
IN(-1, new ConditionInSqlGenerator()),
+ EXISTS(1, new PrefixSqlGenerator("EXISTS", false)),
;
/** */
@@ -145,18 +146,32 @@ public enum GridSqlOperationType {
/** */
private final String text;
+ /** */
+ private final boolean addSpace;
+
/**
* @param text Text.
+ * @param addSpace Add space char after the prefix.
*/
- private PrefixSqlGenerator(String text) {
+ private PrefixSqlGenerator(String text, boolean addSpace) {
this.text = text;
+ this.addSpace = addSpace;
}
/** {@inheritDoc} */
@Override public String getSql(GridSqlOperation operation) {
assert operation.operationType().childrenCnt == 1;
- return '(' + text + ' ' + operation.child().getSQL() + ')';
+ StringBuilder b = new StringBuilder();
+
+ b.append('(').append(text);
+
+ if (addSpace)
+ b.append(' ');
+
+ b.append(operation.child(0).getSQL()).append(')');
+
+ return b.toString();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae435fb5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index d9c546c..a3e80ec 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -46,6 +46,7 @@ import org.h2.expression.Alias;
import org.h2.expression.CompareLike;
import org.h2.expression.Comparison;
import org.h2.expression.ConditionAndOr;
+import org.h2.expression.ConditionExists;
import org.h2.expression.ConditionIn;
import org.h2.expression.ConditionInConstantSet;
import org.h2.expression.ConditionInSelect;
@@ -80,6 +81,7 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperatio
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType.DIVIDE;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType.EQUAL;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType.EQUAL_NULL_SAFE;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType.EXISTS;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType.IN;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType.IS_NOT_NULL;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType.IS_NULL;
@@ -186,7 +188,10 @@ public class GridSqlQueryParser {
"compareType");
/** */
- private static final Getter<ConditionInSelect, Query> QUERY = getter(ConditionInSelect.class, "query");
+ private static final Getter<ConditionInSelect, Query> QUERY_IN = getter(ConditionInSelect.class, "query");
+
+ /** */
+ private static final Getter<ConditionExists, Query> QUERY_EXISTS = getter(ConditionExists.class, "query");
/** */
private static final Getter<CompareLike, Expression> LEFT = getter(CompareLike.class, "left");
@@ -854,7 +859,7 @@ public class GridSqlQueryParser {
res.addChild(parseExpression(LEFT_CIS.get((ConditionInSelect)expression), calcTypes));
- Query qry = QUERY.get((ConditionInSelect)expression);
+ Query qry = QUERY_IN.get((ConditionInSelect)expression);
assert0(qry instanceof Select, qry);
@@ -959,6 +964,16 @@ public class GridSqlQueryParser {
return res;
}
+ if (expression instanceof ConditionExists) {
+ Query qry = QUERY_EXISTS.get((ConditionExists)expression);
+
+ GridSqlOperation res = new GridSqlOperation(EXISTS);
+
+ res.addChild(new GridSqlSubquery(parse(qry, null)));
+
+ return res;
+ }
+
throw new IgniteException("Unsupported expression: " + expression + " [type=" +
expression.getClass().getSimpleName() + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae435fb5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index e72c9cb..c3f1bd2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -21,9 +21,11 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.CacheException;
@@ -33,10 +35,8 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -63,8 +63,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheKeyConfiguration keyCfg = new CacheKeyConfiguration(TestKey.class.getName(), "affKey");
@@ -81,6 +81,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ @Override
+ protected long getTestTimeout() {
+ return 100_000_000;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGridsMultiThreaded(3, false);
@@ -148,6 +153,47 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
}
}
+ @SuppressWarnings("SuspiciousMethodCalls")
+ public void testExists() {
+ IgniteCache<Integer,Person2> x = ignite(0).getOrCreateCache(cacheConfig("x", true,
+ Integer.class, Person2.class));
+ IgniteCache<Integer,Person2> y = ignite(0).getOrCreateCache(cacheConfig("y", true,
+ Integer.class, Person2.class));
+
+ try {
+ GridRandom rnd = new GridRandom();
+
+ Set<Integer> intersects = new HashSet<>();
+
+ for (int i = 0; i < 3000; i++) {
+ int r = rnd.nextInt(3);
+
+ if (r != 0)
+ x.put(i, new Person2(i, "pers_x_" + i));
+
+ if (r != 1)
+ y.put(i, new Person2(i, "pers_y_" + i));
+
+ if (r == 2)
+ intersects.add(i);
+ }
+
+ assertFalse(intersects.isEmpty());
+
+ List<List<?>> res = x.query(new SqlFieldsQuery("select _key from \"x\".Person2 px " +
+ "where exists(select 1 from \"y\".Person2 py where px._key = py._key)")).getAll();
+
+ assertEquals(intersects.size(), res.size());
+
+ for (List<?> row : res)
+ assertTrue(intersects.contains(row.get(0)));
+ }
+ finally {
+ x.destroy();
+ y.destroy();
+ }
+ }
+
/**
* @throws Exception If failed.
*/
@@ -550,15 +596,15 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
"\"orgRepl\".Organization o",
"where p.affKey = o._key", true);
- checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
- "(select * from \"persPart\".Person2) p",
- "\"orgPart\".Organization o",
- "where p._key = o._key", false);
-
- checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
- "\"persPart\".Person2 p",
- "(select * from \"orgPart\".Organization) o",
- "where p._key = o._key", false);
+ // TODO Now we can not analyze subqueries to decide if we are collocated or not.
+// checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+// "(select * from \"persPart\".Person2) p",
+// "\"orgPart\".Organization o",
+// "where p._key = o._key", false);
+// checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+// "\"persPart\".Person2 p",
+// "(select * from \"orgPart\".Organization) o",
+// "where p._key = o._key", false);
// Join multiple.
@@ -703,6 +749,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
ignite(0).destroyCache(cache.getName());
}
}
+
/**
* @throws Exception If failed.
*/
@@ -791,26 +838,26 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
false,
0,
select +
- "from " + cache1 + "," + cache2 + " "+ where);
+ "from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
false,
0,
select +
- "from " + cache2 + "," + cache1 + " "+ where);
+ "from " + cache2 + "," + cache1 + " " + where);
if (testEnforceJoinOrder) {
checkQueryPlan(cache,
true,
0,
select +
- "from " + cache1 + "," + cache2 + " "+ where);
+ "from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
true,
0,
select +
- "from " + cache2 + "," + cache1 + " "+ where);
+ "from " + cache2 + "," + cache1 + " " + where);
}
}
@@ -825,7 +872,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
boolean enforceJoinOrder,
int expBatchedJoins,
String sql,
- String...expText) {
+ String...expText
+ ) {
checkQueryPlan(cache,
enforceJoinOrder,
expBatchedJoins,
@@ -850,13 +898,13 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
boolean enforceJoinOrder,
int expBatchedJoins,
SqlFieldsQuery qry,
- String...expText) {
+ String... expText) {
qry.setEnforceJoinOrder(enforceJoinOrder);
qry.setDistributedJoins(true);
String plan = queryPlan(cache, qry);
- log.info("Plan: " + plan);
+ log.info("\n Plan:\n" + plan);
assertEquals("Unexpected number of batched joins in plan [plan=" + plan + ", qry=" + qry + ']',
expBatchedJoins,
@@ -986,7 +1034,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
* @param args Arguments.
* @return Column as list.
*/
- private static <X> List<X> columnQuery(IgniteCache<?,?> c, String qry, Object... args) {
+ private static <X> List<X> columnQuery(IgniteCache<?, ?> c, String qry, Object... args) {
return column(0, c.query(new SqlFieldsQuery(qry).setArgs(args)).getAll());
}
@@ -1584,4 +1632,4 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
@QuerySqlField
private int goodId;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae435fb5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index 537ccdf..ecdb593 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -39,12 +39,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.h2.command.Prepared;
-import org.h2.command.dml.Query;
-import org.h2.command.dml.Update;
import org.h2.engine.Session;
-import org.h2.expression.Expression;
import org.h2.jdbc.JdbcConnection;
-import org.h2.util.StringUtils;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -57,12 +53,18 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
+ private static final String TEST_SCHEMA = "SCH";
+
+ /** */
+ private static final String TEST_CACHE = "my-cache";
+
+ /** */
private static Ignite ignite;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration c = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -73,12 +75,14 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
// Cache.
CacheConfiguration cc = defaultCacheConfiguration();
+ cc.setName(TEST_CACHE);
cc.setCacheMode(CacheMode.PARTITIONED);
cc.setAtomicityMode(CacheAtomicityMode.ATOMIC);
cc.setNearConfiguration(null);
cc.setWriteSynchronizationMode(FULL_SYNC);
cc.setRebalanceMode(SYNC);
cc.setSwapEnabled(false);
+ cc.setSqlSchema(TEST_SCHEMA);
cc.setSqlFunctionClasses(GridQueryParsingTest.class);
cc.setIndexedTypes(
String.class, Address.class,
@@ -110,6 +114,12 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testParseSelectAndUnion() throws Exception {
+ checkQuery("select 1 from Person p where addrIds in ((1,2,3), (3,4,5))");
+ checkQuery("select 1 from Person p where addrId in ((1,))");
+ checkQuery("select 1 from Person p " +
+ "where p.addrId in (select a.id from Address a)");
+ checkQuery("select 1 from Person p " +
+ "where exists(select 1 from Address a where p.addrId = a.id)");
checkQuery("select 42");
checkQuery("select ()");
checkQuery("select (1)");
@@ -244,16 +254,16 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
checkQuery("select street from Person p, (select a.street from Address a where a.street is not null) ");
checkQuery("select addr.street from Person p, (select a.street from Address a where a.street is not null) addr");
- checkQuery("select p.name n from \"\".Person p order by p.old + 10");
+ checkQuery("select p.name n from sch.Person p order by p.old + 10");
- checkQuery("select case when p.name is null then 'Vasya' end x from \"\".Person p");
- checkQuery("select case when p.name like 'V%' then 'Vasya' else 'Other' end x from \"\".Person p");
- checkQuery("select case when upper(p.name) = 'VASYA' then 'Vasya' when p.name is not null then p.name else 'Other' end x from \"\".Person p");
+ checkQuery("select case when p.name is null then 'Vasya' end x from sch.Person p");
+ checkQuery("select case when p.name like 'V%' then 'Vasya' else 'Other' end x from sch.Person p");
+ checkQuery("select case when upper(p.name) = 'VASYA' then 'Vasya' when p.name is not null then p.name else 'Other' end x from sch.Person p");
- checkQuery("select case p.name when 'Vasya' then 1 end z from \"\".Person p");
- checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 end z from \"\".Person p");
- checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 else 3 end z from \"\".Person p");
- checkQuery("select case p.name when 'Vasya' then 1 else 3 end z from \"\".Person p");
+ checkQuery("select case p.name when 'Vasya' then 1 end z from sch.Person p");
+ checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 end z from sch.Person p");
+ checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 else 3 end z from sch.Person p");
+ checkQuery("select case p.name when 'Vasya' then 1 else 3 end z from sch.Person p");
checkQuery("select count(*) as a from Person union select count(*) as a from Address");
checkQuery("select old, count(*) as a from Person group by old union select 1, count(*) as a from Address");
@@ -268,6 +278,54 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
checkQuery("(select 2 a) union all (select 1) order by a desc nulls first limit ? offset ?");
}
+ /**
+ * Query AST transformation heavily depends on this behavior.
+ *
+ * @throws Exception If failed.
+ */
+ public void testParseTableFilter() throws Exception {
+ Prepared prepared = parse("select Person.old, p1.old, p1.addrId from Person, Person p1 " +
+ "where exists(select 1 from Address a where a.id = p1.addrId)");
+
+ GridSqlSelect select = (GridSqlSelect)new GridSqlQueryParser().parse(prepared);
+
+ GridSqlJoin join = (GridSqlJoin)select.from();
+
+ GridSqlTable tbl1 = (GridSqlTable)join.leftTable();
+ GridSqlAlias tbl2Alias = (GridSqlAlias)join.rightTable();
+ GridSqlTable tbl2 = tbl2Alias.child();
+
+ // Must be distinct objects, even if it is the same table.
+ //assertNotSame(tbl1, tbl2);
+
+ assertNotNull(tbl1.dataTable());
+ assertNotNull(tbl2.dataTable());
+ assertSame(tbl1.dataTable(), tbl2.dataTable());
+
+ GridSqlColumn col1 = (GridSqlColumn)select.column(0);
+ GridSqlColumn col2 = (GridSqlColumn)select.column(1);
+
+ assertSame(tbl1, col1.expressionInFrom());
+
+ // Alias in FROM must be included in column.
+ assertSame(tbl2Alias, col2.expressionInFrom());
+
+ // In EXISTS we must correctly reference the column from the outer query.
+ GridSqlElement exists = select.where();
+ GridSqlSubquery subqry = exists.child();
+ GridSqlSelect subSelect = (GridSqlSelect)subqry.select();
+
+ GridSqlColumn p1AddrIdCol = (GridSqlColumn)select.column(2);
+
+ assertEquals("ADDRID", p1AddrIdCol.column().getName());
+ assertSame(tbl2Alias, p1AddrIdCol.expressionInFrom());
+
+ GridSqlColumn p1AddrIdColExists = subSelect.where().child(1);
+ assertEquals("ADDRID", p1AddrIdCol.column().getName());
+
+ assertSame(tbl2Alias, p1AddrIdColExists.expressionInFrom());
+ }
+
/** */
public void testParseMerge() throws Exception {
/* Plain rows w/functions, operators, defaults, and placeholders. */
@@ -378,7 +436,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
IgniteH2Indexing idx = U.field(qryProcessor, "idx");
- return (JdbcConnection)idx.connectionForSpace(null);
+ return (JdbcConnection)idx.connectionForSpace(TEST_CACHE);
}
/**
@@ -455,6 +513,9 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
@QuerySqlField(index = true)
public int addrId;
+ @QuerySqlField
+ public Integer[] addrIds;
+
@QuerySqlField(index = true)
public int old;
}
[13/19] ignite git commit: Merge branch ignite-1.8.4-p1 into ignite-1.8.5-p1
Posted by nt...@apache.org.
Merge branch ignite-1.8.4-p1 into ignite-1.8.5-p1
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4fce2805
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4fce2805
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4fce2805
Branch: refs/heads/master
Commit: 4fce28054bc325741f65035ae384f9b4b9c3fee8
Parents: 62dbba8 8273e67
Author: Alexander Fedotov <al...@gmail.com>
Authored: Fri Apr 7 16:06:34 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Fri Apr 7 16:06:34 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 310 ++++++++
.../discovery/GridDiscoveryManager.java | 710 ++++++-------------
.../eventstorage/DiscoveryEventListener.java | 33 +
.../eventstorage/GridEventStorageManager.java | 162 ++++-
.../affinity/GridAffinityAssignmentCache.java | 7 +-
.../cache/CacheAffinitySharedManager.java | 35 +-
.../cache/GridCacheAffinityManager.java | 2 +-
.../GridCachePartitionExchangeManager.java | 64 +-
.../dht/GridClientPartitionTopology.java | 20 +-
.../dht/GridDhtAssignmentFetchFuture.java | 7 +-
.../dht/GridDhtPartitionTopologyImpl.java | 44 +-
.../dht/atomic/GridDhtAtomicCache.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 33 +-
.../service/GridServiceProcessor.java | 21 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 64 +-
.../GridDiscoveryManagerAttributesSelfTest.java | 14 +-
.../discovery/GridDiscoveryManagerSelfTest.java | 214 ------
.../IgniteTopologyPrintFormatSelfTest.java | 8 +-
.../testsuites/IgniteKernalSelfTestSuite.java | 3 -
19 files changed, 854 insertions(+), 901 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 2ec1070,80549dc..53e6069
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -112,8 -108,8 +108,9 @@@ import org.apache.ignite.spi.discovery.
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
+ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@@ -1892,29 -1901,114 +1902,137 @@@ public class GridDiscoveryManager exten
}
/**
+ * @return {@code True} if local node client and discovery SPI supports reconnect.
+ */
+ public boolean reconnectSupported() {
+ DiscoverySpi spi = getSpi();
+
+ return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+ !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+ }
+
+ /**
+ * Leave cluster and try to join again.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public void reconnect() {
+ assert reconnectSupported();
+
+ DiscoverySpi discoverySpi = getSpi();
+
+ ((TcpDiscoverySpi)discoverySpi).reconnect();
+ }
+
+ /**
+ * @param loc Local node.
+ * @param topSnapshot Topology snapshot.
+ * @return Newly created discovery cache.
+ */
+ @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+ HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
+ HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
+
+ ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+
+ for (ClusterNode node : topSnapshot) {
+ if (alive(node))
+ alives.add(node.id());
+
+ if (node.isDaemon())
+ daemonNodes.add(node);
+ else {
+ allNodes.add(node);
+
+ if (!node.isLocal())
+ rmtNodes.add(node);
+
+ if (!CU.clientNode(node))
+ srvNodes.add(node);
+ }
+
+ nodeMap.put(node.id(), node);
+ }
+
+ assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
+ " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+
+ Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+ Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+
+ Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+
+ Set<Integer> nearEnabledCaches = new HashSet<>();
+
+ for (ClusterNode node : allNodes) {
+ assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+ assert !node.isDaemon();
+
+ for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+ String cacheName = entry.getKey();
+ CachePredicate filter = entry.getValue();
+
+ if (filter.cacheNode(node)) {
+ allNodesWithCaches.add(node);
+
+ if(!CU.clientNode(node))
+ srvNodesWithCaches.add(node);
+
+ if (!node.isLocal())
+ rmtNodesWithCaches.add(node);
+
+ addToMap(allCacheNodes, cacheName, node);
+
+ if (filter.dataNode(node))
+ addToMap(affCacheNodes, cacheName, node);
+
+ if (filter.nearNode(node))
+ nearEnabledCaches.add(CU.cacheId(cacheName));
+ }
+ }
+ }
+
+ return new DiscoCache(
+ loc,
+ Collections.unmodifiableList(rmtNodes),
+ Collections.unmodifiableList(allNodes),
+ Collections.unmodifiableList(srvNodes),
+ Collections.unmodifiableList(daemonNodes),
+ U.sealList(srvNodesWithCaches),
+ U.sealList(allNodesWithCaches),
+ U.sealList(rmtNodesWithCaches),
+ Collections.unmodifiableMap(allCacheNodes),
+ Collections.unmodifiableMap(affCacheNodes),
+ Collections.unmodifiableMap(nodeMap),
+ Collections.unmodifiableSet(nearEnabledCaches),
+ alives);
+ }
+
+ /**
+ * Adds node to map.
+ *
+ * @param cacheMap Map to add to.
+ * @param cacheName Cache name.
+ * @param rich Node to add
+ */
+ private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
+
+ if (cacheNodes == null) {
+ cacheNodes = new ArrayList<>();
+
+ cacheMap.put(CU.cacheId(cacheName), cacheNodes);
+ }
+
+ cacheNodes.add(rich);
+ }
+
+ /**
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6ced5e6,99146aa..adfbc11
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -315,10 -313,8 +315,10 @@@ public class GridServiceProcessor exten
busyLock.block();
+ U.shutdownNow(GridServiceProcessor.class, depExe, log);
+
if (!ctx.clientNode())
- ctx.event().removeLocalEventListener(topLsnr);
+ ctx.event().removeDiscoveryEventListener(topLsnr);
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
@@@ -1576,19 -1586,16 +1576,22 @@@
if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
return;
}
+ else if (msg instanceof DynamicCacheChangeBatch) {
+ if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
+ return;
+ }
+ else
+ return;
}
else
- topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+ topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
- depExe.execute(new BusyRunnable() {
+ depExe.execute(new DepRunnable() {
@Override public void run0() {
- ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+ // In case the cache instance isn't tracked by DiscoveryManager anymore.
+ discoCache.updateAlives(ctx.discovery());
+
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
[03/19] ignite git commit: IGNITE-3829: Additional fix.
Posted by nt...@apache.org.
IGNITE-3829: Additional fix.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b0d2326
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b0d2326
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b0d2326
Branch: refs/heads/master
Commit: 7b0d2326ccc62afd5d162b056398f96d8d7c9100
Parents: 4dc624f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Sep 5 10:19:48 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Sep 5 10:20:39 2016 +0300
----------------------------------------------------------------------
.../cache/binary/CacheObjectBinaryProcessorImpl.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0d2326/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ecd27f7..82e67ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -709,10 +709,15 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (meta != null) {
String name = meta.affinityKeyFieldName();
- affKeyFields.putIfAbsent(meta.typeId(), new T1<>(meta.field(name)));
+ if (name != null) {
+ BinaryField field = meta.field(name);
- if (name != null)
- return po.field(name);
+ affKeyFields.putIfAbsent(meta.typeId(), new T1<>(field));
+
+ return field.value(po);
+ }
+ else
+ affKeyFields.putIfAbsent(meta.typeId(), new T1<BinaryField>(null));
}
else if (po instanceof BinaryObjectEx) {
int typeId = ((BinaryObjectEx)po).typeId();
[17/19] ignite git commit: ignite-4946 GridCacheP2PUndeploySelfTest became failed
Posted by nt...@apache.org.
ignite-4946 GridCacheP2PUndeploySelfTest became failed
(cherry picked from commit d298e75)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd0b9295
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd0b9295
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd0b9295
Branch: refs/heads/master
Commit: cd0b92950c6691c6fc1a26cb4f7e55f5ee459298
Parents: 20016a2
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Apr 13 15:52:20 2017 +0300
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Thu Apr 13 16:44:07 2017 +0300
----------------------------------------------------------------------
.../eventstorage/GridEventStorageManager.java | 341 ++++++++++---------
1 file changed, 187 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd0b9295/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index a2c64ba..406da2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EventListener;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -78,10 +80,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
*/
public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> {
/** Local event listeners. */
- private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<>();
-
- /** Internal discovery listeners. */
- private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Integer, Set<EventListener>> lsnrs = new ConcurrentHashMap8<>();
/** Busy lock to control activity of threads. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -200,7 +199,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
@Override public void printMemoryStats() {
int lsnrsCnt = 0;
- for (Set<GridLocalEventListener> lsnrs0 : lsnrs.values())
+ for (Set<EventListener> lsnrs0 : lsnrs.values())
lsnrsCnt += lsnrs0.size();
X.println(">>>");
@@ -238,7 +237,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
msgLsnr = null;
lsnrs.clear();
- discoLsnrs.clear();
}
/** {@inheritDoc} */
@@ -274,6 +272,26 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param evt Event to record.
*/
public void record(Event evt) {
+ record0(evt);
+ }
+
+ /**
+ * Records discovery events.
+ *
+ * @param evt Event to record.
+ * @param discoCache Discovery cache.
+ */
+ public void record(DiscoveryEvent evt, DiscoCache discoCache) {
+ record0(evt, discoCache);
+ }
+
+ /**
+ * Records event if it's recordable.
+ *
+ * @param evt Event to record.
+ * @param params Additional parameters.
+ */
+ private void record0(Event evt, Object... params) {
assert evt != null;
if (!enterBusy())
@@ -297,31 +315,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
if (isRecordable(type))
- notifyListeners(evt);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * Records discovery events.
- *
- * @param evt Event to record.
- * @param discoCache Discovery cache.
- */
- public void record(DiscoveryEvent evt, DiscoCache discoCache) {
- assert evt != null;
-
- if (!enterBusy())
- return;
-
- try {
- // Notify internal discovery listeners first.
- notifyDiscoveryListeners(evt, discoCache);
-
- // Notify all other registered listeners.
- record(evt);
+ notifyListeners(lsnrs.get(evt.type()), evt, params);
}
finally {
leaveBusy();
@@ -571,6 +565,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param types Event types to subscribe listener for.
*/
public void addLocalEventListener(IgnitePredicate<? extends Event> lsnr, int[] types) {
+ assert lsnr != null;
+
try {
ctx.resource().injectGeneric(lsnr);
}
@@ -578,7 +574,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
throw new IgniteException("Failed to inject resources to event listener: " + lsnr, e);
}
- addLocalEventListener(new UserListenerWrapper(lsnr), types);
+ addEventListener(new UserListenerWrapper(lsnr), types);
}
/**
@@ -594,20 +590,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
assert types != null;
assert types.length > 0;
- if (!enterBusy())
- return;
-
- try {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
- }
- finally {
- leaveBusy();
- }
+ addEventListener(new LocalListenerWrapper(lsnr), types);
}
/**
@@ -620,27 +603,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
public void addLocalEventListener(GridLocalEventListener lsnr, int type, @Nullable int... types) {
assert lsnr != null;
- if (!enterBusy())
- return;
-
- try {
- getOrCreate(lsnrs, type).add(lsnr);
-
- if (!isRecordable(type))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
-
- if (types != null) {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
- }
- }
- finally {
- leaveBusy();
- }
+ addEventListener(new LocalListenerWrapper(lsnr), type, types);
}
/**
@@ -656,12 +619,40 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
assert types != null;
assert types.length > 0;
+ addEventListener(new DiscoveryListenerWrapper(lsnr), types);
+ }
+
+ /**
+ * Adds discovery event listener.
+ *
+ * @param lsnr Listener to add.
+ * @param type Event type to subscribe listener for.
+ * @param types Additional event types to subscribe listener for.
+ */
+ public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
+ assert lsnr != null;
+
+ addEventListener(new DiscoveryListenerWrapper(lsnr), type, types);
+ }
+
+ /**
+ * Adds local event listener. Note that this method specifically disallow an empty
+ * array of event type to prevent accidental subscription for all system event that
+ * may lead to a drastic performance decrease.
+ *
+ * @param lsnr Listener to add.
+ * @param types Event types to subscribe listener for.
+ */
+ private void addEventListener(EventListener lsnr, int[] types) {
if (!enterBusy())
return;
try {
for (int t : types) {
- getOrCreate(discoLsnrs, t).add(lsnr);
+ getOrCreate(lsnrs, t).add(lsnr);
+
+ if (!isRecordable(t))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
}
}
finally {
@@ -670,24 +661,28 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * Adds discovery event listener.
+ * Adds local event listener.
*
* @param lsnr Listener to add.
* @param type Event type to subscribe listener for.
* @param types Additional event types to subscribe listener for.
*/
- public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
- assert lsnr != null;
-
+ private void addEventListener(EventListener lsnr, int type, @Nullable int... types) {
if (!enterBusy())
return;
try {
- getOrCreate(discoLsnrs, type).add(lsnr);
+ getOrCreate(lsnrs, type).add(lsnr);
+
+ if (!isRecordable(type))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
if (types != null) {
for (int t : types) {
- getOrCreate(discoLsnrs, t).add(lsnr);
+ getOrCreate(lsnrs, t).add(lsnr);
+
+ if (!isRecordable(t))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
}
}
}
@@ -696,6 +691,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
}
+
/**
* @param lsnrs Listeners map.
* @param type Event type.
@@ -727,7 +723,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @return Returns {@code true} if removed.
*/
public boolean removeLocalEventListener(IgnitePredicate<? extends Event> lsnr, @Nullable int... types) {
- return removeLocalEventListener(new UserListenerWrapper(lsnr), types);
+ assert lsnr != null;
+
+ return removeEventListener(new UserListenerWrapper(lsnr), types);
}
/**
@@ -741,33 +739,21 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
public boolean removeLocalEventListener(GridLocalEventListener lsnr, @Nullable int... types) {
assert lsnr != null;
- boolean found = false;
-
- if (F.isEmpty(types)) {
- for (Set<GridLocalEventListener> set : lsnrs.values())
- if (set.remove(lsnr))
- found = true;
- }
- else {
- assert types != null;
-
- for (int type : types) {
- Set<GridLocalEventListener> set = lsnrs.get(type);
-
- if (set != null && set.remove(lsnr))
- found = true;
- }
- }
-
- if (lsnr instanceof UserListenerWrapper)
- {
- IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
+ return removeEventListener(new LocalListenerWrapper(lsnr), types);
+ }
- if (p instanceof PlatformEventFilterListener)
- ((PlatformEventFilterListener)p).onClose();
- }
+ /**
+ * Removes listener for specified events, if any. If no event types provided - it
+ * remove the listener for all its registered events.
+ *
+ * @param lsnr Listener.
+ * @param types Event types.
+ * @return Returns {@code true} if removed.
+ */
+ public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+ assert lsnr != null;
- return found;
+ return removeEventListener(new DiscoveryListenerWrapper(lsnr), types);
}
/**
@@ -778,13 +764,13 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param types Event types.
* @return Returns {@code true} if removed.
*/
- public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+ private boolean removeEventListener(EventListener lsnr, @Nullable int[] types) {
assert lsnr != null;
boolean found = false;
if (F.isEmpty(types)) {
- for (Set<DiscoveryEventListener> set : discoLsnrs.values())
+ for (Set<EventListener> set : lsnrs.values())
if (set.remove(lsnr))
found = true;
}
@@ -792,13 +778,21 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
assert types != null;
for (int type : types) {
- Set<DiscoveryEventListener> set = discoLsnrs.get(type);
+ Set<EventListener> set = lsnrs.get(type);
if (set != null && set.remove(lsnr))
found = true;
}
}
+ if (lsnr instanceof UserListenerWrapper)
+ {
+ IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
+
+ if (p instanceof PlatformEventFilterListener)
+ ((PlatformEventFilterListener)p).onClose();
+ }
+
return found;
}
@@ -862,62 +856,18 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * @param evt Event to notify about.
- */
- private void notifyListeners(Event evt) {
- assert evt != null;
-
- notifyListeners(lsnrs.get(evt.type()), evt);
- }
-
- /**
* @param set Set of listeners.
* @param evt Grid event.
*/
- private void notifyListeners(@Nullable Collection<GridLocalEventListener> set, Event evt) {
- assert evt != null;
-
- if (!F.isEmpty(set)) {
- assert set != null;
-
- for (GridLocalEventListener lsnr : set) {
- try {
- lsnr.onEvent(evt);
- }
- catch (Throwable e) {
- U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
- }
- }
-
- /**
- * @param evt Discovery event
- * @param cache Discovery cache.
- */
- private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) {
- assert evt != null;
-
- notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
- }
-
- /**
- * @param set Set of listeners.
- * @param evt Discovery event.
- * @param cache Discovery cache.
- */
- private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
+ private void notifyListeners(@Nullable Collection<EventListener> set, Event evt, Object[] params) {
assert evt != null;
if (!F.isEmpty(set)) {
assert set != null;
- for (DiscoveryEventListener lsnr : set) {
+ for (EventListener lsnr : set) {
try {
- lsnr.onEvent(evt, cache);
+ ((ListenerWrapper)lsnr).onEvent(evt, params);
}
catch (Throwable e) {
U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
@@ -1307,10 +1257,93 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
}
+ /** */
+ private abstract static class ListenerWrapper implements EventListener {
+ abstract void onEvent(Event evt, Object[] params);
+ }
+
+ /**
+ * Wraps local listener
+ */
+ private static final class LocalListenerWrapper extends ListenerWrapper {
+ /** */
+ private final GridLocalEventListener lsnr;
+
+ /**
+ * @param lsnr Listener.
+ */
+ private LocalListenerWrapper(GridLocalEventListener lsnr) {
+ this.lsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override void onEvent(Event evt, Object[] params) {
+ lsnr.onEvent(evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ LocalListenerWrapper wrapper = (LocalListenerWrapper)o;
+
+ return lsnr.equals(wrapper.lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return lsnr.hashCode();
+ }
+ }
+
+ /**
+ * Wraps discovery local listener
+ */
+ private static final class DiscoveryListenerWrapper extends ListenerWrapper {
+ /** */
+ private final DiscoveryEventListener lsnr;
+
+ /**
+ * @param lsnr Listener.
+ */
+ private DiscoveryListenerWrapper(DiscoveryEventListener lsnr) {
+ this.lsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override void onEvent(Event evt, Object[] params) {
+ // No checks there since only DiscoveryManager produses DiscoveryEvents
+ // and it uses an overloaded method with additional parameters
+ lsnr.onEvent((DiscoveryEvent)evt, (DiscoCache)params[0]);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DiscoveryListenerWrapper wrapper = (DiscoveryListenerWrapper)o;
+
+ return lsnr.equals(wrapper.lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return lsnr.hashCode();
+ }
+ }
+
/**
- * Wraps user listener predicate provided via {@link org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate, int...)}.
+ * Wraps user listener predicate provided via {@link IgniteEvents#localListen(IgnitePredicate, int...)}.
*/
- private class UserListenerWrapper implements GridLocalEventListener {
+ private final class UserListenerWrapper extends ListenerWrapper {
/** */
private final IgnitePredicate<Event> lsnr;
@@ -1329,9 +1362,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
+ @Override void onEvent(Event evt, Object[] params) {
if (!lsnr.apply(evt))
- removeLocalEventListener(this);
+ removeEventListener(this, null);
}
/** {@inheritDoc} */
[18/19] ignite git commit: Merge branch 'ignite-1.8.5-p1'
Posted by nt...@apache.org.
Merge branch 'ignite-1.8.5-p1'
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
# modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
# modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3d0b83d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3d0b83d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3d0b83d8
Branch: refs/heads/master
Commit: 3d0b83d8dc37faf3f2f7963a9e1cf7380e11150a
Parents: 227599f cd0b929
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Apr 18 12:53:05 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Apr 18 12:53:05 2017 +0300
----------------------------------------------------------------------
modules/core/pom.xml | 3 ++
.../IgniteCacheExpiryPolicyAbstractTest.java | 44 ++++++++++++++++++++
2 files changed, 47 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d0b83d8/modules/core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d0b83d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
[02/19] ignite git commit: IGNITE-3829: Optimized affinity key field name handling.
Posted by nt...@apache.org.
IGNITE-3829: Optimized affinity key field name handling.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4dc624fc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4dc624fc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4dc624fc
Branch: refs/heads/master
Commit: 4dc624fc9a8852f77f1fe7db4dc06a474b34c2eb
Parents: 8c56e45
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Sep 2 18:23:09 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 2 18:26:03 2016 +0300
----------------------------------------------------------------------
.../binary/CacheObjectBinaryProcessorImpl.java | 35 ++++++++++++++++----
1 file changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4dc624fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 0337874..ecd27f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryBasicNameMapper;
+import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
@@ -89,6 +90,7 @@ import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -159,6 +161,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
/** Metadata updates collected before metadata cache is initialized. */
private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
+ /** Cached affinity key field names. */
+ private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields = new ConcurrentHashMap<>();
+
/**
* @param ctx Kernal context.
*/
@@ -684,22 +689,38 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
* @return Affinity key.
*/
public Object affinityKey(BinaryObject po) {
+ // Fast path for already cached field.
+ if (po instanceof BinaryObjectEx) {
+ int typeId = ((BinaryObjectEx)po).typeId();
+
+ T1<BinaryField> fieldHolder = affKeyFields.get(typeId);
+
+ if (fieldHolder != null) {
+ BinaryField field = fieldHolder.get();
+
+ return field != null ? field.value(po) : po;
+ }
+ }
+
+ // Slow path if affinity field is not cached yet.
try {
BinaryType meta = po instanceof BinaryObjectEx ? ((BinaryObjectEx)po).rawType() : po.type();
if (meta != null) {
- String affKeyFieldName = meta.affinityKeyFieldName();
+ String name = meta.affinityKeyFieldName();
+
+ affKeyFields.putIfAbsent(meta.typeId(), new T1<>(meta.field(name)));
- if (affKeyFieldName != null)
- return po.field(affKeyFieldName);
+ if (name != null)
+ return po.field(name);
}
else if (po instanceof BinaryObjectEx) {
- int id = ((BinaryObjectEx)po).typeId();
+ int typeId = ((BinaryObjectEx)po).typeId();
- String affKeyFieldName = binaryCtx.affinityKeyFieldName(id);
+ String name = binaryCtx.affinityKeyFieldName(typeId);
- if (affKeyFieldName != null)
- return po.field(affKeyFieldName);
+ if (name != null)
+ return po.field(name);
}
}
catch (BinaryObjectException e) {
[06/19] ignite git commit: Minors
Posted by nt...@apache.org.
Minors
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cf9f0a79
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cf9f0a79
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cf9f0a79
Branch: refs/heads/master
Commit: cf9f0a79da7540dc40c3ac860a94158e87e2d7ec
Parents: fe42459
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Thu Jan 12 17:50:02 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Jan 12 17:50:02 2017 +0300
----------------------------------------------------------------------
.../cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cf9f0a79/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 4368a15..9739350 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -1026,6 +1026,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
CacheConfiguration ccfg = cacheConfiguration("testCache");
+ ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(GridCacheTestStore.class));
// ccfg.setExpiryPolicyFactory( CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)));
IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
[19/19] ignite git commit: Merge remote-tracking branch
'apache/master'
Posted by nt...@apache.org.
Merge remote-tracking branch 'apache/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ea9a9dda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ea9a9dda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ea9a9dda
Branch: refs/heads/master
Commit: ea9a9dda78d8ba93aec69c7fc8827dbf50f20a75
Parents: 3d0b83d b2fb9be
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Apr 18 13:00:42 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Apr 18 13:00:42 2017 +0300
----------------------------------------------------------------------
.../igfs/common/IgfsControlResponse.java | 28 +
.../internal/igfs/common/IgfsIpcCommand.java | 5 +-
.../ignite/internal/igfs/common/IgfsLogger.java | 60 +-
.../internal/igfs/common/IgfsMarshaller.java | 12 +
.../igfs/common/IgfsModeResolverRequest.java | 35 +
.../processors/cache/GridCacheAdapter.java | 41 +-
.../dht/atomic/GridDhtAtomicCache.java | 6 +-
.../dht/colocated/GridDhtColocatedCache.java | 6 +-
.../near/GridNearTransactionalCache.java | 2 +-
.../local/atomic/GridLocalAtomicCache.java | 6 +-
.../processors/hadoop/HadoopPayloadAware.java | 28 -
.../internal/processors/igfs/IgfsAsyncImpl.java | 5 -
.../ignite/internal/processors/igfs/IgfsEx.java | 7 -
.../processors/igfs/IgfsHandshakeResponse.java | 22 +-
.../internal/processors/igfs/IgfsImpl.java | 23 +-
.../processors/igfs/IgfsIpcHandler.java | 20 +-
.../processors/igfs/IgfsModeResolver.java | 91 ++-
.../internal/processors/igfs/IgfsPaths.java | 152 ----
.../resources/META-INF/classnames.properties | 1 -
.../internal/ClusterNodeMetricsSelfTest.java | 37 +-
...ridCacheReplicatedSynchronousCommitTest.java | 2 +-
...fsLocalSecondaryFileSystemProxySelfTest.java | 1 -
.../internal/processors/igfs/IgfsMock.java | 7 -
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 9 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 698 ++++---------------
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 481 +++----------
...doopIgfsSecondaryFileSystemDelegateImpl.java | 11 +-
.../hadoop/impl/igfs/HadoopIgfsEx.java | 8 +
.../hadoop/impl/igfs/HadoopIgfsInProc.java | 23 +-
.../hadoop/impl/igfs/HadoopIgfsOutProc.java | 12 +
.../hadoop/impl/igfs/HadoopIgfsWrapper.java | 14 +
.../igfs/HadoopFIleSystemFactorySelfTest.java | 14 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 10 +
.../IgniteHadoopFileSystemLoggerSelfTest.java | 32 +-
...condaryFileSystemInitializationSelfTest.java | 213 ------
.../testsuites/IgniteHadoopTestSuite.java | 3 -
36 files changed, 583 insertions(+), 1542 deletions(-)
----------------------------------------------------------------------
[09/19] ignite git commit: Merge branch ignite-1.6.5 into
ignite-1.8.5-p1
Posted by nt...@apache.org.
Merge branch ignite-1.6.5 into ignite-1.8.5-p1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3be4e003
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3be4e003
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3be4e003
Branch: refs/heads/master
Commit: 3be4e00373ec5a2b49788d70eb0aebccc3cb6ccf
Parents: ae435fb cf9f0a7
Author: Alexander Fedotov <al...@gmail.com>
Authored: Fri Apr 7 14:59:00 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Fri Apr 7 15:02:32 2017 +0300
----------------------------------------------------------------------
.../IgniteCacheExpiryPolicyAbstractTest.java | 39 ++++++++++++++++++++
1 file changed, 39 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3be4e003/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index b234631,9739350..3339f65
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@@ -1018,45 -1009,43 +1019,83 @@@ public abstract class IgniteCacheExpiry
}
/**
+ * Put entry to server node and check how it expires in client NearCache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearExpiresOnClient() throws Exception {
+ if(cacheMode() != PARTITIONED)
+ return;
+
+ factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS,1));
+
+ nearCache = true;
+
+ startGrids();
+
+ IgniteConfiguration clientCfg = getConfiguration("client").setClientMode(true);
+
+ ((TcpDiscoverySpi)clientCfg.getDiscoverySpi()).setForceServerMode(false);
+
+ Ignite client = startGrid("client", clientCfg);
+
+ IgniteCache<Object, Object> cache = client.cache(null);
+
+ Integer key = 1;
+
+ // Put on server node.
+ jcache(0).put(key, 1);
+
+ // Make entry cached in client NearCache.
+ assertEquals(1, cache.get(key));
+
+ assertEquals(1, cache.localPeek(key, CachePeekMode.NEAR));
+
+ waitExpired(key);
+
+ // Check client NearCache.
+ assertNull(cache.localPeek(key, CachePeekMode.NEAR));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearExpiresWithCacheStore() throws Exception {
+ factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1));
+
+ nearCache = true;
+
+ startGridsMultiThreaded(gridCount());
+
+ IgniteConfiguration clientCfg = getConfiguration("client").setClientMode(true);
+
+ ((TcpDiscoverySpi)clientCfg.getDiscoverySpi()).setForceServerMode(false);
+
+ Ignite client = startGrid("client", clientCfg);
+
+ CacheConfiguration ccfg = cacheConfiguration("testCache");
+
+ ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(GridCacheTestStore.class));
+ // ccfg.setExpiryPolicyFactory( CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)));
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+ Integer key = 1;
+
+ cache.put(key, 1);
+
+ assertEquals(1, cache.localPeek(key, CachePeekMode.NEAR));
+ assertEquals(1, cache.get(key));
+
+ waitExpired(key);
+
+ for(int i = 0; i < gridCount(); i++)
+ assertNull(jcache(i).localPeek(key, CachePeekMode.BACKUP, CachePeekMode.PRIMARY));
+
+ assertEquals(null, cache.get(key));
+ }
++
+ /**
* @return Test keys.
* @throws Exception If failed.
*/
[12/19] ignite git commit: Merge branch ignite-1.7.4-p1 into
ignite-1.8.5-p1
Posted by nt...@apache.org.
Merge branch ignite-1.7.4-p1 into ignite-1.8.5-p1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/62dbba81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/62dbba81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/62dbba81
Branch: refs/heads/master
Commit: 62dbba81c009170ff6243a28d3ef12fa75b96225
Parents: 6954ff0 71412ce
Author: Alexander Fedotov <al...@gmail.com>
Authored: Fri Apr 7 15:46:11 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Fri Apr 7 15:46:11 2017 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------