You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/06/15 12:05:48 UTC
[01/16] ignite git commit: ignite-3261 Store information about
AffinityKey class in metadata cache. (cherry picked from commit 2b64e7c)
Repository: ignite
Updated Branches:
refs/heads/master b500d3a56 -> b742f5fc8
ignite-3261 Store information about AffinityKey class in metadata cache.
(cherry picked from commit 2b64e7c)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/150b5c99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/150b5c99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/150b5c99
Branch: refs/heads/master
Commit: 150b5c99828f701f72cabf852dfbff0dca4ea0ca
Parents: fad73bf
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 09:45:10 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 10:35:43 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/binary/BinaryContext.java | 43 ++++++++++--
.../ignite/internal/binary/AffinityKey.java | 69 ++++++++++++++++++++
.../binary/GridBinaryAffinityKeySelfTest.java | 15 +++++
...aultBinaryMappersBinaryMetaDataSelfTest.java | 17 +++++
4 files changed, 137 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index daf34ad..d39f9c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -259,7 +259,7 @@ public class BinaryContext {
registerPredefinedType(LinkedHashMap.class, 0);
// Classes with overriden default serialization flag.
- registerPredefinedType(AffinityKey.class, 0, affinityFieldName(AffinityKey.class));
+ registerPredefinedType(AffinityKey.class, 0, affinityFieldName(AffinityKey.class), false);
registerPredefinedType(GridMapEntry.class, 60);
registerPredefinedType(IgniteBiTuple.class, 61);
@@ -562,9 +562,37 @@ public class BinaryContext {
if (desc == null)
desc = registerClassDescriptor(cls, deserialize);
else if (!desc.registered()) {
- assert desc.userType();
-
- desc = registerUserClassDescriptor(desc);
+ if (!desc.userType()) {
+ BinaryClassDescriptor desc0 = new BinaryClassDescriptor(
+ this,
+ desc.describedClass(),
+ false,
+ desc.typeId(),
+ desc.typeName(),
+ desc.affFieldKeyName(),
+ desc.mapper(),
+ desc.initialSerializer(),
+ false,
+ true
+ );
+
+ if (descByCls.replace(cls, desc, desc0)) {
+ Collection<BinarySchema> schemas =
+ desc0.schema() != null ? Collections.singleton(desc.schema()) : null;
+
+ BinaryMetadata meta = new BinaryMetadata(desc0.typeId(),
+ desc0.typeName(),
+ desc0.fieldsMeta(),
+ desc0.affFieldKeyName(),
+ schemas, desc0.isEnum());
+
+ metaHnd.addMeta(desc0.typeId(), meta.wrap(this));
+
+ return desc0;
+ }
+ }
+ else
+ desc = registerUserClassDescriptor(desc);
}
return desc;
@@ -959,15 +987,16 @@ public class BinaryContext {
* @return GridBinaryClassDescriptor.
*/
public BinaryClassDescriptor registerPredefinedType(Class<?> cls, int id) {
- return registerPredefinedType(cls, id, null);
+ return registerPredefinedType(cls, id, null, true);
}
/**
* @param cls Class.
* @param id Type ID.
+ * @param affFieldName Affinity field name.
* @return GridBinaryClassDescriptor.
*/
- public BinaryClassDescriptor registerPredefinedType(Class<?> cls, int id, String affFieldName) {
+ public BinaryClassDescriptor registerPredefinedType(Class<?> cls, int id, String affFieldName, boolean registered) {
String simpleClsName = SIMPLE_NAME_LOWER_CASE_MAPPER.typeName(cls.getName());
if (id == 0)
@@ -983,7 +1012,7 @@ public class BinaryContext {
SIMPLE_NAME_LOWER_CASE_MAPPER,
new BinaryReflectiveSerializer(),
false,
- true /* registered */
+ registered /* registered */
);
predefinedTypeNames.put(simpleClsName, id);
http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java
new file mode 100644
index 0000000..1b4daee
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Test key class whose {@code aff} field is annotated with {@link AffinityKeyMapped}.
+ */
+public class AffinityKey {
+    /** Key value. */
+    private int key;
+
+    /** Affinity key value. */
+    @AffinityKeyMapped
+    private int aff;
+
+    /**
+     * @param key Key.
+     * @param aff Affinity key.
+     */
+    public AffinityKey(int key, int aff) {
+        this.aff = aff;
+        this.key = key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || o.getClass() != getClass())
+            return false;
+
+        AffinityKey other = (AffinityKey)o;
+
+        return aff == other.aff && key == other.key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int hash = 31 * key;
+
+        hash += aff;
+
+        return hash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AffinityKey.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
index 06406e0..2b54f6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
@@ -139,6 +139,8 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
assertEquals(i, aff.affinityKey(ignite.binary().toBinary(new TestObject(i))));
+ assertEquals(i, aff.affinityKey(new AffinityKey(0, i)));
+
BinaryObjectBuilder bldr = ignite.binary().builder("TestObject2");
bldr.setField("affKey", i);
@@ -162,6 +164,8 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
assertEquals(affProc.mapKeyToNode(null, i), affProc.mapKeyToNode(null, new TestObject(i)));
assertEquals(affProc.mapKeyToNode(null, i), affProc.mapKeyToNode(null, cacheObj));
+
+ assertEquals(affProc.mapKeyToNode(null, new AffinityKey(0, i)), affProc.mapKeyToNode(null, i));
}
}
@@ -184,6 +188,17 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
});
assertEquals(aff.mapKeyToNode(i).id(), nodeId.get());
+
+ grid(0).compute().affinityRun(null, new AffinityKey(0, i), new IgniteRunnable() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public void run() {
+ nodeId.set(ignite.configuration().getNodeId());
+ }
+ });
+
+ assertEquals(aff.mapKeyToNode(i).id(), nodeId.get());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
index 041238f..7010463 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteBinary;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryNameMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
+import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -149,6 +150,22 @@ public class GridDefaultBinaryMappersBinaryMetaDataSelfTest extends GridCommonAb
else
assert false : meta.typeName();
}
+
+ grid().cache(null).put(new AffinityKey<>(1, 1), 1);
+
+ metas = binaries().types();
+
+ assertEquals(3, metas.size());
+
+ for (BinaryType meta : metas) {
+ if (AffinityKey.class.getSimpleName().equals(meta.typeName())) {
+ assertEquals("affKey", meta.affinityKeyFieldName());
+
+ return;
+ }
+ }
+
+ fail("Failed to find metadata for AffinityKey");
}
/**
[10/16] ignite git commit: IGNITE-3305 - Fixed SYNC rebalance mode
for dynamically started cache.
Posted by vk...@apache.org.
IGNITE-3305 - Fixed SYNC rebalance mode for dynamically started cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84547ef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84547ef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84547ef2
Branch: refs/heads/master
Commit: 84547ef2a198d639ac213ecf2f51de539b3f19d0
Parents: e9a33f2
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jun 13 17:52:39 2016 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Jun 14 13:12:38 2016 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 24 ++--
.../IgniteCacheSyncRebalanceModeSelfTest.java | 114 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite5.java | 2 +
3 files changed, 131 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84547ef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 37c3cf1..9451254 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -731,9 +731,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart() throws IgniteCheckedException {
- try {
- ClusterNode locNode = ctx.discovery().localNode();
+ ClusterNode locNode = ctx.discovery().localNode();
+ try {
if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
for (ClusterNode n : ctx.discovery().remoteNodes()) {
if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
@@ -822,15 +822,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.cacheObjects().onUtilityCacheStarted();
// Wait for caches in SYNC preload mode.
- for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) {
- GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration cfg = desc.cacheConfiguration();
- if (cache != null) {
- if (cfg.getRebalanceMode() == SYNC) {
- CacheMode cacheMode = cfg.getCacheMode();
+ IgnitePredicate filter = cfg.getNodeFilter();
+
+ if (desc.locallyConfigured() || desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) {
+ GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
- if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
- cache.preloader().syncFuture().get();
+ if (cache != null) {
+ if (cfg.getRebalanceMode() == SYNC) {
+ CacheMode cacheMode = cfg.getCacheMode();
+
+ if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+ cache.preloader().syncFuture().get();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/84547ef2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
new file mode 100644
index 0000000..9b0637e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that a second node, started after the data is loaded, locally holds all entries of a
+ * REPLICATED cache with SYNC rebalance mode, for both a static and a dynamically started cache.
+ */
+public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest {
+    /** Entry count. */
+    public static final int CNT = 100_000;
+
+    /** Name of the cache declared in the node configuration. */
+    public static final String STATIC_CACHE_NAME = "static";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(syncReplicatedConfiguration(STATIC_CACHE_NAME));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStaticCache() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        checkSyncRebalance(ignite, ignite.cache(STATIC_CACHE_NAME), STATIC_CACHE_NAME);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testDynamicCache() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        String cacheName = "dynamic";
+
+        checkSyncRebalance(ignite, ignite.createCache(syncReplicatedConfiguration(cacheName)), cacheName);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Configuration of a REPLICATED cache with SYNC rebalance mode.
+     */
+    private static CacheConfiguration<Object, Object> syncReplicatedConfiguration(String cacheName) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(cacheName);
+
+        ccfg.setCacheMode(CacheMode.REPLICATED);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        return ccfg;
+    }
+
+    /**
+     * Loads {@link #CNT} entries on the first node and checks they are all local on a newly started second node.
+     *
+     * @param ignite First node.
+     * @param cache Cache on the first node.
+     * @param name Cache name.
+     * @throws Exception If failed.
+     */
+    private void checkSyncRebalance(IgniteEx ignite, IgniteCache<Object, Object> cache, String name) throws Exception {
+        try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(name)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < CNT; i++)
+                streamer.addData(i, i);
+        }
+
+        assertEquals(CNT, cache.localSize());
+
+        IgniteCache<Object, Object> cache2 = startGrid(1).cache(name);
+
+        assertEquals(CNT, cache2.localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP));
+
+        for (int i = 0; i < CNT; i++)
+            assertEquals(i, cache2.localPeek(i));
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/84547ef2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index a263c0d..41e0ed1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTes
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentFairAffinityTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest;
import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest;
/**
@@ -52,6 +53,7 @@ public class IgniteCacheTestSuite5 extends TestSuite {
suite.addTestSuite(CacheLateAffinityAssignmentFairAffinityTest.class);
suite.addTestSuite(CacheLateAffinityAssignmentNodeJoinValidationTest.class);
suite.addTestSuite(EntryVersionConsistencyReadThroughTest.class);
+ suite.addTestSuite(IgniteCacheSyncRebalanceModeSelfTest.class);
suite.addTest(IgniteCacheReadThroughEvictionsVariationsSuite.suite());
[03/16] ignite git commit: ignite-114 Load value from store for cache
'invoke' (cherry picked from commit e10ffef)
Posted by vk...@apache.org.
ignite-114 Load value from store for cache 'invoke'
(cherry picked from commit e10ffef)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e90c9c88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e90c9c88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e90c9c88
Branch: refs/heads/master
Commit: e90c9c8889235e780bb9f37ed0b0ca7a8d30136d
Parents: 150b5c9
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 10:05:20 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 10:42:27 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 34 +-
.../processors/cache/GridCacheEntryEx.java | 9 +-
.../processors/cache/GridCacheMapEntry.java | 100 +++--
.../processors/cache/GridCacheUtils.java | 3 +
.../distributed/dht/GridDhtCacheAdapter.java | 10 +-
.../distributed/dht/GridDhtLockFuture.java | 18 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 19 +-
.../dht/GridPartitionedGetFuture.java | 2 -
.../dht/GridPartitionedSingleGetFuture.java | 2 -
.../dht/atomic/GridDhtAtomicCache.java | 8 -
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 3 +-
.../dht/preloader/GridDhtPartitionDemander.java | 3 +-
.../distributed/near/GridNearGetFuture.java | 4 -
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../local/atomic/GridLocalAtomicCache.java | 8 -
.../cache/transactions/IgniteTxAdapter.java | 2 -
.../cache/transactions/IgniteTxEntry.java | 24 +-
.../cache/transactions/IgniteTxHandler.java | 46 +++
.../transactions/IgniteTxLocalAdapter.java | 34 +-
.../datastreamer/DataStreamerImpl.java | 3 +-
.../processors/cache/GridCacheTestEntryEx.java | 5 +-
.../cache/IgniteCacheAbstractTest.java | 2 +-
...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
.../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
.../IgniteCacheReadThroughStoreCallTest.java | 288 ++++++++++++++
.../IgniteCacheLoaderWriterAbstractTest.java | 10 +
.../testsuites/IgniteCacheTestSuite4.java | 4 +
30 files changed, 1110 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 0b3b2da..2212926 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3531,8 +3531,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheEntryEx entry = entryEx(key, false);
try {
- entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
- replicate ? DR_LOAD : DR_NONE);
+ entry.initialValue(cacheVal,
+ ver,
+ ttl,
+ CU.EXPIRE_TIME_CALCULATE,
+ false,
+ topVer,
+ replicate ? DR_LOAD : DR_NONE,
+ true);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put cache value: " + entry, e);
@@ -4908,19 +4914,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
boolean deserializeBinary)
throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject val = entry.innerGet(
- null,
- null,
- false,
- false,
- false,
- true,
- false,
- false,
- false,
- null,
- null,
- null,
- null,
+ /*ver*/null,
+ /*tx*/null,
+ /*swap*/false,
+ /*readThrough*/false,
+ /*metrics*/false,
+ /*evt*/false,
+ /*tmp*/false,
+ /*subjId*/null,
+ /*transformClo*/null,
+ /*taskName*/null,
+ /*expiryPlc*/null,
!deserializeBinary);
if (val == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 646e6bc..616854f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -279,9 +279,6 @@ public interface GridCacheEntryEx {
* @param tx Ongoing transaction (possibly null).
* @param readSwap Flag indicating whether to check swap memory.
* @param readThrough Flag indicating whether to read through.
- * @param failFast If {@code true}, then throw {@link GridCacheFilterFailedException} if
- * filter didn't pass.
- * @param unmarshal Unmarshal flag.
* @param updateMetrics If {@code true} then metrics should be updated.
* @param evt Flag to signal event notification.
* @param tmp If {@code true} can return temporary instance which is valid while entry lock is held,
@@ -300,8 +297,6 @@ public interface GridCacheEntryEx {
@Nullable IgniteInternalTx tx,
boolean readSwap,
boolean readThrough,
- boolean failFast,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
boolean tmp,
@@ -676,6 +671,7 @@ public interface GridCacheEntryEx {
* @param preload Flag indicating whether entry is being preloaded.
* @param topVer Topology version.
* @param drType DR type.
+ * @param fromStore {@code True} if value was loaded from store.
* @return {@code True} if initial value was set.
* @throws IgniteCheckedException In case of error.
* @throws GridCacheEntryRemovedException If entry was removed.
@@ -686,7 +682,8 @@ public interface GridCacheEntryEx {
long expireTime,
boolean preload,
AffinityTopologyVersion topVer,
- GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException;
+ GridDrType drType,
+ boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* Sets new value if current version is <tt>0</tt> using swap entry data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 12bd556..6d321bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -750,8 +750,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteInternalTx tx,
boolean readSwap,
boolean readThrough,
- boolean failFast,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
boolean tmp,
@@ -766,7 +764,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
readSwap,
readThrough,
evt,
- unmarshal,
updateMetrics,
tmp,
subjId,
@@ -796,7 +793,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
readSwap,
false,
evt,
- unmarshal,
updateMetrics,
false,
subjId,
@@ -815,7 +811,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean readSwap,
boolean readThrough,
boolean evt,
- boolean unmarshal,
boolean updateMetrics,
boolean tmp,
UUID subjId,
@@ -1987,6 +1982,51 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (isStartVersion())
unswap(retval, false);
+ // Prepare old value.
+ oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
+
+ // Possibly read value from store.
+ boolean readFromStore = false;
+
+ Object old0 = null;
+
+ if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
+ (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
+ old0 = readThrough(null, key, false, subjId, taskName);
+
+ oldVal = cctx.toCacheObject(old0);
+
+ readFromStore = true;
+
+ // Detach value before index update.
+ oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
+
+ // Calculate initial TTL and expire time.
+ long initTtl;
+ long initExpireTime;
+
+ if (expiryPlc != null && oldVal != null) {
+ IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+ initTtl = initTtlAndExpireTime.get1();
+ initExpireTime = initTtlAndExpireTime.get2();
+ }
+ else {
+ initTtl = CU.TTL_ETERNAL;
+ initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ }
+
+ if (oldVal != null)
+ updateIndex(oldVal, initExpireTime, ver, null);
+ else
+ clearIndex(null);
+
+ update(oldVal, initExpireTime, initTtl, ver, true);
+
+ if (deletedUnlocked() && oldVal != null && !isInternal())
+ deletedUnlocked(false);
+ }
+
Object transformClo = null;
// Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
@@ -2194,51 +2234,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
"Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
}
- // Prepare old value and value bytes.
- oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
-
- // Possibly read value from store.
- boolean readFromStore = false;
-
- Object old0 = null;
-
- if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
- (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
- old0 = readThrough(null, key, false, subjId, taskName);
-
- oldVal = cctx.toCacheObject(old0);
-
- readFromStore = true;
-
- // Detach value before index update.
- oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
-
- // Calculate initial TTL and expire time.
- long initTtl;
- long initExpireTime;
-
- if (expiryPlc != null && oldVal != null) {
- IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
-
- initTtl = initTtlAndExpireTime.get1();
- initExpireTime = initTtlAndExpireTime.get2();
- }
- else {
- initTtl = CU.TTL_ETERNAL;
- initExpireTime = CU.EXPIRE_TIME_ETERNAL;
- }
-
- if (oldVal != null)
- updateIndex(oldVal, initExpireTime, ver, null);
- else
- clearIndex(null);
-
- update(oldVal, initExpireTime, initTtl, ver, true);
-
- if (deletedUnlocked() && oldVal != null && !isInternal())
- deletedUnlocked(false);
- }
-
// Apply metrics.
if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
// PutIfAbsent methods mustn't update hit/miss statistics
@@ -3417,7 +3412,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long expireTime,
boolean preload,
AffinityTopologyVersion topVer,
- GridDrType drType)
+ GridDrType drType,
+ boolean fromStore)
throws IgniteCheckedException, GridCacheEntryRemovedException {
synchronized (this) {
checkObsolete();
@@ -3470,7 +3466,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.dataStructures().onEntryUpdated(key, false, true);
}
- if (cctx.store().isLocal()) {
+ if (!fromStore && cctx.store().isLocal()) {
if (val != null)
cctx.store().put(null, key, val, ver);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e7010f5..feaa618 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -149,6 +149,9 @@ public class GridCacheUtils {
/** Keep serialized flag. */
public static final int KEEP_BINARY_FLAG_MASK = 0x2;
+ /** Flag indicating that the old value for an 'invoke' operation was non-null on the primary node. */
+ public static final int OLD_VAL_ON_PRIMARY = 0x4;
+
/** Empty predicate array. */
private static final IgnitePredicate[] EMPTY = new IgnitePredicate[0];
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 2ab6303..14468eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -546,8 +546,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
entry = entryEx(key, false);
- entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
- replicate ? DR_LOAD : DR_NONE);
+ entry.initialValue(cacheVal,
+ ver,
+ ttl,
+ CU.EXPIRE_TIME_CALCULATE,
+ false,
+ topVer,
+ replicate ? DR_LOAD : DR_NONE,
+ false);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put cache value: " + entry, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index b52376c..38a7e19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1048,7 +1048,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
try {
CacheObject val0 = cctx.toCacheObject(val);
- entry0.initialValue(val0, ver, 0, 0, false, topVer, GridDrType.DR_LOAD);
+ entry0.initialValue(val0,
+ ver,
+ 0,
+ 0,
+ false,
+ topVer,
+ GridDrType.DR_LOAD,
+ true);
}
catch (GridCacheEntryRemovedException e) {
assert false : "Should not get removed exception while holding lock on entry " +
@@ -1205,8 +1212,13 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
try {
GridCacheEntryEx entry = cctx.cache().entryEx(info.key(), topVer);
- if (entry.initialValue(info.value(), info.version(), info.ttl(),
- info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) {
+ if (entry.initialValue(info.value(),
+ info.version(),
+ info.ttl(),
+ info.expireTime(),
+ true, topVer,
+ replicate ? DR_PRELOAD : DR_NONE,
+ false)) {
if (rec && !entry.isInternal())
cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
(IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 0a99621..d5927bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1184,8 +1184,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
tx,
/*swap*/true,
/*read-through*/false,
- /*fail-fast.*/false,
- /*unmarshal*/false,
/*update-metrics*/true,
/*event notification*/req.returnValue(i),
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e1baabb..746dbb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -349,9 +349,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
cached.unswap(retVal);
- boolean readThrough = (retVal || hasFilters) &&
- cacheCtx.config().isLoadPreviousValue() &&
- !txEntry.skipStore();
+ boolean readThrough = !txEntry.skipStore() &&
+ (txEntry.op() == TRANSFORM || ((retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue()));
boolean evt = retVal || txEntry.op() == TRANSFORM;
@@ -367,8 +366,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx,
/*swap*/true,
readThrough,
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/retVal,
/*event*/evt,
/*tmp*/false,
@@ -392,6 +389,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean modified = false;
+ txEntry.oldValueOnPrimary(val != null);
+
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val,
txEntry.cached().version(), keepBinary, txEntry.cached());
@@ -1573,8 +1572,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
try {
- if (entry.initialValue(info.value(), info.version(),
- info.ttl(), info.expireTime(), true, topVer, drType)) {
+ if (entry.initialValue(info.value(),
+ info.version(),
+ info.ttl(),
+ info.expireTime(),
+ true,
+ topVer,
+ drType,
+ false)) {
if (rec && !entry.isInternal())
cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
(IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 9561ad8..2e22d9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -473,8 +473,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index fd59f48..aeb7eba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -397,8 +397,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7460495..9291153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1371,8 +1371,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
@@ -1848,8 +1846,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*read swap*/true,
/*read through*/true,
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/true,
/*temporary*/true,
@@ -2008,8 +2004,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*read swap*/true,
/*read through*/ctx.loadPreviousValue(),
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/true,
/*temporary*/true,
@@ -2055,8 +2049,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*read swap*/true,
/*read through*/ctx.loadPreviousValue(),
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/true,
/*temporary*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d0850e7..3376510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -508,8 +508,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
@@ -1054,7 +1052,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(cacheCtx,
keys,
- tx.implicit(),
+ retval,
txRead,
accessTtl,
skipStore,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 4da1f38..7cbd77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -552,7 +552,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
info.expireTime(),
true,
topVer,
- replicate ? DR_PRELOAD : DR_NONE
+ replicate ? DR_PRELOAD : DR_NONE,
+ false
)) {
if (rec && !entry.isInternal())
cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 821d2e4..57d5229 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -688,7 +688,8 @@ public class GridDhtPartitionDemander {
entry.expireTime(),
true,
topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+ cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+ false
)) {
cctx.evicts().touch(cached, topVer); // Start tracking.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 7d29381..290c08e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -463,8 +463,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
tx,
/*swap*/false,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/*metrics*/true,
/*events*/!skipVals,
/*temporary*/false,
@@ -605,8 +603,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
tx,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/*update-metrics*/false,
/*events*/!nearRead && !skipVals,
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 7500d8f..db736a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1147,16 +1147,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* @param cacheCtx Cache context.
* @param keys Keys.
- * @param implicit Implicit flag.
+ * @param retval Return value flag.
* @param read Read flag.
* @param accessTtl Access ttl.
* @param <K> Key type.
* @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
* @return Future with respond.
*/
public <K> IgniteInternalFuture<GridCacheReturn> lockAllAsync(GridCacheContext cacheCtx,
final Collection<? extends K> keys,
- boolean implicit,
+ boolean retval,
boolean read,
long accessTtl,
boolean skipStore,
@@ -1190,7 +1191,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
this,
isInvalidate(),
read,
- /*retval*/false,
+ retval,
isolation,
accessTtl,
CU.empty0(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index e1d12b6..ac08f8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -550,8 +550,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
/*swap*/swapOrOffheap,
/*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/!skipVals,
/**temporary*/false,
@@ -1111,8 +1109,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
/*swap*/true,
/*read-through*/true,
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/true,
/**temporary*/true,
@@ -1235,8 +1231,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
/*swap*/true,
/*read-through*/ctx.loadPreviousValue(),
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/true,
/**temporary*/true,
@@ -1272,8 +1266,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
/*swap*/true,
/*read-through*/ctx.loadPreviousValue(),
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/true,
/**temporary*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 86a1989..ece013e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1493,8 +1493,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
this,
/*swap*/false,
/*read through*/false,
- /*fail fast*/true,
- /*unmarshal*/true,
/*metrics*/metrics,
/*event*/recordEvt,
/*temporary*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 8d50e2b..87b2525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -57,6 +58,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.OLD_VAL_ON_PRIMARY;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
/**
@@ -192,9 +194,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
private byte[] expiryPlcBytes;
/**
- * Additional flags.
- * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value.
- * GridCacheUtils.KEEP_BINARY_FLAG_MASK - for withKeepBinary flag.
+ * Additional flags:
+ * <ul>
+ * <li>{@link GridCacheUtils#SKIP_STORE_FLAG_MASK} - for skipStore flag value.</li>
+ * <li>{@link GridCacheUtils#KEEP_BINARY_FLAG_MASK} - for withKeepBinary flag.</li>
+ * </ul>
*/
private byte flags;
@@ -493,6 +497,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
+ * @param oldValOnPrimary {@code True} if the old value for the 'invoke' operation was non-null on the primary node.
+ */
+ public void oldValueOnPrimary(boolean oldValOnPrimary) {
+ setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
+ }
+
+ /**
+ * @return {@code True} if the old value for the 'invoke' operation was non-null on the primary node.
+ */
+ public boolean oldValueOnPrimary() {
+ return isFlag(OLD_VAL_ON_PRIMARY);
+ }
+
+ /**
* Sets keep binary flag value.
*
* @param keepBinary Keep binary flag value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a764d5d..7039399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -26,9 +26,11 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -76,6 +78,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
@@ -1193,6 +1196,49 @@ public class IgniteTxHandler {
if (info != null && !info.isNew() && !info.isDeleted())
res.addPreloadEntry(info);
}
+
+ if (cacheCtx.readThroughConfigured() &&
+ !entry.skipStore() &&
+ entry.op() == TRANSFORM &&
+ entry.oldValueOnPrimary() &&
+ !entry.hasValue()) {
+ while (true) {
+ try {
+ GridCacheEntryEx cached = entry.cached();
+
+ if (cached == null)
+ cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+
+ CacheObject val = cached.innerGet(
+ /*ver*/null,
+ tx,
+ /*readSwap*/true,
+ /*readThrough*/false,
+ /*updateMetrics*/false,
+ /*evt*/false,
+ /*tmp*/false,
+ tx.subjectId(),
+ /*transformClo*/null,
+ tx.resolveTaskName(),
+ /*expiryPlc*/null,
+ /*keepBinary*/true);
+
+ if (val == null)
+ val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
+
+ if (val != null)
+ entry.readValue(val);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Got entry removed exception, will retry: " + entry.txKey());
+
+ entry.cached(null);
+ }
+ }
+ }
}
catch (GridDhtInvalidPartitionException e) {
tx.addInvalidPartition(cacheCtx, e.partition());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f9ef423..d51d873 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1249,8 +1249,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
this,
/*swap*/true,
/*read-through*/false,
- /*fail fast*/true,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/!skipVals,
/*temporary*/false,
@@ -1334,9 +1332,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
null,
this,
/*swap*/true,
- /*no read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
+ /*read-through*/false,
/*metrics*/true,
/*event*/true,
/*temporary*/false,
@@ -1683,8 +1679,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
IgniteTxLocalAdapter.this,
cacheCtx.isSwapOrOffheapEnabled(),
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/*metrics*/true,
/*events*/!skipVals,
/*temporary*/false,
@@ -2025,7 +2019,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
needReadVer,
singleRmv,
hasFilters,
- skipStore,
+ /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary);
}
@@ -2194,7 +2188,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
needReadVer,
singleRmv,
hasFilters,
- skipStore,
+ /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary);
}
@@ -2214,7 +2208,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
* @param needReadVer Read version flag.
* @param singleRmv {@code True} for single remove operation.
* @param hasFilters {@code True} if filters not empty.
- * @param skipStore Skip store flag.
+ * @param readThrough Read through flag.
* @param retval Return value flag.
* @return Load future.
*/
@@ -2227,7 +2221,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final boolean needReadVer,
final boolean singleRmv,
final boolean hasFilters,
- final boolean skipStore,
+ final boolean readThrough,
final boolean retval,
final boolean keepBinary) {
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
@@ -2260,6 +2254,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (e.op() == TRANSFORM) {
GridCacheVersion ver;
+ e.readValue(cacheVal);
+
try {
ver = e.cached().version();
}
@@ -2286,7 +2282,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return loadMissing(
cacheCtx,
topVer,
- /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
+ readThrough,
/*async*/true,
keys,
/*skipVals*/singleRmv,
@@ -2398,8 +2394,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
this,
/*swap*/false,
/*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/retval || needVal,
/*metrics*/retval,
/*events*/retval,
/*temporary*/false,
@@ -2696,16 +2690,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (retval || invoke) {
if (!cacheCtx.isNear()) {
if (!hasPrevVal) {
- boolean readThrough =
- (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
+ // For a non-local cache the value should be read from the store only after the lock is acquired on the primary node.
+ boolean readThrough = cacheCtx.isLocal() &&
+ (invoke || cacheCtx.loadPreviousValue()) &&
+ !txEntry.skipStore();
v = cached.innerGet(
null,
this,
/*swap*/true,
readThrough,
- /*failFast*/false,
- /*unmarshal*/true,
/*metrics*/!invoke,
/*event*/!invoke && !dht(),
/*temporary*/false,
@@ -2937,7 +2931,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
timeout,
this,
- false,
+ /*read*/entryProcessor != null, // Needed to force load from store.
retval,
isolation,
isInvalidate(),
@@ -3115,7 +3109,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
timeout,
this,
- false,
+ /*read*/invokeMap != null, // Needed to force load from store.
retval,
isolation,
isInvalidate(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 4599060..9dc6a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1621,7 +1621,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
expiryTime,
false,
topVer,
- GridDrType.DR_LOAD);
+ GridDrType.DR_LOAD,
+ false);
cctx.evicts().touch(entry, topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 348b6c9..400fb14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -432,8 +432,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable IgniteInternalTx tx,
boolean readSwap,
boolean readThrough,
- boolean failFast,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
boolean tmp,
@@ -667,7 +665,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
long expireTime,
boolean preload,
AffinityTopologyVersion topVer,
- GridDrType drType
+ GridDrType drType,
+ boolean fromStore
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index ce60232..45b6e9d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -54,7 +54,7 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- protected static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
+ public static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
/**
* @return Grids count to start.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
new file mode 100644
index 0000000..294ebea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public abstract class IgniteCacheInvokeReadThroughAbstractTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static volatile boolean failed;
+
+ /** */
+ protected boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ failed = false;
+
+ startNodes();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ IgniteCacheAbstractTest.storeMap.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @return Store factory.
+ */
+ protected Factory<CacheStore> cacheStoreFactory() {
+ return new IgniteCacheAbstractTest.TestStoreFactory();
+ }
+
+ /**
+ * @param data Data.
+ * @param cacheName Cache name.
+ * @throws Exception If failed.
+ */
+ protected void putDataInStore(Map<Object, Object> data, String cacheName) throws Exception {
+ IgniteCacheAbstractTest.storeMap.putAll(data);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ protected abstract void startNodes() throws Exception;
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ protected void invokeReadThrough(CacheConfiguration ccfg) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ ignite0.createCache(ccfg);
+
+ try {
+ int key = 0;
+
+ for (Ignite node : G.allGrids()) {
+ if (node.configuration().isClientMode() && ccfg.getNearConfiguration() != null)
+ node.createNearCache(ccfg.getName(), ccfg.getNearConfiguration());
+ }
+
+ for (Ignite node : G.allGrids()) {
+ log.info("Test for node: " + node.name());
+
+ IgniteCache<Object, Object> cache = node.cache(ccfg.getName());
+
+ for (int i = 0; i < 50; i++)
+ checkReadThrough(cache, key++, null, null);
+
+ Set<Object> keys = new HashSet<>();
+
+ for (int i = 0; i < 5; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, null, null);
+
+ keys = new HashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, null, null);
+
+ if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ log.info("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+ for (int i = 0; i < 50; i++)
+ checkReadThrough(cache, key++, concurrency, isolation);
+
+ keys = new HashSet<>();
+
+ for (int i = 0; i < 5; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+
+ keys = new HashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+ }
+ }
+
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ log.info("Test tx2 [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+ for (int i = 0; i < 50; i++)
+ checkReadThroughGetAndInvoke(cache, key++, concurrency, isolation);
+ }
+ }
+ }
+ }
+
+ ignite0.cache(ccfg.getName()).removeAll();
+ }
+ finally {
+ ignite0.destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkReadThrough(IgniteCache<Object, Object> cache,
+ Object key,
+ @Nullable TransactionConcurrency concurrency,
+ @Nullable TransactionIsolation isolation) throws Exception {
+ putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+ Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+ : null;
+
+ try {
+ Object ret = cache.invoke(key, new TestEntryProcessor());
+
+ assertEquals(key, ret);
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ checkValue(cache.getName(), key, (Integer)key + 1);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkReadThroughGetAndInvoke(IgniteCache<Object, Object> cache,
+ Object key,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation) throws Exception {
+ putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+ try (Transaction tx = cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)) {
+ cache.get(key);
+
+ Object ret = cache.invoke(key, new TestEntryProcessor());
+
+ assertEquals(key, ret);
+
+ tx.commit();
+ }
+
+ checkValue(cache.getName(), key, (Integer)key + 1);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param keys Key.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkReadThroughInvokeAll(IgniteCache<Object, Object> cache,
+ Set<Object> keys,
+ @Nullable TransactionConcurrency concurrency,
+ @Nullable TransactionIsolation isolation) throws Exception {
+ Map<Object, Object> data = U.newHashMap(keys.size());
+
+ for (Object key : keys)
+ data.put(key, key);
+
+ putDataInStore(data, cache.getName());
+
+ Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+ : null;
+
+ try {
+ Map<Object, EntryProcessorResult<Object>> ret = cache.invokeAll(keys, new TestEntryProcessor());
+
+ assertEquals(ret.size(), keys.size());
+
+ for (Object key : keys) {
+ EntryProcessorResult<Object> res = ret.get(key);
+
+ assertNotNull(res);
+ assertEquals(key, res.get());
+ }
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ for (Object key : keys)
+ checkValue(cache.getName(), key, (Integer)key + 1);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ * @param val Expected value.
+ */
+ private void checkValue(String cacheName, Object key, Object val) {
+ for (Ignite ignite : G.allGrids()) {
+ assertEquals("Unexpected value for node: " + ignite.name(),
+ val,
+ ignite.cache(cacheName).get(key));
+ }
+
+ assertFalse(failed);
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Atomicity mode.
+ * @param memoryMode Memory mode.
+ * @param backups Number of backups.
+ * @param nearCache Near cache flag.
+ * @return Cache configuration.
+ */
+ @SuppressWarnings("unchecked")
+ protected CacheConfiguration cacheConfiguration(CacheMode cacheMode,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode,
+ int backups,
+ boolean nearCache) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setReadThrough(true);
+ ccfg.setWriteThrough(true);
+ ccfg.setCacheStoreFactory(cacheStoreFactory());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+ ccfg.setMemoryMode(memoryMode);
+
+ if (nearCache)
+ ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+ if (!entry.exists()) {
+ failed = true;
+
+ fail();
+ }
+
+ Integer val = (Integer)entry.getValue();
+
+ if (!val.equals(entry.getKey())) {
+ failed = true;
+
+ assertEquals(val, entry.getKey());
+ }
+
+ entry.setValue(val + 1);
+
+ return val;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
new file mode 100644
index 0000000..b451abf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class IgniteCacheInvokeReadThroughSingleNodeTest extends IgniteCacheInvokeReadThroughAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void startNodes() throws Exception {
+ startGrid(0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicLocal() throws Exception {
+ invokeReadThrough(cacheConfiguration(LOCAL, ATOMIC, ONHEAP_TIERED, 0, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxLocal() throws Exception {
+ invokeReadThrough(cacheConfiguration(LOCAL, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 2464e81..9578227 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -17,111 +17,163 @@
package org.apache.ignite.internal.processors.cache;
-import javax.cache.configuration.Factory;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
*
*/
-public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
+public class IgniteCacheInvokeReadThroughTest extends IgniteCacheInvokeReadThroughAbstractTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-114");
- }
+ @Override protected void startNodes() throws Exception {
+ startGridsMultiThreaded(4);
- /** */
- private static volatile boolean failed;
+ client = true;
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 3;
+ startGrid(4);
}
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return PARTITIONED;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic0() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 0, false));
}
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return TRANSACTIONAL;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic1() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
}
- /** {@inheritDoc} */
- @Override protected NearCacheConfiguration nearConfiguration() {
- return null;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic2() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 2, false));
}
- /** {@inheritDoc} */
- @Override protected Factory<CacheStore> cacheStoreFactory() {
- return new TestStoreFactory();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
}
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- failed = false;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
}
/**
* @throws Exception If failed.
*/
- public void testInvokeReadThrough() throws Exception {
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- checkReadThrough(cache, primaryKey(cache));
+ public void testInvokeReadThroughAtomic0_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 0, false));
+ }
- checkReadThrough(cache, backupKey(cache));
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic1_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+ }
- checkReadThrough(cache, nearKey(cache));
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic2_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 2, false));
}
/**
- * @param cache Cache.
- * @param key Key.
+ * @throws Exception If failed.
*/
- private void checkReadThrough(IgniteCache<Integer, Integer> cache, Integer key) {
- log.info("Test [key=" + key + ']');
+ public void testInvokeReadThroughAtomicNearCache_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, true));
+ }
- storeMap.put(key, key);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicReplicated_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, OFFHEAP_TIERED, 0, false));
+ }
- Object ret = cache.invoke(key, new EntryProcessor<Integer, Integer, Object>() {
- @Override public Object process(MutableEntry<Integer, Integer> entry, Object... args) {
- if (!entry.exists()) {
- failed = true;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx0() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
- fail();
- }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx1() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+ }
- Integer val = entry.getValue();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx2() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 2, false));
+ }
- if (!val.equals(entry.getKey())) {
- failed = true;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+ }
- assertEquals(val, entry.getKey());
- }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
- entry.setValue(val + 1);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx0_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
+ }
- return val;
- }
- });
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx1_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+ }
- assertEquals(key, ret);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx2_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 2, false));
+ }
- for (int i = 0; i < gridCount(); i++)
- assertEquals("Unexpected value for node: " + i, key + 1, jcache(i).get(key));
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxNearCache_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, true));
+ }
- assertFalse(failed);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxReplicated_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
}
}
[14/16] ignite git commit: Fixed compilation
Posted by vk...@apache.org.
Fixed compilation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9546aa7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9546aa7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9546aa7
Branch: refs/heads/master
Commit: e9546aa76be97dde30d3d4777b5770d15cc4d1fa
Parents: 10916ed
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Jun 15 13:44:23 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 13:52:16 2016 +0300
----------------------------------------------------------------------
.../spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9546aa7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fb15ff1..f198f0c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql._
import org.apache.spark._
import scala.collection.JavaConversions._
-import scala.reflect.macros.whitebox
/**
* Ignite RDD. Represents Ignite cache as Spark RDD abstraction.
[12/16] ignite git commit: IGNITE-3215 - Added
IgniteRDD.withKeepBinary method
Posted by vk...@apache.org.
IGNITE-3215 - Added IgniteRDD.withKeepBinary method
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6faa1f22
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6faa1f22
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6faa1f22
Branch: refs/heads/master
Commit: 6faa1f229bf24d5aa5dc6fc99023283639ecacfe
Parents: ab7fa49
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Tue Jun 14 18:09:10 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 12:21:14 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/spark/IgniteContext.scala | 4 +--
.../org/apache/ignite/spark/IgniteRDD.scala | 19 ++++++++---
.../apache/ignite/spark/JavaIgniteContext.scala | 4 +--
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 2 ++
.../ignite/spark/impl/IgniteAbstractRDD.scala | 15 ++++++---
.../apache/ignite/spark/impl/IgniteSqlRDD.scala | 5 +--
.../spark/impl/JavaIgniteAbstractRDD.scala | 34 --------------------
.../org/apache/ignite/spark/IgniteRDDSpec.scala | 25 ++++++++++++--
8 files changed, 58 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 182605c..c63a370 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -106,7 +106,7 @@ class IgniteContext[K, V](
* @return `IgniteRDD` instance.
*/
def fromCache(cacheName: String): IgniteRDD[K, V] = {
- new IgniteRDD[K, V](this, cacheName, null)
+ new IgniteRDD[K, V](this, cacheName, null, false)
}
/**
@@ -117,7 +117,7 @@ class IgniteContext[K, V](
* @return `IgniteRDD` instance.
*/
def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
- new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
+ new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg, false)
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fa96212..cad96b9 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -45,8 +45,9 @@ import scala.collection.JavaConversions._
class IgniteRDD[K, V] (
val ic: IgniteContext[K, V],
val cacheName: String,
- val cacheCfg: CacheConfiguration[K, V]
-) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+ val cacheCfg: CacheConfiguration[K, V],
+ val keepBinary: Boolean
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg, keepBinary) {
/**
* Computes iterator based on given partition.
*
@@ -127,7 +128,8 @@ class IgniteRDD[K, V] (
qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
- new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
+ new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry,
+ entry ⇒ (entry.getKey, entry.getValue), keepBinary)
}
/**
@@ -144,7 +146,8 @@ class IgniteRDD[K, V] (
val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
- val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+ val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](
+ ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list), keepBinary)
ic.sqlContext.createDataFrame(rowRdd, schema)
}
@@ -290,6 +293,14 @@ class IgniteRDD[K, V] (
ensureCache().removeAll()
}
+ def withKeepBinary[K1, V1](): IgniteRDD[K1, V1] = {
+ new IgniteRDD[K1, V1](
+ ic.asInstanceOf[IgniteContext[K1, V1]],
+ cacheName,
+ cacheCfg.asInstanceOf[CacheConfiguration[K1, V1]],
+ true)
+ }
+
/**
* Builds spark schema from query metadata.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index 44b1cd9..25184e7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -52,10 +52,10 @@ class JavaIgniteContext[K, V](
}
def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
- JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+ JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))
def fromCache(cacheCfg: CacheConfiguration[K, V]) =
- JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+ JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg, false))
def ignite(): Ignite = ic.ignite()
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index cac0e15..1efc6ae 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -96,6 +96,8 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
savePairs(jrdd, f, overwrite = false)
def clear(): Unit = rdd.clear()
+
+ def withKeepBinary[K1, V1](): JavaIgniteRDD[K1, V1] = new JavaIgniteRDD[K1, V1](rdd.withKeepBinary[K1, V1]())
}
object JavaIgniteRDD {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
index 25b3b56..9d5171c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
@@ -27,13 +27,20 @@ import scala.reflect.ClassTag
abstract class IgniteAbstractRDD[R:ClassTag, K, V] (
ic: IgniteContext[K, V],
cacheName: String,
- cacheCfg: CacheConfiguration[K, V]
+ cacheCfg: CacheConfiguration[K, V],
+ keepBinary: Boolean
) extends RDD[R] (ic.sparkContext, deps = Nil) {
protected def ensureCache(): IgniteCache[K, V] = {
// Make sure to deploy the cache
- if (cacheCfg != null)
- ic.ignite().getOrCreateCache(cacheCfg)
+ val cache =
+ if (cacheCfg != null)
+ ic.ignite().getOrCreateCache(cacheCfg)
+ else
+ ic.ignite().getOrCreateCache(cacheName)
+
+ if (keepBinary)
+ cache.withKeepBinary()
else
- ic.ignite().getOrCreateCache(cacheName)
+ cache.asInstanceOf[IgniteCache[K, V]]
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index 762a6ed..b4579aa 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -29,8 +29,9 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
cacheName: String,
cacheCfg: CacheConfiguration[K, V],
qry: Query[T],
- conv: (T) ⇒ R
-) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
+ conv: (T) ⇒ R,
+ keepBinary: Boolean
+) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
override def compute(split: Partition, context: TaskContext): Iterator[R] = {
new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
deleted file mode 100644
index 13bd3e8..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.impl
-
-import org.apache.ignite.IgniteCache
-import org.apache.ignite.spark.IgniteRDD
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
-
-abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
- extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
-
- protected def ensureCache(): IgniteCache[K, V] = {
- // Make sure to deploy the cache
- if (rdd.cacheCfg != null)
- rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg)
- else
- rdd.ic.ignite().getOrCreateCache(rdd.cacheName)
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
index 15a51ae..dff82f4 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
@@ -18,7 +18,7 @@
package org.apache.ignite.spark
import org.apache.ignite.Ignition
-import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField}
+import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
@@ -26,9 +26,10 @@ import org.apache.spark.SparkContext
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner
-import scala.collection.JavaConversions._
+import scala.collection.JavaConversions._
import IgniteRDDSpec._
+import org.apache.ignite.binary.BinaryObject
import scala.annotation.meta.field
@@ -294,6 +295,26 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
sc.stop()
}
}
+
+ it("should properly work with binary objects") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, Entity](sc, () ⇒ configuration("client", client = true))
+
+ val cache = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+ cache.savePairs(sc.parallelize(0 until 10, 2).map(i ⇒ (String.valueOf(i),
+ new Entity(i, "name" + i, i * 100))))
+
+ val res = cache.withKeepBinary[String, BinaryObject]().map(t ⇒ t._2.field[Int]("salary")).collect()
+
+ println(res)
+ }
+ finally {
+ sc.stop()
+ }
+ }
}
override protected def beforeEach() = {
[08/16] ignite git commit: Merge branch 'gridgain-7.6.1' of
https://github.com/gridgain/apache-ignite into gridgain-7.6.1
Posted by vk...@apache.org.
Merge branch 'gridgain-7.6.1' of https://github.com/gridgain/apache-ignite into gridgain-7.6.1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fda4110b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fda4110b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fda4110b
Branch: refs/heads/master
Commit: fda4110baf5cba8c6d32d0cdd3d7e2c8945af2e5
Parents: 6094ee0 3a4f587
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Jun 14 13:25:38 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jun 14 13:25:38 2016 +0300
----------------------------------------------------------------------
.../affinity/fair/FairAffinityFunction.java | 2 +
.../rendezvous/RendezvousAffinityFunction.java | 2 +
.../configuration/CacheConfiguration.java | 3 +
.../ignite/internal/MarshallerContextImpl.java | 51 ++-
.../ignite/internal/binary/BinaryContext.java | 43 ++-
.../GridClientConnectionManagerAdapter.java | 25 +-
.../connection/GridClientNioTcpConnection.java | 3 +
.../GridClientOptimizedMarshaller.java | 4 +-
.../GridClientZipOptimizedMarshaller.java | 167 ++++++++
.../impl/GridTcpRouterNioListenerAdapter.java | 11 +-
.../processors/cache/GridCacheAdapter.java | 46 ++-
.../processors/cache/GridCacheEntryEx.java | 9 +-
.../processors/cache/GridCacheMapEntry.java | 100 +++--
.../processors/cache/GridCacheProcessor.java | 4 +
.../processors/cache/GridCacheUtils.java | 3 +
.../binary/CacheObjectBinaryProcessorImpl.java | 29 +-
.../distributed/dht/GridDhtCacheAdapter.java | 10 +-
.../distributed/dht/GridDhtLockFuture.java | 18 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 19 +-
.../dht/GridPartitionedGetFuture.java | 2 -
.../dht/GridPartitionedSingleGetFuture.java | 2 -
.../dht/atomic/GridDhtAtomicCache.java | 8 -
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 3 +-
.../dht/preloader/GridDhtPartitionDemander.java | 3 +-
.../dht/preloader/GridDhtPartitionMap2.java | 7 +-
.../distributed/near/GridNearGetFuture.java | 4 -
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../local/atomic/GridLocalAtomicCache.java | 8 -
.../continuous/CacheContinuousQueryManager.java | 135 +++++++
.../cache/transactions/IgniteTxAdapter.java | 2 -
.../cache/transactions/IgniteTxEntry.java | 24 +-
.../cache/transactions/IgniteTxHandler.java | 46 +++
.../transactions/IgniteTxLocalAdapter.java | 34 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 7 +
.../IgniteCacheObjectProcessorImpl.java | 5 +
.../continuous/GridContinuousProcessor.java | 216 ++++++++---
.../datastreamer/DataStreamerImpl.java | 3 +-
.../message/GridClientHandshakeRequest.java | 4 +-
.../protocols/tcp/GridTcpRestNioListener.java | 19 +-
.../rest/protocols/tcp/GridTcpRestProtocol.java | 12 +-
.../service/GridServiceProcessor.java | 305 ++++++++-------
.../ignite/internal/util/nio/GridNioServer.java | 10 +-
.../ignite/internal/visor/cache/VisorCache.java | 56 +--
.../visor/cache/VisorCachePartition.java | 89 +++++
.../visor/cache/VisorCachePartitions.java | 88 +++++
.../visor/cache/VisorCachePartitionsTask.java | 152 ++++++++
.../internal/visor/cache/VisorCacheV3.java | 68 +---
.../ignite/spi/discovery/tcp/ServerImpl.java | 4 +-
...eClientReconnectContinuousProcessorTest.java | 60 ++-
.../ignite/internal/binary/AffinityKey.java | 69 ++++
.../binary/GridBinaryAffinityKeySelfTest.java | 15 +
...aultBinaryMappersBinaryMetaDataSelfTest.java | 17 +
.../processors/cache/GridCacheTestEntryEx.java | 5 +-
.../cache/IgniteCacheAbstractTest.java | 2 +-
...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
.../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
.../IgniteCacheReadThroughStoreCallTest.java | 288 ++++++++++++++
.../IgniteCacheLoaderWriterAbstractTest.java | 10 +
...ridCacheContinuousQueryAbstractSelfTest.java | 2 +-
.../IgniteNoCustomEventsOnNodeStart.java | 85 +++++
.../service/GridServiceClientNodeTest.java | 102 ++++-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
.../testsuites/IgniteCacheTestSuite4.java | 4 +
66 files changed, 2609 insertions(+), 601 deletions(-)
----------------------------------------------------------------------
[06/16] ignite git commit: Tcp discovery minor: create buffered input
stream with size = socket receiveBufferSize. (cherry picked from commit
54425bf)
Posted by vk...@apache.org.
Tcp discovery minor: create buffered input stream with size = socket receiveBufferSize.
(cherry picked from commit 54425bf)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a4f587b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a4f587b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a4f587b
Branch: refs/heads/master
Commit: 3a4f587b457bd1bb86665bf51757a2aa3f986f07
Parents: c88ce20
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 11:42:52 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 11:50:51 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a4f587b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a9a86bc..37a1539 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -5206,7 +5206,9 @@ class ServerImpl extends TcpDiscoveryImpl {
for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
connLsnr.apply(sock);
- in = new BufferedInputStream(sock.getInputStream());
+ int rcvBufSize = sock.getReceiveBufferSize();
+
+ in = new BufferedInputStream(sock.getInputStream(), rcvBufSize > 0 ? rcvBufSize : 8192);
byte[] buf = new byte[4];
int read = 0;
[05/16] ignite git commit: ignite-3038 Do not use custom discovery
events to start continuous queries for system caches (cherry picked from
commit 7f878c5)
Posted by vk...@apache.org.
ignite-3038 Do not use custom discovery events to start continuous queries for system caches
(cherry picked from commit 7f878c5)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c88ce20a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c88ce20a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c88ce20a
Branch: refs/heads/master
Commit: c88ce20a62e315418cac014c3419ce254c591bee
Parents: e5fcebb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 11:16:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 11:16:29 2016 +0300
----------------------------------------------------------------------
.../processors/continuous/IgniteNoCustomEventsOnNodeStart.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c88ce20a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
index cbeb23f..5ecc27a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.continuous;
-import com.mchange.v2.c3p0.util.TestUtils;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
[11/16] ignite git commit: IGNITE-3273 - fixed
Posted by vk...@apache.org.
IGNITE-3273 - fixed
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab7fa496
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab7fa496
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab7fa496
Branch: refs/heads/master
Commit: ab7fa496459d9e8560c56d0daa9c56c171dea405
Parents: 84547ef
Author: Sergi Vladykin <se...@gmail.com>
Authored: Tue Jun 14 21:47:11 2016 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Wed Jun 15 10:29:39 2016 +0300
----------------------------------------------------------------------
.../processors/query/h2/sql/GridSqlConst.java | 5 ++
.../processors/query/h2/sql/GridSqlJoin.java | 17 +++--
.../processors/query/h2/sql/GridSqlType.java | 5 ++
.../query/IgniteSqlSplitterSelfTest.java | 75 ++++++++++++++++++++
4 files changed, 93 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
index 36c95ce..976eb2c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
@@ -19,12 +19,17 @@ package org.apache.ignite.internal.processors.query.h2.sql;
import java.util.Collections;
import org.h2.value.Value;
+import org.h2.value.ValueBoolean;
/**
* Constant value.
*/
public class GridSqlConst extends GridSqlElement implements GridSqlValue {
/** */
+ public static final GridSqlElement TRUE = new GridSqlConst(ValueBoolean.get(true))
+ .resultType(GridSqlType.BOOLEAN);
+
+ /** */
private final Value val;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
index 0148404..f1ad2e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
@@ -36,13 +36,15 @@ public class GridSqlJoin extends GridSqlElement {
* @param on Join condition.
*/
public GridSqlJoin(GridSqlElement leftTbl, GridSqlElement rightTbl, boolean leftOuter, @Nullable GridSqlElement on) {
- super(new ArrayList<GridSqlElement>(on == null ? 2 : 3));
+ super(new ArrayList<GridSqlElement>(3));
addChild(leftTbl);
addChild(rightTbl);
- if (on != null)
- addChild(on);
+ if (on == null) // To avoid query nesting issues in FROM clause we need to always generate ON condition.
+ on = GridSqlConst.TRUE;
+
+ addChild(on);
this.leftOuter = leftOuter;
}
@@ -64,8 +66,8 @@ public class GridSqlJoin extends GridSqlElement {
/**
* @return {@code JOIN ON} condition.
*/
- @Nullable public GridSqlElement on() {
- return size() < 3 ? null : child(2);
+ public GridSqlElement on() {
+ return child(2);
}
/** {@inheritDoc} */
@@ -78,10 +80,7 @@ public class GridSqlJoin extends GridSqlElement {
buff.append(rightTable().getSQL());
- GridSqlElement on = on();
-
- if (on != null)
- buff.append(" \n ON ").append(StringUtils.unEnclose(on.getSQL()));
+ buff.append(" \n ON ").append(StringUtils.unEnclose(on().getSQL()));
return buff.toString();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
index febf174..efe9138 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.h2.expression.Expression;
import org.h2.table.Column;
import org.h2.value.Value;
+import org.h2.value.ValueBoolean;
import org.h2.value.ValueDouble;
import org.h2.value.ValueLong;
@@ -43,6 +44,10 @@ public final class GridSqlType {
/** */
public static final GridSqlType UUID = new GridSqlType(Value.UUID, 0, Integer.MAX_VALUE, 36, "UUID");
+ /** */
+ public static final GridSqlType BOOLEAN = new GridSqlType(Value.BOOLEAN, 0, ValueBoolean.PRECISION,
+ ValueBoolean.DISPLAY_SIZE, "BOOLEAN");
+
/** H2 type. */
private final int type;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index d0e2780..fd52469 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -313,6 +313,81 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
}
/**
+ *
+ */
+ public void testImplicitJoinConditionGeneration() {
+ IgniteCache<Integer, Person> p = ignite(0).createCache(cacheConfig("P", true, Integer.class, Person.class));
+ IgniteCache<Integer, Department> d = ignite(0).createCache(cacheConfig("D", true, Integer.class, Department.class));
+ IgniteCache<Integer, Org> o = ignite(0).createCache(cacheConfig("O", true, Integer.class, Org.class));
+
+ try {
+ info("Plan: " + p.query(new SqlFieldsQuery(
+ "explain select P.Person.*,dep.*,org.* " +
+ "from P.Person inner join D.Department dep ON dep.id=P.Person.depId " +
+ "left join O.Org org ON org.id=dep.orgId"
+ )).getAll());
+
+ assertEquals(0, p.query(new SqlFieldsQuery(
+ "select P.Person.*,dep.*,org.* " +
+ "from P.Person inner join D.Department dep ON dep.id=P.Person.depId " +
+ "left join O.Org org ON org.id=dep.orgId"
+ )).getAll().size());
+ }
+ finally {
+ p.destroy();
+ d.destroy();
+ o.destroy();
+ }
+ }
+
+ /**
+ *
+ */
+ public static class Person {
+ /** */
+ @QuerySqlField
+ private int id;
+
+ /** */
+ @QuerySqlField
+ private String name;
+
+ /** */
+ @QuerySqlField
+ private int depId;
+ }
+
+ /**
+ *
+ */
+ public static class Org {
+ /** */
+ @QuerySqlField(index = true)
+ private int id;
+
+ /** */
+ @QuerySqlField
+ private String name;
+ }
+
+ /**
+ *
+ */
+ public static class Department {
+ /** */
+ @QuerySqlField(index = true)
+ private int id;
+
+ /** */
+ @QuerySqlField(index = true)
+ private int orgId;
+
+ /** */
+ @QuerySqlField
+ private String name;
+ }
+
+ /**
* Test value.
*/
private static class GroupIndexTestValue implements Serializable {
[13/16] ignite git commit: IGNITE-3215 - Added
IgniteRDD.withKeepBinary method (ScalaDoc)
Posted by vk...@apache.org.
IGNITE-3215 - Added IgniteRDD.withKeepBinary method (ScalaDoc)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10916ed0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10916ed0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10916ed0
Branch: refs/heads/master
Commit: 10916ed03688f820cef93866c8ea71226bba3716
Parents: 6faa1f2
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Jun 15 12:19:44 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 12:21:22 2016 +0300
----------------------------------------------------------------------
.../src/main/scala/org/apache/ignite/spark/IgniteRDD.scala | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/10916ed0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index cad96b9..fb15ff1 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql._
import org.apache.spark._
import scala.collection.JavaConversions._
+import scala.reflect.macros.whitebox
/**
* Ignite RDD. Represents Ignite cache as Spark RDD abstraction.
@@ -293,6 +294,12 @@ class IgniteRDD[K, V] (
ensureCache().removeAll()
}
+ /**
+ * Returns `IgniteRDD` that will operate with binary objects. This method
+ * behaves similar to [[org.apache.ignite.IgniteCache#withKeepBinary]].
+ *
+ * @return New `IgniteRDD` instance for binary objects.
+ */
def withKeepBinary[K1, V1](): IgniteRDD[K1, V1] = {
new IgniteRDD[K1, V1](
ic.asInstanceOf[IgniteContext[K1, V1]],
[16/16] ignite git commit: Merge branch 'gridgain-7.6.1'
Posted by vk...@apache.org.
Merge branch 'gridgain-7.6.1'
# Conflicts:
# modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b742f5fc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b742f5fc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b742f5fc
Branch: refs/heads/master
Commit: b742f5fc8eadd850a01246be7f88f08f424a471f
Parents: b500d3a 5c14928
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Jun 15 15:04:34 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 15:04:34 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/MarshallerContextImpl.java | 51 ++-
.../ignite/internal/binary/BinaryContext.java | 43 ++-
.../processors/cache/GridCacheAdapter.java | 46 ++-
.../processors/cache/GridCacheEntryEx.java | 9 +-
.../processors/cache/GridCacheMapEntry.java | 100 +++--
.../processors/cache/GridCacheUtils.java | 3 +
.../binary/CacheObjectBinaryProcessorImpl.java | 29 +-
.../distributed/dht/GridDhtCacheAdapter.java | 10 +-
.../distributed/dht/GridDhtLockFuture.java | 18 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 19 +-
.../dht/GridPartitionedGetFuture.java | 2 -
.../dht/GridPartitionedSingleGetFuture.java | 2 -
.../dht/atomic/GridDhtAtomicCache.java | 8 -
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 3 +-
.../dht/preloader/GridDhtPartitionDemander.java | 3 +-
.../distributed/near/GridNearGetFuture.java | 4 -
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../local/atomic/GridLocalAtomicCache.java | 8 -
.../continuous/CacheContinuousQueryEntry.java | 16 +
.../continuous/CacheContinuousQueryHandler.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 135 +++++++
.../cache/transactions/IgniteTxAdapter.java | 2 -
.../cache/transactions/IgniteTxEntry.java | 24 +-
.../cache/transactions/IgniteTxHandler.java | 46 +++
.../transactions/IgniteTxLocalAdapter.java | 34 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 7 +
.../IgniteCacheObjectProcessorImpl.java | 5 +
.../continuous/GridContinuousProcessor.java | 216 ++++++++---
.../datastreamer/DataStreamerImpl.java | 3 +-
.../service/GridServiceProcessor.java | 305 ++++++++-------
.../ignite/spi/discovery/tcp/ServerImpl.java | 4 +-
...eClientReconnectContinuousProcessorTest.java | 60 ++-
.../ignite/internal/binary/AffinityKey.java | 69 ++++
.../binary/GridBinaryAffinityKeySelfTest.java | 15 +
...aultBinaryMappersBinaryMetaDataSelfTest.java | 17 +
.../processors/cache/GridCacheTestEntryEx.java | 5 +-
.../cache/IgniteCacheAbstractTest.java | 2 +-
...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
.../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
.../IgniteCacheReadThroughStoreCallTest.java | 288 ++++++++++++++
.../IgniteCacheLoaderWriterAbstractTest.java | 10 +
...ridCacheContinuousQueryAbstractSelfTest.java | 2 +-
...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++
.../IgniteNoCustomEventsOnNodeStart.java | 85 +++++
.../service/GridServiceClientNodeTest.java | 102 ++++-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
.../testsuites/IgniteCacheTestSuite4.java | 4 +
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
.../org/apache/ignite/spark/IgniteContext.scala | 4 +-
.../org/apache/ignite/spark/IgniteRDD.scala | 25 +-
.../apache/ignite/spark/JavaIgniteContext.scala | 4 +-
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 2 +
.../ignite/spark/impl/IgniteAbstractRDD.scala | 15 +-
.../apache/ignite/spark/impl/IgniteSqlRDD.scala | 5 +-
.../spark/impl/JavaIgniteAbstractRDD.scala | 34 --
.../org/apache/ignite/spark/IgniteRDDSpec.scala | 25 +-
59 files changed, 2242 insertions(+), 511 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b742f5fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b742f5fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b742f5fc/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
[15/16] ignite git commit: IGNITE-2344 - WebSessionFilter doesn't
support session ID renewal. Fixes #780.
Posted by vk...@apache.org.
IGNITE-2344 - WebSessionFilter doesn't support session ID renewal. Fixes #780.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c149287
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c149287
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c149287
Branch: refs/heads/master
Commit: 5c1492872358cac78a6954796ea1e3420bf7c26e
Parents: e9546aa
Author: samaitra <sa...@gmail.com>
Authored: Wed Jun 15 12:58:33 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 13:52:36 2016 +0300
----------------------------------------------------------------------
.../cache/websession/WebSessionFilter.java | 39 ++-
.../internal/websession/WebSessionSelfTest.java | 330 ++++++++++++++++++-
2 files changed, 357 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5c149287/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
index 2b7442f..6c51876 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
@@ -31,9 +31,8 @@ import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
-import javax.servlet.http.HttpSession;
+import javax.servlet.http.*;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -963,6 +962,22 @@ public class WebSessionFilter implements Filter {
return newId;
}
+
+ /** {@inheritDoc} */
+ @Override public void login(String username, String password) throws ServletException{
+ HttpServletRequest req = (HttpServletRequest)getRequest();
+
+ req.login(username, password);
+
+ String newId = req.getSession(false).getId();
+
+ this.ses.setId(newId);
+
+ this.ses = createSession(ses, newId);
+ this.ses.servletContext(ctx);
+ this.ses.filter(WebSessionFilter.this);
+ this.ses.resetUpdates();
+ }
}
/**
@@ -1026,5 +1041,23 @@ public class WebSessionFilter implements Filter {
return newId;
}
+
+ /** {@inheritDoc} */
+ @Override public void login(String username, String password) throws ServletException{
+ final HttpServletRequest req = (HttpServletRequest)getRequest();
+
+ req.login(username, password);
+
+ final String newId = req.getSession(false).getId();
+
+ if (!F.eq(newId, ses.getId())) {
+ try {
+ ses = createSessionV2(ses, newId);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5c149287/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
index 1e01d3c..0ab1130 100644
--- a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
+++ b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
@@ -17,16 +17,11 @@
package org.apache.ignite.internal.websession;
-import java.io.BufferedReader;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.io.*;
+import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
-import java.util.Random;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -51,6 +46,7 @@ import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.security.HashLoginService;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.webapp.WebAppContext;
@@ -92,6 +88,13 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSessionRenewalDuringLogin() throws Exception {
+ testSessionRenewalDuringLogin("/modules/core/src/test/config/websession/example-cache.xml");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSingleRequestMetaInf() throws Exception {
testSingleRequest("ignite-webapp-config.xml");
}
@@ -293,6 +296,171 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
}
/**
+ * Tests session renewal during login. Checks modification attribute in cache.
+ *
+ * @param cfg Configuration.
+ * @throws Exception If failed.
+ */
+ private void testSessionRenewalDuringLogin(String cfg) throws Exception {
+ Server srv = null;
+ String sesId = null;
+ try {
+ srv = startServerWithLoginService(TEST_JETTY_PORT, cfg, null, new SessionLoginServlet());
+
+ URLConnection conn = new URL("http://localhost:" + TEST_JETTY_PORT + "/ignitetest/test").openConnection();
+
+ conn.connect();
+
+ String sesIdCookie1 = getSessionIdFromCookie(conn);
+
+ X.println(">>>", "Initial session Cookie: " + sesIdCookie1, ">>>");
+
+ try (BufferedReader rdr = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
+ sesId = rdr.readLine();
+
+ if (!keepBinary()) {
+ IgniteCache<String, HttpSession> cache = G.ignite().cache(getCacheName());
+
+ assertNotNull(cache);
+
+ HttpSession ses = cache.get(sesId);
+
+ assertNotNull(ses);
+
+ assertEquals("val1", ses.getAttribute("key1"));
+ }
+ else {
+ final IgniteCache<String, WebSessionEntity> cache = G.ignite().cache(getCacheName());
+
+ assertNotNull(cache);
+
+ final WebSessionEntity entity = cache.get(sesId);
+
+ assertNotNull(entity);
+
+ final byte[] data = entity.attributes().get("key1");
+
+ assertNotNull(data);
+
+ final Marshaller marshaller = G.ignite().configuration().getMarshaller();
+
+ final String val = marshaller.unmarshal(data, getClass().getClassLoader());
+
+ assertEquals("val1", val);
+ }
+ }
+
+ URLConnection conn2 = new URL("http://localhost:" + TEST_JETTY_PORT + "/ignitetest/login").openConnection();
+
+ HttpURLConnection con = (HttpURLConnection) conn2;
+
+ con.addRequestProperty("Cookie", "JSESSIONID=" + sesIdCookie1);
+
+ con.setRequestMethod("POST");
+
+ con.setDoOutput(true);
+
+ String sesIdCookie2 = getSessionIdFromCookie(con);
+
+ X.println(">>>", "Logged In session Cookie: " + sesIdCookie2, ">>>");
+
+ try (BufferedReader rdr = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
+ String sesId2 = rdr.readLine();
+
+ if (!keepBinary()) {
+ IgniteCache<String, HttpSession> cache = G.ignite().cache(getCacheName());
+
+ assertNotNull(cache);
+
+ HttpSession ses = cache.get(sesId2);
+
+ assertNotNull(ses);
+
+ assertEquals("val1", ses.getAttribute("key1"));
+
+ }
+ else {
+ final IgniteCache<String, WebSessionEntity> cache = G.ignite().cache(getCacheName());
+
+ assertNotNull(cache);
+
+ final WebSessionEntity entity = cache.get(sesId2);
+
+ assertNotNull(entity);
+
+ final byte[] data = entity.attributes().get("key1");
+
+ assertNotNull(data);
+
+ final Marshaller marshaller = G.ignite().configuration().getMarshaller();
+
+ final String val = marshaller.unmarshal(data, getClass().getClassLoader());
+
+ assertEquals("val1", val);
+
+ }
+
+ }
+
+ URLConnection conn3 = new URL("http://localhost:" + TEST_JETTY_PORT + "/ignitetest/simple").openConnection();
+
+ conn3.addRequestProperty("Cookie", "JSESSIONID=" + sesIdCookie2);
+
+ conn3.connect();
+
+ String sesIdCookie3 = getSessionIdFromCookie(conn3);
+
+ X.println(">>>", "Post Logged In session Cookie: " + sesIdCookie3, ">>>");
+
+ assertEquals(sesIdCookie2, sesIdCookie3);
+
+ try (BufferedReader rdr = new BufferedReader(new InputStreamReader(conn3.getInputStream()))) {
+ String sesId3 = rdr.readLine();
+
+ if (!keepBinary()) {
+ IgniteCache<String, HttpSession> cache = G.ignite().cache(getCacheName());
+
+ HttpSession session = cache.get(sesId3);
+
+ assertNotNull(session);
+
+ assertNotNull(cache);
+
+ HttpSession ses = cache.get(sesId3);
+
+ assertNotNull(ses);
+
+ assertEquals("val1", ses.getAttribute("key1"));
+ }
+ else {
+ final IgniteCache<String, WebSessionEntity> cache = G.ignite().cache(getCacheName());
+
+ assertNotNull(cache);
+
+ final WebSessionEntity entity = cache.get(sesId3);
+
+ assertNotNull(entity);
+
+ assertNotNull(cache.get(sesId3));
+
+ final byte[] data = entity.attributes().get("key1");
+
+ assertNotNull(data);
+
+ final Marshaller marshaller = G.ignite().configuration().getMarshaller();
+
+ final String val = marshaller.unmarshal(data, getClass().getClassLoader());
+
+ assertEquals("val1", val);
+ }
+ }
+ }
+ finally {
+ stopServerWithLoginService(srv);
+ }
+ }
+
+ /**
* Tests invalidated sessions.
*
* @throws Exception Exception If failed.
@@ -668,6 +836,35 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
}
/**
+ * Starts server with Login Service and creates a realm file.
+ *
+ * @param port Port number.
+ * @param cfg Configuration.
+ * @param gridName Grid name.
+ * @param servlet Servlet.
+ * @return Server.
+ * @throws Exception In case of error.
+ */
+ private Server startServerWithLoginService(int port, @Nullable String cfg, @Nullable String gridName, HttpServlet servlet)
+ throws Exception {
+ Server srv = new Server(port);
+
+ WebAppContext ctx = getWebContext(cfg, gridName, keepBinary(), servlet);
+
+ HashLoginService hashLoginService = new HashLoginService();
+ hashLoginService.setName("Test Realm");
+ createRealm();
+ hashLoginService.setConfig("realm.properties");
+ ctx.getSecurityHandler().setLoginService(hashLoginService);
+
+ srv.setHandler(ctx);
+
+ srv.start();
+
+ return srv;
+ }
+
+ /**
* Stops server.
*
* @param srv Server.
@@ -679,6 +876,60 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
}
/**
+ * Stops server and deletes the realm file.
+ *
+ * @param srv Server.
+ * @throws Exception In case of error.
+ */
+ private void stopServerWithLoginService(@Nullable Server srv) throws Exception{
+ if (srv != null){
+ srv.stop();
+ File realmFile = new File("realm.properties");
+ realmFile.delete();
+ }
+ }
+
+ /** Creates a realm file to store test user credentials */
+ private void createRealm() throws Exception{
+ File realmFile = new File("realm.properties");
+ FileWriter fileWriter = new FileWriter(realmFile);
+ fileWriter.append("admin:admin");
+ fileWriter.flush();
+ fileWriter.close();
+ }
+
+ /**
+ * Retrieves HttpSession sessionId from Cookie
+ *
+ * @param conn URLConnection
+ * @return sesId
+ */
+ private String getSessionIdFromCookie(URLConnection conn) {
+ String sessionCookieValue = null;
+ String sesId = null;
+ Map<String, List<String>> headerFields = conn.getHeaderFields();
+ Set<String> headerFieldsSet = headerFields.keySet();
+ Iterator<String> hearerFieldsIter = headerFieldsSet.iterator();
+
+ while (hearerFieldsIter.hasNext()) {
+ String headerFieldKey = hearerFieldsIter.next();
+
+ if ("Set-Cookie".equalsIgnoreCase(headerFieldKey)) {
+ List<String> headerFieldValue = headerFields.get(headerFieldKey);
+
+ for (String headerValue : headerFieldValue) {
+ String[] fields = headerValue.split(";");
+ sessionCookieValue = fields[0];
+ sesId = sessionCookieValue.substring(sessionCookieValue.indexOf("=")+1,
+ sessionCookieValue.length());
+ }
+ }
+ }
+
+ return sesId;
+ }
+
+ /**
* Test servlet.
*/
private static class SessionCreateServlet extends HttpServlet {
@@ -831,6 +1082,67 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
}
/**
+ * Test session behavior on id change.
+ */
+ private static class SessionLoginServlet extends HttpServlet {
+ /** {@inheritDoc} */
+ @Override protected void doGet(HttpServletRequest req, HttpServletResponse res)
+ throws ServletException, IOException {
+
+ if (req.getPathInfo().equals("/test")) {
+ HttpSession ses = req.getSession(true);
+ assertNotNull(ses);
+ ses.setAttribute("checkCnt", 0);
+ ses.setAttribute("key1", "val1");
+ ses.setAttribute("key2", "val2");
+ ses.setAttribute("mkey", new TestObj());
+
+ Profile p = (Profile) ses.getAttribute("profile");
+
+ if (p == null) {
+ p = new Profile();
+ ses.setAttribute("profile", p);
+ }
+
+ p.setMarker(req.getParameter("marker"));
+
+ X.println(">>>", "Request session test: " + ses.getId(), ">>>");
+
+ res.getWriter().write(ses.getId());
+
+ res.getWriter().flush();
+
+ } else if (req.getPathInfo().equals("/simple")) {
+ HttpSession session = req.getSession();
+ X.println(">>>", "Request session simple: " + session.getId(), ">>>");
+
+ res.getWriter().write(session.getId());
+
+ res.getWriter().flush();
+ }
+ }
+ /** {@inheritDoc} */
+ @Override protected void doPost(HttpServletRequest req, HttpServletResponse res)
+ throws ServletException, IOException {
+ if (req.getPathInfo().equals("/login")) {
+ try {
+ req.login("admin", "admin");
+ } catch (Exception e) {
+ X.printerrln("Login failed.");
+ }
+
+ HttpSession session = req.getSession();
+
+ X.println(">>>", "Logged In session: " + session.getId(), ">>>");
+
+ res.getWriter().write(session.getId());
+
+ res.getWriter().flush();
+ }
+ }
+ }
+
+ /**
* Servlet for restarts test.
*/
private static class RestartsTestServlet extends HttpServlet {
@@ -932,4 +1244,4 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
return result;
}
}
-}
\ No newline at end of file
+}
[09/16] ignite git commit: IGNITE-3272 Fixed "Memory consumption in
ContinuousQueryHandler". (cherry picked from commit 5f44672)
Posted by vk...@apache.org.
IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler". (cherry picked from commit 5f44672)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9a33f23
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9a33f23
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9a33f23
Branch: refs/heads/master
Commit: e9a33f2376733ec3c507963c6ff98e3ad0514281
Parents: fda4110
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Jun 14 18:17:33 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Jun 14 18:19:18 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryEntry.java | 16 +++
.../continuous/CacheContinuousQueryHandler.java | 2 +-
...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
4 files changed, 154 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 63dc4cb..74f930a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -200,6 +200,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * @return If the entry is filtered, a light-weight <i><b>new entry</b></i> without key and values
+ * (to avoid huge memory consumption); otherwise {@code this}.
+ */
+ CacheContinuousQueryEntry forBackupQueue() {
+ if (!isFiltered())
+ return this;
+
+ CacheContinuousQueryEntry e =
+ new CacheContinuousQueryEntry(cacheId, evtType, null, null, null, keepBinary, part, updateCntr, topVer);
+
+ e.flags = flags;
+
+ return e;
+ }
+
+ /**
* @return {@code True} if entry sent by backup node.
*/
boolean isBackup() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 3b77d48..5012569 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -740,7 +740,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
entry.markBackup();
- backupQueue.add(entry);
+ backupQueue.add(entry.forBackupQueue());
}
return notify;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
new file mode 100644
index 0000000..aea1954
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Keys count. */
+ private static final int KEYS_COUNT = 1024;
+
+ /** Grid count. */
+ private static final int GRID_COUNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TimeUnit.MINUTES.toMillis(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupQueue() throws Exception {
+ startGridsMultiThreaded(GRID_COUNT);
+
+ final CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+ qry.setRemoteFilterFactory(new FilterFactory());
+
+ try (QueryCursor<?> ignore = grid(0).cache(null).query(qry)) {
+ for (int i = 0; i < KEYS_COUNT; i++) {
+ log.info("Put key: " + i);
+
+ for (int j = 0; j < 100; j++)
+ grid(i % GRID_COUNT).cache(null).put(i, new byte[1024 * 50]);
+ }
+
+ log.info("Finish.");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class FilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter<Object, Object> create() {
+ return new CacheEventFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventFilter implements CacheEntryEventFilter<Object, Object>, Serializable {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
+ return false;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index ce02823..94b9dce 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapValuesTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryBackupQueueTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
@@ -113,6 +114,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class);
suite.addTestSuite(CacheKeepBinaryIterationSwapEnabledTest.class);
suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
return suite;
}
[07/16] ignite git commit: ignite-gg-11181 - scanCount with offheap
index fix
Posted by vk...@apache.org.
ignite-gg-11181 - scanCount with offheap index fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6094ee0d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6094ee0d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6094ee0d
Branch: refs/heads/master
Commit: 6094ee0d73f26e913dd5ff4c64d72a75e450479b
Parents: 4f8ba17
Author: Sergi Vladykin <se...@gmail.com>
Authored: Mon Jun 6 00:14:05 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jun 14 13:25:04 2016 +0300
----------------------------------------------------------------------
.../unsafe/GridOffheapSnapTreeSelfTest.java | 2 +-
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 23 +--
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 17 +-
.../cache/IgniteCacheOffheapIndexScanTest.java | 195 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
5 files changed, 226 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
index 463b6dc..92d9ec2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
@@ -313,7 +313,7 @@ public class GridOffheapSnapTreeSelfTest extends GridCommonAbstractTest {
}
@Override public int hashCode() {
- return ptr;
+ throw new IllegalStateException();
}
@Override public String toString() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index ca5442a..fe6851d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -351,41 +351,37 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
/** {@inheritDoc} */
@Override public void setKeyAndVersion(SearchRow old) {
- assert false;
+ throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public void setKey(long key) {
- assert false;
+ throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public Row getCopy() {
- assert false;
-
- return null;
+ throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public void setDeleted(boolean deleted) {
- assert false;
+ throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public long getKey() {
- assert false;
-
- return 0;
+ throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public void setSessionId(int sesId) {
- assert false;
+ throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public void setVersion(int ver) {
- assert false;
+ throw new IllegalStateException();
}
/**
@@ -469,4 +465,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
throw new IllegalStateException();
}
}
+
+ /** {@inheritDoc} */
+ @Override public final int hashCode() {
+ throw new IllegalStateException();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index 2dd9f25..ee68431 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -170,7 +170,7 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
}
}
else
- assert false : col;
+ throw new IllegalStateException("Column: " + col);
Data data = Data.create(null, bytes);
@@ -378,4 +378,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
@Override protected void addOffheapRowId(SB sb) {
sb.a('-').a(ptr);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+
+ if (obj instanceof GridH2KeyValueRowOffheap) {
+ GridH2KeyValueRowOffheap row = (GridH2KeyValueRowOffheap)obj;
+
+ if (pointer() == row.pointer())
+ return true;
+ }
+
+ return false;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java
new file mode 100644
index 0000000..dbc8a65
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ * Test for the scanCount issue with an offheap index.
+ */
+public class IgniteCacheOffheapIndexScanTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static IgniteCache<Integer, Object> cache;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ CacheConfiguration<?,?> cacheCfg = new CacheConfiguration<>();
+
+ cacheCfg.setCacheMode(LOCAL);
+ cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+ cacheCfg.setSqlOnheapRowCacheSize(256);
+ cacheCfg.setIndexedTypes(
+ Integer.class, Person.class
+ );
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(1, false);
+
+ cache = grid(0).cache(null);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryPlan() throws Exception {
+ for (int i = 0 ; i < 1000; i++)
+ cache.put(i, new Person(i, "firstName" + i, "lastName" + i, i % 100));
+
+ final AtomicBoolean end = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while(!end.get())
+ cache.query(new SqlFieldsQuery("select _val from Person")).getAll();
+
+ return null;
+ }
+ }, 5);
+
+ for (int i = 0; i < 150; i++) {
+ String plan = (String)cache.query(new SqlFieldsQuery(
+ "explain analyze select count(*) from Person where salary = 50")).getAll().get(0).get(0);
+
+ assertTrue(plan, plan.contains("scanCount: 11 "));
+
+ Thread.sleep(100);
+ }
+
+ end.set(true);
+
+ fut.get();
+ }
+
+ /**
+ * Person record used for query test.
+ */
+ public static class Person implements Serializable {
+ /** Person ID. */
+ @QuerySqlField(index = true)
+ private int id;
+
+ /** Organization ID. */
+ @QuerySqlField(index = true)
+ private int orgId;
+
+ /** First name (not-indexed). */
+ @QuerySqlField
+ private String firstName;
+
+ /** Last name (not indexed). */
+ @QuerySqlField
+ private String lastName;
+
+ /** Salary. */
+ @QuerySqlField(index = true)
+ private double salary;
+
+ /**
+ * Constructs empty person.
+ */
+ public Person() {
+ // No-op.
+ }
+
+ /**
+ * Constructs person record that is not linked to any organization.
+ *
+ * @param id Person ID.
+ * @param firstName First name.
+ * @param lastName Last name.
+ * @param salary Salary.
+ */
+ public Person(int id, String firstName, String lastName, double salary) {
+ this(id, 0, firstName, lastName, salary);
+ }
+
+ /**
+ * Constructs person record.
+ *
+ * @param id Person ID.
+ * @param orgId Organization ID.
+ * @param firstName First name.
+ * @param lastName Last name.
+ * @param salary Salary.
+ */
+ public Person(int id, int orgId, String firstName, String lastName, double salary) {
+ this.id = id;
+ this.orgId = orgId;
+ this.firstName = firstName;
+ this.lastName = lastName;
+ this.salary = salary;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return this == o || (o instanceof Person) && id == ((Person)o).id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "Person [firstName=" + firstName +
+ ", id=" + id +
+ ", orgId=" + orgId +
+ ", lastName=" + lastName +
+ ", salary=" + salary +
+ ']';
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index ebad581..7b39453 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySel
import org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheLargeResultSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapIndexScanTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapTieredMultithreadedSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePartitionedQueryMultiThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheQueryEvictsMultiThreadedSelfTest;
@@ -101,6 +102,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheOffheapEvictQueryTest.class);
suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class);
+ suite.addTestSuite(IgniteCacheOffheapIndexScanTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
suite.addTestSuite(IgniteCacheQueryNodeFailTest.class);
[02/16] ignite git commit: ignite-114 Load value from store for cache
'invoke' (cherry picked from commit e10ffef)
Posted by vk...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java
new file mode 100644
index 0000000..bb092d4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionIsolation.values;
+
+/**
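+ * Checks the number of cache store 'load' calls made by cache operations when read-through is enabled.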
+ */
+public class IgniteCacheReadThroughStoreCallTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
+
+ /** */
+ protected boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ storeMap.clear();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiNode() throws Exception {
+ startGridsMultiThreaded(4);
+
+ client = true;
+
+ startGrid(4);
+
+ checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 0));
+
+ checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 1));
+
+ checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 2));
+
+ checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 0));
+
+ checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1));
+
+ checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 2));
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void checkLoadCount(CacheConfiguration<Object, Object> ccfg) throws Exception {
+ storeMap.clear();
+
+ Ignite ignite0 = ignite(0);
+
+ ignite0.createCache(ccfg);
+
+ try {
+ int key = 0;
+
+ for (Ignite node : G.allGrids()) {
+ log.info("Test for node: " + node.name());
+
+ final IgniteCache<Object, Object> cache = node.cache(ccfg.getName());
+
+ for (int i = 0; i < 50; i++) {
+ final int k = key++;
+
+ checkReadThrough(cache, new IgniteRunnable() {
+ @Override public void run() {
+ cache.invoke(k, new TestEntryProcessor());
+ }
+ }, null, null, 1);
+ }
+
+ for (int i = 0; i < 50; i++) {
+ final int k = key++;
+
+ checkReadThrough(cache, new IgniteRunnable() {
+ @Override public void run() {
+ cache.put(k, k);
+ }
+ }, null, null, 0);
+ }
+
+ if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation : values()) {
+ log.info("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+ for (int i = 0; i < 50; i++) {
+ final int k = key++;
+
+ checkReadThrough(cache, new IgniteRunnable() {
+ @Override public void run() {
+ cache.invoke(k, new TestEntryProcessor());
+ }
+ }, concurrency, isolation, 2);
+ }
+ }
+ }
+ }
+ }
+
+ ignite0.cache(ccfg.getName()).removeAll();
+ }
+ finally {
+ ignite0.destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param c Cache operation Closure.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @param expLoadCnt Expected number of store 'load' calls.
+ * @throws Exception If failed.
+ */
+ private void checkReadThrough(IgniteCache<Object, Object> cache,
+ IgniteRunnable c,
+ @Nullable TransactionConcurrency concurrency,
+ @Nullable TransactionIsolation isolation,
+ int expLoadCnt) throws Exception {
+ TestStore.loadCnt.set(0);
+
+ Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+ : null;
+
+ try {
+ c.run();
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ assertEquals(expLoadCnt, TestStore.loadCnt.get());
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Atomicity mode.
+ * @param backups Number of backups.
+ * @return Cache configuration.
+ */
+ @SuppressWarnings("unchecked")
+ protected CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode,
+ CacheAtomicityMode atomicityMode,
+ int backups) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setReadThrough(true);
+ ccfg.setWriteThrough(true);
+ ccfg.setCacheStoreFactory(new TestStoreFactory());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ public static class TestStoreFactory implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ return new TestStore();
+ }
+ }
+
+ /**
+ *
+ */
+ public static class TestStore extends CacheStoreAdapter<Object, Object> {
+ /** */
+ static AtomicInteger loadCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+ fail();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) {
+ loadCnt.incrementAndGet();
+
+ return storeMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<?, ?> entry) {
+ storeMap.put(entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ storeMap.remove(key);
+ }
+ }
+
+ /**
+ *
+ */
+ static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+ Object val = entry.getValue();
+
+ entry.setValue(entry.getKey());
+
+ return val;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
index 28a954b..dd6b268 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
@@ -147,6 +147,16 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs
assertFalse(storeMap.containsKey(key));
assertNull(cache.get(key));
+
+ ldrCallCnt.set(0);
+
+ cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
+ @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+ return null;
+ }
+ });
+
+ checkCalls(1, 0);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 094558d..018fa17 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -65,8 +65,10 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationDefau
import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationTemplateTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheDynamicStopSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughSingleNodeTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughStoreCallTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxCopyOnReadDisabledTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalPeekModesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalStoreValueTest;
@@ -206,7 +208,9 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheTxLocalPeekModesTest.class);
suite.addTestSuite(IgniteCacheTxReplicatedPeekModesTest.class);
+ suite.addTestSuite(IgniteCacheInvokeReadThroughSingleNodeTest.class);
suite.addTestSuite(IgniteCacheInvokeReadThroughTest.class);
+ suite.addTestSuite(IgniteCacheReadThroughStoreCallTest.class);
suite.addTestSuite(GridCacheVersionMultinodeTest.class);
suite.addTestSuite(IgniteCacheNearReadCommittedTest.class);
[04/16] ignite git commit: ignite-3038 Do not use custom discovery
events to start continuous queries for system caches (cherry picked from
commit 7f878c5)
Posted by vk...@apache.org.
ignite-3038 Do not use custom discovery events to start continuous queries for system caches
(cherry picked from commit 7f878c5)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e5fcebb0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e5fcebb0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e5fcebb0
Branch: refs/heads/master
Commit: e5fcebb095fc189487d1d4f267fbf2164aef7329
Parents: e90c9c8
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 10:17:50 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 11:15:13 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/MarshallerContextImpl.java | 51 +++-
.../processors/cache/GridCacheAdapter.java | 12 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 29 +-
.../continuous/CacheContinuousQueryManager.java | 135 ++++++++
.../cacheobject/IgniteCacheObjectProcessor.java | 7 +
.../IgniteCacheObjectProcessorImpl.java | 5 +
.../continuous/GridContinuousProcessor.java | 216 +++++++++----
.../service/GridServiceProcessor.java | 305 ++++++++++---------
...eClientReconnectContinuousProcessorTest.java | 60 +++-
...ridCacheContinuousQueryAbstractSelfTest.java | 2 +-
.../IgniteNoCustomEventsOnNodeStart.java | 86 ++++++
.../service/GridServiceClientNodeTest.java | 102 ++++++-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
13 files changed, 775 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 8f566a9..b4c9607 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
import org.apache.ignite.internal.util.GridStripedLock;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.PluginProvider;
@@ -64,6 +65,9 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
/** Non-volatile on purpose. */
private int failedCnt;
+ /** */
+ private ContinuousQueryListener lsnr;
+
/**
* @param plugins Plugins.
* @throws IgniteCheckedException In case of error.
@@ -75,25 +79,58 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
}
/**
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+ if (ctx.clientNode()) {
+ lsnr = new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir);
+
+ ctx.continuous().registerStaticRoutine(
+ CU.MARSH_CACHE_NAME,
+ lsnr,
+ null,
+ null);
+ }
+ }
+
+ /**
* @param ctx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
- if (!ctx.isDaemon()) {
+ log = ctx.log(MarshallerContextImpl.class);
+
+ cache = ctx.cache().marshallerCache();
+
+ if (ctx.cache().marshallerCache().context().affinityNode()) {
ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
- new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
+ new ContinuousQueryListener(log, workDir),
null,
- ctx.cache().marshallerCache().context().affinityNode(),
+ true,
true,
false
);
}
-
- log = ctx.log(MarshallerContextImpl.class);
-
- cache = ctx.cache().marshallerCache();
+ else {
+ if (lsnr != null) {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @SuppressWarnings("unchecked")
+ @Override public void run() {
+ try {
+ Iterable entries = cache.context().continuousQueries().existingEntries(false, null);
+
+ lsnr.onUpdated(entries);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to load marshaller cache entries: " + e, e);
+ }
+ }
+ });
+ }
+ }
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 2212926..3dbd0f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4014,8 +4014,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @return Distributed ignite cache iterator.
+ * @throws IgniteCheckedException If failed.
*/
public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException {
+ return igniteIterator(ctx.keepBinary());
+ }
+
+ /**
+ * @param keepBinary Keep binary flag.
+ * @return Distributed ignite cache iterator.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary) throws IgniteCheckedException {
GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4023,7 +4033,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
return localIteratorHonorExpirePolicy(opCtx);
- final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
+ final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, keepBinary)
.keepAll(false)
.executeScanQuery();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 18751c1..139a8a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -150,9 +150,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
/** Metadata updates collected before metadata cache is initialized. */
private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
- /** */
- private UUID metaCacheQryId;
-
/**
* @param ctx Kernal context.
*/
@@ -256,6 +253,17 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
/** {@inheritDoc} */
+ @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+ if (clientNode && !ctx.isDaemon()) {
+ ctx.continuous().registerStaticRoutine(
+ CU.UTILITY_CACHE_NAME,
+ new MetaDataEntryListener(),
+ new MetaDataEntryFilter(),
+ null);
+ }
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onUtilityCacheStarted() throws IgniteCheckedException {
IgniteCacheProxy<Object, Object> proxy = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
@@ -272,13 +280,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (clientNode) {
assert !metaDataCache.context().affinityNode();
- metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
- new MetaDataEntryListener(),
- new MetaDataEntryFilter(),
- false,
- true,
- false);
-
while (true) {
ClusterNode oldestSrvNode =
CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
@@ -359,14 +360,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
}
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- super.onKernalStop(cancel);
-
- if (metaCacheQryId != null)
- metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
- }
-
/**
* @param key Metadata key.
* @param newMeta Metadata.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b042249..c966527 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.Cache;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
@@ -46,6 +47,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
@@ -57,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteClosure;
@@ -728,6 +732,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * Exposes the entries currently stored in the cache as {@code CREATED} events, skipping those
+ * rejected by the filter.
+ * <p>
+ * The cache iterator is obtained once by this call and is shared by every iterator the returned
+ * iterable hands out, so the result can effectively be traversed only once.
+ *
+ * @param keepBinary Keep binary flag.
+ * @param filter Filter, {@code null} to accept all entries.
+ * @return Iterable for events created for existing cache entries.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Iterable<CacheEntryEvent<?, ?>> existingEntries(final boolean keepBinary, final CacheEntryEventFilter filter)
+     throws IgniteCheckedException {
+     final Iterator<Cache.Entry<?, ?>> entries = cctx.cache().igniteIterator(keepBinary);
+
+     final Cache src = cctx.kernalContext().cache().jcache(cctx.name());
+
+     return new Iterable<CacheEntryEvent<?, ?>>() {
+         @Override public Iterator<CacheEntryEvent<?, ?>> iterator() {
+             return new Iterator<CacheEntryEvent<?, ?>>() {
+                 /** Event to be handed out by the next call to {@code next()}, {@code null} when exhausted. */
+                 private CacheQueryEntryEvent<?, ?> pending;
+
+                 {
+                     fetchNext();
+                 }
+
+                 @Override public boolean hasNext() {
+                     return pending != null;
+                 }
+
+                 @Override public CacheEntryEvent<?, ?> next() {
+                     CacheEntryEvent<?, ?> res = pending;
+
+                     if (res == null)
+                         throw new NoSuchElementException();
+
+                     fetchNext();
+
+                     return res;
+                 }
+
+                 @Override public void remove() {
+                     throw new UnsupportedOperationException();
+                 }
+
+                 /** Moves to the next entry accepted by the filter; leaves {@code pending} null if none is left. */
+                 private void fetchNext() {
+                     pending = null;
+
+                     while (entries.hasNext()) {
+                         Cache.Entry e = entries.next();
+
+                         pending = new CacheEntryEventImpl(src, CREATED, e.getKey(), e.getValue());
+
+                         if (filter == null || filter.evaluate(pending))
+                             return;
+
+                         pending = null;
+                     }
+                 }
+             };
+         }
+     };
+ }
+
+ /**
* @param nodes Nodes.
* @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
* otherwise {@code false}.
@@ -1129,4 +1197,71 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.acknowledgeBackupOnTimeout(ctx);
}
}
+
+ /**
+ * Event created for an entry that already exists in the cache: it carries only the entry key and value.
+ * It never reports an old value, and its partition update counter is always {@code 0}.
+ */
+ private static class CacheEntryEventImpl extends CacheQueryEntryEvent {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Entry key, set once at construction. */
+     @GridToStringInclude
+     private final Object key;
+
+     /** Entry value, set once at construction. */
+     @GridToStringInclude
+     private final Object val;
+
+     /**
+      * @param src Event source.
+      * @param evtType Event type.
+      * @param key Key.
+      * @param val Value.
+      */
+     public CacheEntryEventImpl(Cache src, EventType evtType, Object key, Object val) {
+         super(src, evtType);
+
+         this.key = key;
+         this.val = val;
+     }
+
+     /** {@inheritDoc} */
+     @Override public long getPartitionUpdateCounter() {
+         return 0;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Object getOldValue() {
+         return null;
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean isOldValueAvailable() {
+         return false;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Object getKey() {
+         return key;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Object getValue() {
+         return val;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Object unwrap(Class cls) {
+         // Only unwrapping to a type this event itself is an instance of is supported.
+         if (cls.isAssignableFrom(getClass()))
+             return cls.cast(this);
+
+         throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(CacheEntryEventImpl.class, this);
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 686f308..b8ac301 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -34,6 +35,12 @@ import org.jetbrains.annotations.Nullable;
*/
public interface IgniteCacheObjectProcessor extends GridProcessor {
/**
+ * Callback related to the start of the continuous processor.
+ * NOTE(review): presumably invoked by {@code GridContinuousProcessor} at the end of its own start
+ * (via {@code ctx.cacheObjects()}) - confirm against the caller.
+ *
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException;
+
+ /**
* @see GridComponent#onKernalStart()
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 9b47e59..3203548 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -236,6 +236,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
}
/** {@inheritDoc} */
+ @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+ // No-op: this implementation does nothing on this callback.
+ }
+
+ /** {@inheritDoc} */
@Override public void onUtilityCacheStarted() throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0db9b27..fce48c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -30,12 +30,15 @@ import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -82,6 +85,7 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
@@ -136,6 +140,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** */
private boolean processorStopped;
+ /** Query sequence number for message topic. */
+ private final AtomicLong seq = new AtomicLong();
+
/**
* @param ctx Kernal context.
*/
@@ -254,13 +261,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID routineId = msg.routineId();
unregisterRemote(routineId);
+ }
- if (snd.isClient()) {
- Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id());
-
- if (clientRoutineMap != null)
- clientRoutineMap.remove(msg.routineId());
- }
+ for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+ if (clientInfo.remove(msg.routineId()) != null)
+ break;
}
}
});
@@ -309,6 +314,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
});
+ ctx.marshallerContext().onContinuousProcessorStarted(ctx);
+
+ ctx.cacheObjects().onContinuousProcessorStarted(ctx);
+
+ ctx.service().onContinuousProcessorStarted(ctx);
+
if (log.isDebugEnabled())
log.debug("Continuous processor started.");
}
@@ -392,16 +403,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
+ if (log.isDebugEnabled()) {
+ log.debug("collectDiscoveryData [node=" + nodeId +
+ ", loc=" + ctx.localNodeId() +
+ ", locInfos=" + locInfos +
+ ", clientInfos=" + clientInfos +
+ ']');
+ }
+
if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
- Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());
+ Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
- copy.put(e0.getKey(), e0.getValue());
+ cp.put(e0.getKey(), e0.getValue());
+
+ clientInfos0.put(e.getKey(), cp);
+ }
+
+ if (nodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
+ Map<UUID, LocalRoutineInfo> infos = new HashMap<>();
- clientInfos0.put(e.getKey(), copy);
+ for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
+ infos.put(e.getKey(), e.getValue());
+
+ clientInfos0.put(ctx.localNodeId(), infos);
}
DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);
@@ -429,6 +457,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
DiscoveryData data = (DiscoveryData)obj;
+ if (log.isDebugEnabled()) {
+     // Trace at debug level, matching the isDebugEnabled() guard above.
+     log.debug("onDiscoveryDataReceived [joining=" + joiningNodeId +
+         ", rmtNodeId=" + rmtNodeId +
+         ", loc=" + ctx.localNodeId() +
+         ", data=" + data +
+         ']');
+ }
+
if (!ctx.isDaemon() && data != null) {
for (DiscoveryDataItem item : data.items) {
try {
@@ -456,29 +492,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
UUID clientNodeId = entry.getKey();
- Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+ if (!ctx.localNodeId().equals(clientNodeId)) {
+ Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
- for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
- UUID routineId = e.getKey();
- LocalRoutineInfo info = e.getValue();
+ for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+ UUID routineId = e.getKey();
+ LocalRoutineInfo info = e.getValue();
- try {
- if (info.prjPred != null)
- ctx.resource().injectGeneric(info.prjPred);
-
- if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
- if (registerHandler(clientNodeId,
- routineId,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe,
- false))
- info.hnd.onListenerRegistered(routineId, ctx);
+ try {
+ if (info.prjPred != null)
+ ctx.resource().injectGeneric(info.prjPred);
+
+ if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+ if (registerHandler(clientNodeId,
+ routineId,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe,
+ false))
+ info.hnd.onListenerRegistered(routineId, ctx);
+ }
+ }
+ catch (IgniteCheckedException err) {
+ U.error(log, "Failed to register continuous handler.", err);
}
- }
- catch (IgniteCheckedException err) {
- U.error(log, "Failed to register continuous handler.", err);
}
}
@@ -536,6 +574,47 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * Creates an internal continuous query handler for the given cache, stores its routine info
+ * among local routines so that it is sent in discovery data during this node join
+ * (to be used for internal queries started from client nodes), and registers the
+ * handler's message listener.
+ *
+ * @param cacheName Cache name.
+ * @param locLsnr Local listener.
+ * @param rmtFilter Remote filter.
+ * @param prjPred Projection predicate.
+ * @return Routine ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ public UUID registerStaticRoutine(
+     String cacheName,
+     CacheEntryUpdatedListener<?, ?> locLsnr,
+     CacheEntryEventSerializableFilter rmtFilter,
+     @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
+     // Topic is built from a per-cache prefix, the local node ID and the next sequence number.
+     CacheContinuousQueryHandler qryHnd = new CacheContinuousQueryHandler(
+         cacheName,
+         TOPIC_CACHE.topic("CONTINUOUS_QUERY_STATIC" + "_" + cacheName, ctx.localNodeId(), seq.incrementAndGet()),
+         locLsnr,
+         rmtFilter,
+         true,
+         false,
+         true,
+         false);
+
+     qryHnd.internal(true);
+
+     UUID id = UUID.randomUUID();
+
+     locInfos.put(id, new LocalRoutineInfo(prjPred, qryHnd, 1, 0, true));
+
+     registerMessageListener(qryHnd);
+
+     return id;
+ }
+
+ /**
* @param hnd Handler.
* @param bufSize Buffer size.
* @param interval Time interval.
@@ -609,29 +688,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Register per-routine notifications listener if ordered messaging is used.
- if (hnd.orderedTopic() != null) {
- ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object obj) {
- GridContinuousMessage msg = (GridContinuousMessage)obj;
-
- // Only notification can be ordered.
- assert msg.type() == MSG_EVT_NOTIFICATION;
-
- if (msg.data() == null && msg.dataBytes() != null) {
- try {
- msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to process message (ignoring): " + msg, e);
-
- return;
- }
- }
-
- processNotification(nodeId, msg);
- }
- });
- }
+ registerMessageListener(hnd);
StartFuture fut = new StartFuture(ctx, routineId);
@@ -663,6 +720,35 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * Registers a notifications listener on the handler's ordered topic.
+ * Does nothing when the handler has no ordered topic.
+ *
+ * @param hnd Handler.
+ */
+ private void registerMessageListener(GridContinuousHandler hnd) {
+     if (hnd.orderedTopic() == null)
+         return;
+
+     GridMessageListener lsnr = new GridMessageListener() {
+         @Override public void onMessage(UUID nodeId, Object obj) {
+             GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+             // Only notification can be ordered.
+             assert msg.type() == MSG_EVT_NOTIFICATION;
+
+             // Payload may arrive in marshalled form only; unmarshal it before processing.
+             if (msg.data() == null && msg.dataBytes() != null) {
+                 try {
+                     msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to process message (ignoring): " + msg, e);
+
+                     return;
+                 }
+             }
+
+             processNotification(nodeId, msg);
+         }
+     };
+
+     ctx.io().addMessageListener(hnd.orderedTopic(), lsnr);
+ }
+
+ /**
* @param routineId Consume ID.
* @return Future.
*/
@@ -809,12 +895,28 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
- for (UUID rmtId : rmtInfos.keySet())
- unregisterRemote(rmtId);
+ if (log.isDebugEnabled()) {
+ log.debug("onDisconnected [rmtInfos=" + rmtInfos +
+ ", locInfos=" + locInfos +
+ ", clientInfos=" + clientInfos + ']');
+ }
+
+ for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+ RemoteRoutineInfo info = e.getValue();
+
+ if (!ctx.localNodeId().equals(info.nodeId) || info.autoUnsubscribe)
+ unregisterRemote(e.getKey());
+ }
rmtInfos.clear();
clientInfos.clear();
+
+ if (log.isDebugEnabled()) {
+ log.debug("after onDisconnected [rmtInfos=" + rmtInfos +
+ ", locInfos=" + locInfos +
+ ", clientInfos=" + clientInfos + ']');
+ }
}
/**
@@ -1040,6 +1142,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
if (doRegister) {
+ if (log.isDebugEnabled())
+ log.debug("Register handler: [nodeId=" + nodeId + ", routineId=" + routineId + ", info=" + info + ']');
+
if (interval > 0) {
IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
@SuppressWarnings("ConstantConditions")
@@ -1154,6 +1259,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
stopLock.unlock();
}
+ if (log.isDebugEnabled())
+ log.debug("unregisterRemote [routineId=" + routineId + ", loc=" + loc + ", rmt=" + remote + ']');
+
if (remote != null)
unregisterHandler(routineId, remote.hnd, false);
else if (loc != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b507e5f..0bc1141 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -156,12 +155,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/** Topology listener. */
private GridLocalEventListener topLsnr = new TopologyListener();
- /** Deployment listener ID. */
- private UUID cfgQryId;
-
- /** Assignment listener ID. */
- private UUID assignQryId;
-
/**
* @param ctx Kernal context.
*/
@@ -178,6 +171,22 @@ public class GridServiceProcessor extends GridProcessorAdapter {
new ServicesCompatibilityState(srvcCompatibilitySysProp != null ? srvcCompatibilitySysProp : false, false));
}
+ /**
+ * On client nodes, registers a static continuous routine with a {@code ServiceEntriesListener}
+ * for the utility cache. Does nothing on other nodes.
+ *
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+     if (!ctx.clientNode())
+         return;
+
+     assert !ctx.isDaemon();
+
+     // No remote filter and no projection predicate are used for this routine.
+     ctx.continuous().registerStaticRoutine(CU.UTILITY_CACHE_NAME, new ServiceEntriesListener(), null, null);
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
ctx.addNodeAttribute(ATTR_SERVICES_COMPATIBILITY_MODE, srvcCompatibilitySysProp);
@@ -209,13 +218,32 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (ctx.deploy().enabled())
ctx.cache().context().deploy().ignoreOwnership(true);
- boolean affNode = cache.context().affinityNode();
+ if (!ctx.clientNode()) {
+ assert cache.context().affinityNode();
+
+ cache.context().continuousQueries().executeInternalQuery(new ServiceEntriesListener(),
+ null,
+ true,
+ true,
+ false);
+ }
+ else {
+ assert !ctx.isDaemon();
- cfgQryId = cache.context().continuousQueries().executeInternalQuery(
- new DeploymentListener(), null, affNode, true, !affNode);
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ Iterable<CacheEntryEvent<?, ?>> entries =
+ cache.context().continuousQueries().existingEntries(false, null);
- assignQryId = cache.context().continuousQueries().executeInternalQuery(
- new AssignmentListener(), null, affNode, true, !affNode);
+ onSystemCacheUpdated(entries);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to load service entries: " + e, e);
+ }
+ }
+ });
+ }
}
finally {
if (ctx.deploy().enabled())
@@ -249,12 +277,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (!ctx.clientNode())
ctx.event().removeLocalEventListener(topLsnr);
- if (cfgQryId != null)
- cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
-
- if (assignQryId != null)
- cache.context().continuousQueries().cancelInternalQuery(assignQryId);
-
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
synchronized (locSvcs) {
@@ -1305,92 +1327,116 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/**
* Service deployment listener.
*/
- private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
+ @SuppressWarnings("unchecked")
+ private class ServiceEntriesListener implements CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
@Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
depExe.submit(new BusyRunnable() {
@Override public void run0() {
- boolean firstTime = true;
+ onSystemCacheUpdated(deps);
+ }
+ });
+ }
+ }
- for (CacheEntryEvent<?, ?> e : deps) {
- if (!(e.getKey() instanceof GridServiceDeploymentKey))
- continue;
+ /**
+ * @param evts Update events.
+ */
+ private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) {
+ boolean firstTime = true;
- if (firstTime) {
- markCompatibilityStateAsUsed();
+ for (CacheEntryEvent<?, ?> e : evts) {
+ if (e.getKey() instanceof GridServiceDeploymentKey) {
+ if (firstTime) {
+ markCompatibilityStateAsUsed();
- firstTime = false;
- }
+ firstTime = false;
+ }
- GridServiceDeployment dep;
+ processDeployment((CacheEntryEvent)e);
+ }
+ else if (e.getKey() instanceof GridServiceAssignmentsKey) {
+ if (firstTime) {
+ markCompatibilityStateAsUsed();
- try {
- dep = (GridServiceDeployment)e.getValue();
- }
- catch (IgniteException ex) {
- if (X.hasCause(ex, ClassNotFoundException.class))
- continue;
- else
- throw ex;
- }
+ firstTime = false;
+ }
- if (dep != null) {
- svcName.set(dep.configuration().getName());
+ processAssignment((CacheEntryEvent)e);
+ }
+ }
+ }
- // Ignore other utility cache events.
- AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+ /**
+ * @param e Entry.
+ */
+ private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment> e) {
+ GridServiceDeployment dep;
- ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
+ try {
+ dep = e.getValue();
+ }
+ catch (IgniteException ex) {
+ if (X.hasCause(ex, ClassNotFoundException.class))
+ return;
+ else
+ throw ex;
+ }
- if (oldest.isLocal())
- onDeployment(dep, topVer);
- }
- // Handle undeployment.
- else {
- String name = ((GridServiceDeploymentKey)e.getKey()).name();
+ if (dep != null) {
+ svcName.set(dep.configuration().getName());
- svcName.set(name);
+ // Ignore other utility cache events.
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
- Collection<ServiceContextImpl> ctxs;
+ ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
- synchronized (locSvcs) {
- ctxs = locSvcs.remove(name);
- }
+ if (oldest.isLocal())
+ onDeployment(dep, topVer);
+ }
+ // Handle undeployment.
+ else {
+ String name = e.getKey().name();
- if (ctxs != null) {
- synchronized (ctxs) {
- cancel(ctxs, ctxs.size());
- }
- }
+ svcName.set(name);
- // Finish deployment futures if undeployment happened.
- GridFutureAdapter<?> fut = depFuts.remove(name);
+ Collection<ServiceContextImpl> ctxs;
- if (fut != null)
- fut.onDone();
+ synchronized (locSvcs) {
+ ctxs = locSvcs.remove(name);
+ }
- // Complete undeployment future.
- fut = undepFuts.remove(name);
+ if (ctxs != null) {
+ synchronized (ctxs) {
+ cancel(ctxs, ctxs.size());
+ }
+ }
- if (fut != null)
- fut.onDone();
+ // Finish deployment futures if undeployment happened.
+ GridFutureAdapter<?> fut = depFuts.remove(name);
- GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+ if (fut != null)
+ fut.onDone();
- // Remove assignment on primary node in case of undeploy.
- if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
- try {
- cache.getAndRemove(key);
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
- }
- }
- }
- }
+ // Complete undeployment future.
+ fut = undepFuts.remove(name);
+
+ if (fut != null)
+ fut.onDone();
+
+ GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+
+ // Remove assignment on primary node in case of undeploy.
+ if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
+ try {
+ cache.getAndRemove(key);
}
- });
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
+ }
+ }
}
+ }
/**
* Deployment callback.
@@ -1446,7 +1492,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
});
- }
}
}
@@ -1619,79 +1664,59 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
/**
- * Assignment listener.
+ * @param e Entry.
*/
- private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> {
- /** {@inheritDoc} */
- @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException {
- depExe.submit(new BusyRunnable() {
- @Override public void run0() {
- boolean firstTime = true;
-
- for (CacheEntryEvent<?, ?> e : assignCol) {
- if (!(e.getKey() instanceof GridServiceAssignmentsKey))
- continue;
-
- if (firstTime) {
- markCompatibilityStateAsUsed();
-
- firstTime = false;
- }
+ private void processAssignment(CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments> e) {
+ GridServiceAssignments assigns;
- GridServiceAssignments assigns;
-
- try {
- assigns = (GridServiceAssignments)e.getValue();
- }
- catch (IgniteException ex) {
- if (X.hasCause(ex, ClassNotFoundException.class))
- continue;
- else
- throw ex;
- }
+ try {
+ assigns = e.getValue();
+ }
+ catch (IgniteException ex) {
+ if (X.hasCause(ex, ClassNotFoundException.class))
+ return;
+ else
+ throw ex;
+ }
- if (assigns != null) {
- svcName.set(assigns.name());
+ if (assigns != null) {
+ svcName.set(assigns.name());
- Throwable t = null;
+ Throwable t = null;
- try {
- redeploy(assigns);
- }
- catch (Error | RuntimeException th) {
- t = th;
- }
+ try {
+ redeploy(assigns);
+ }
+ catch (Error | RuntimeException th) {
+ t = th;
+ }
- GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
+ GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
- if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
- depFuts.remove(assigns.name(), fut);
+ if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
+ depFuts.remove(assigns.name(), fut);
- // Complete deployment futures once the assignments have been stored in cache.
- fut.onDone(null, t);
- }
- }
- // Handle undeployment.
- else {
- String name = ((GridServiceAssignmentsKey)e.getKey()).name();
+ // Complete deployment futures once the assignments have been stored in cache.
+ fut.onDone(null, t);
+ }
+ }
+ // Handle undeployment.
+ else {
+ String name = e.getKey().name();
- svcName.set(name);
+ svcName.set(name);
- Collection<ServiceContextImpl> ctxs;
+ Collection<ServiceContextImpl> ctxs;
- synchronized (locSvcs) {
- ctxs = locSvcs.remove(name);
- }
+ synchronized (locSvcs) {
+ ctxs = locSvcs.remove(name);
+ }
- if (ctxs != null) {
- synchronized (ctxs) {
- cancel(ctxs, ctxs.size());
- }
- }
- }
- }
+ if (ctxs != null) {
+ synchronized (ctxs) {
+ cancel(ctxs, ctxs.size());
}
- });
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index 4c44adc..0ff5883 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -125,6 +125,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
}
/**
+ * @param stopFromClient If {@code true} stops listener from client node, otherwise from server.
* @throws Exception If failed.
*/
private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
@@ -178,9 +179,11 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
assertTrue(latch.await(5000, MILLISECONDS));
- log.info("Stop listen, should not get remote messages anymore.");
+ Ignite stopFrom = (stopFromClient ? client : srv);
- (stopFromClient ? client : srv).message().stopRemoteListen(opId);
+ log.info("Stop listen, should not get remote messages anymore [from=" + stopFrom.name() + ']');
+
+ stopFrom.message().stopRemoteListen(opId);
srv.message().send(topic, "msg3");
@@ -243,6 +246,59 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
}
/**
+ * Checks that a server started after the client has reconnected registers the listener of the
+ * client's continuous query (events for keys primary on the new server reach the listener),
+ * and that a server started after the query cursor is closed does not.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheContinuousQueryReconnectNewServer() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setAutoUnsubscribe(true);
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = clientCache.query(qry);
+
+ continuousQueryReconnect(client, clientCache, lsnr);
+
+ // Check new server registers listener for reconnected client.
+ try (Ignite newSrv = startGrid(serverCount() + 1)) {
+ awaitPartitionMapExchange();
+
+ lsnr.latch = new CountDownLatch(10);
+
+ IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+ for (Integer key : primaryKeys(newSrvCache, 10))
+ newSrvCache.put(key, key);
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+ }
+
+ cur.close();
+
+ // Check new server does not register listener for closed query.
+ try (Ignite newSrv = startGrid(serverCount() + 1)) {
+ awaitPartitionMapExchange();
+
+ lsnr.latch = new CountDownLatch(5);
+
+ IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+ for (Integer key : primaryKeys(newSrvCache, 5))
+ newSrvCache.put(key, key);
+
+ assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+ }
+ }
+
+ /**
* @param client Client.
* @param clientCache Client cache.
* @param lsnr Continuous query listener.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index dbe282e..f372e0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -208,7 +208,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
for (int i = 0; i < gridCount(); i++) {
GridContinuousProcessor proc = grid(i).context().continuous();
- assertEquals(String.valueOf(i), 3, ((Map)U.field(proc, "locInfos")).size());
+ assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
new file mode 100644
index 0000000..cbeb23f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import com.mchange.v2.c3p0.util.TestUtils;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Sanity test to verify there are no unnecessary messages on node start.
+ */
+public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest {
+ /** IP finder handed to the discovery SPI of every node this test configures. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Client mode flag copied into the configuration of the next node to be started. */
+ private boolean client;
+
+ /** Raised by {@link TestTcpDiscoverySpi} when an unexpected custom message is about to be sent. */
+ private static volatile boolean failed;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // Stop every node started by the test so none is left running once it finishes.
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Starts five nodes, alternating between server mode (even indexes) and client mode
+ * (odd indexes), and checks that {@link TestTcpDiscoverySpi} did not flag any custom
+ * discovery message while they were starting.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNoCustomEventsOnStart() throws Exception {
+ failed = false;
+
+ for (int i = 0; i < 5; i++) {
+ client = i % 2 == 1;
+
+ startGrid(i);
+ }
+
+ assertFalse(failed);
+ }
+
+ /**
+ * Discovery SPI that treats any custom message as a test failure, except messages whose
+ * {@code delegate} field holds a {@link CacheAffinityChangeMessage}.
+ */
+ static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+ if (GridTestUtils.getFieldValue(msg, "delegate") instanceof CacheAffinityChangeMessage)
+ return;
+
+ // The flag is what the test method asserts on. NOTE(review): presumably needed because
+ // the error thrown below may be raised on a thread other than the test thread - confirm.
+ failed = true;
+
+ fail("Should not be called: " + msg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
index c3b9cf4..665294d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -34,47 +35,118 @@ public class GridServiceClientNodeTest extends GridCommonAbstractTest {
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final int NODE_CNT = 3;
+ private boolean client;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(30);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(1000);
- if (gridName.equals(getTestGridName(NODE_CNT - 1)))
- cfg.setClientMode(true);
+ cfg.setClientMode(client);
return cfg;
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
- startGrids(NODE_CNT);
+ super.afterTest();
}
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
+ /**
+ * Starts three nodes with the client flag not yet raised, then one node in client mode,
+ * and verifies through {@code checkDeploy} that a service can be deployed from that
+ * client node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDeployFromClient() throws Exception {
+ startGrids(3);
+
+ client = true; // Picked up by getConfiguration() for the node started next.
+
+ Ignite ignite = startGrid(3);
+
+ checkDeploy(ignite, "service1");
 }
/**
* @throws Exception If failed.
*/
- public void testDeployFromClient() throws Exception {
- Ignite ignite = ignite(NODE_CNT - 1);
+ public void testDeployFromClientAfterRouterStop1() throws Exception {
+ startGrid(0); // The only node running when the client below joins.
- assertTrue(ignite.configuration().isClientMode());
+ client = true; // Picked up by getConfiguration() for the node started next.
- String svcName = "testService";
+ Ignite ignite = startGrid(1); // Client node used for all deployments in this test.
+
+ client = false;
+
+ startGrid(2); // Second server, started before node 0 is stopped.
+
+ U.sleep(1000); // NOTE(review): presumably lets the topology settle before the stop - confirm.
+
+ stopGrid(0); // Presumably the "router" of the method name: the node the client joined through.
+
+ awaitPartitionMapExchange();
+
+ checkDeploy(ignite, "service1"); // Deployment from the client after node 0 is gone.
+
+ startGrid(3); // One more node (client flag is false here) joins after the stop.
+
+ for (int i = 0; i < 10; i++)
+ checkDeploy(ignite, "service2-" + i); // Ten more deployments, each under its own name.
+ }
+
+ /**
+ * Same scenario as {@code testDeployFromClientAfterRouterStop1}, but with a larger
+ * topology: a second client (node 3) and a third server (node 4) are started before
+ * node 0 is stopped. Services are then deployed from the first client (node 1).
+ *
+ * @throws Exception If failed.
+ */
+ public void testDeployFromClientAfterRouterStop2() throws Exception {
+ startGrid(0); // The only node running when the first client joins.
+
+ client = true; // Picked up by getConfiguration() for the node started next.
+
+ Ignite ignite = startGrid(1); // Client node used for all deployments in this test.
+
+ client = false;
+
+ startGrid(2);
+
+ client = true;
+
+ startGrid(3); // Second client; it is only started, never used for deployment here.
+
+ client = false;
+
+ startGrid(4);
+
+ U.sleep(1000); // NOTE(review): presumably lets the topology settle before the stop - confirm.
+
+ stopGrid(0); // Presumably the "router" of the method name: the node the first client joined through.
+
+ awaitPartitionMapExchange();
+
+ checkDeploy(ignite, "service1"); // Deployment from the client after node 0 is gone.
+
+ startGrid(5); // One more node (client flag is false here) joins after the stop.
+
+ for (int i = 0; i < 10; i++)
+ checkDeploy(ignite, "service2-" + i); // Ten more deployments, each under its own name.
+ }
+
+ /**
+ * Asserts that the given node runs in client mode, registers a one-count latch for the
+ * service name via {@code DummyService.exeLatch}, deploys a {@code DummyService} as a
+ * cluster singleton under that name from the given node, and asserts that the latch
+ * reaches zero within 5 seconds (presumably counted down when the service executes;
+ * the latch handling lives in {@code DummyService}).
+ *
+ * @param client Client node. Note that this parameter shadows the {@code client} field.
+ * @param svcName Service name.
+ * @throws Exception If failed.
+ */
+ private void checkDeploy(Ignite client, String svcName) throws Exception {
+ assertTrue(client.configuration().isClientMode());
 CountDownLatch latch = new CountDownLatch(1);
 DummyService.exeLatch(svcName, latch);
- ignite.services().deployClusterSingleton(svcName, new DummyService());
+ client.services().deployClusterSingleton(svcName, new DummyService());
 assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
 }
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 652643d..dc412a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -128,6 +128,7 @@ import org.apache.ignite.internal.processors.cache.local.GridCacheLocalMultithre
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxMultiThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxSingleThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxTimeoutSelfTest;
+import org.apache.ignite.internal.processors.continuous.IgniteNoCustomEventsOnNodeStart;
/**
* Test suite.
@@ -259,6 +260,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(CacheEnumOperationsTest.class));
suite.addTest(new TestSuite(IgniteCacheIncrementTxTest.class));
+ suite.addTest(new TestSuite(IgniteNoCustomEventsOnNodeStart.class));
+
return suite;
}
}