You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/12 14:09:07 UTC
[01/22] ignite git commit: IGNITE-2575: Added validation of IGFS
endpoint port value. This closes #469.
Repository: ignite
Updated Branches:
refs/heads/ignite-2407 570de20d4 -> 6c2910439
IGNITE-2575: Added validation of IGFS endpoint port value. This closes #469.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7475f09
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7475f09
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7475f09
Branch: refs/heads/ignite-2407
Commit: b7475f09b1727e5cc93681ee229b60ad8e188732
Parents: a4d8a04
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Feb 10 12:38:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 12:38:43 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsProcessor.java | 14 ++++++++++
.../igfs/IgfsProcessorValidationSelfTest.java | 27 ++++++++++++++++++++
2 files changed, 41 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7475f09/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 21446e1..1b60252 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -67,6 +67,12 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
/** Null IGFS name. */
private static final String NULL_NAME = UUID.randomUUID().toString();
+ /** Min available TCP port. */
+ private static final int MIN_TCP_PORT = 1;
+
+ /** Max available TCP port. */
+ private static final int MAX_TCP_PORT = 0xFFFF;
+
/** Converts context to IGFS. */
private static final IgniteClosure<IgfsContext,IgniteFileSystem> CTX_TO_IGFS = new C1<IgfsContext, IgniteFileSystem>() {
@Override public IgniteFileSystem apply(IgfsContext igfsCtx) {
@@ -307,6 +313,14 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
throw new IgniteCheckedException("Invalid IGFS data cache configuration (key affinity mapper class should be " +
IgfsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg);
+ if (cfg.getIpcEndpointConfiguration() != null) {
+ final int tcpPort = cfg.getIpcEndpointConfiguration().getPort();
+
+ if (!(tcpPort >= MIN_TCP_PORT && tcpPort <= MAX_TCP_PORT))
+ throw new IgniteCheckedException("IGFS endpoint TCP port is out of range [" + MIN_TCP_PORT +
+ ".." + MAX_TCP_PORT + "]: " + tcpPort);
+ }
+
long maxSpaceSize = cfg.getMaxSpaceSize();
if (maxSpaceSize > 0) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7475f09/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
index 11a80af..27f47e8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
@@ -442,6 +443,32 @@ public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testInvalidEndpointTcpPort() throws Exception {
+ final String failMsg = "IGFS endpoint TCP port is out of range";
+ g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class));
+
+ final String igfsCfgName = "igfs-cfg";
+ final IgfsIpcEndpointConfiguration igfsEndpointCfg = new IgfsIpcEndpointConfiguration();
+ igfsEndpointCfg.setPort(0);
+ g1IgfsCfg1.setName(igfsCfgName);
+ g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+ checkGridStartFails(g1Cfg, failMsg, true);
+
+ igfsEndpointCfg.setPort(-1);
+ g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+ checkGridStartFails(g1Cfg, failMsg, true);
+
+ igfsEndpointCfg.setPort(65536);
+ g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+ checkGridStartFails(g1Cfg, failMsg, true);
+ }
+
+ /**
* Checks that the given grid configuration will lead to {@link IgniteCheckedException} upon grid startup.
*
* @param cfg Grid configuration to check.
[04/22] ignite git commit: ignite-2587 Fixed continuous query
notifications in offheap mode and BinaryObjectOffheapImpl usage.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java
new file mode 100644
index 0000000..2994af6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryTxSelfTest} whose {@code memoryMode()} returns {@code OFFHEAP_VALUES}.
+ */
+public class GridCacheContinuousQueryTxOffheapValuesTest extends GridCacheContinuousQueryTxSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 949290e..4fcc1ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -941,7 +941,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @param cacheName Cache name.
* @return Near cache for key.
*/
- protected IgniteCache<Integer, Integer> primaryCache(Integer key, String cacheName) {
+ protected <K, V> IgniteCache<K, V> primaryCache(Object key, String cacheName) {
return primaryNode(key, cacheName).cache(cacheName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 90125b1..5af37a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -86,10 +86,14 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPrimaryWrite
import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicStopBusySelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicLocalTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicOffheapValuesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicReplicatedTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerEagerTtlDisabledTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxLocalTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxOffheapValuesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxReplicatedTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorCallTest;
@@ -156,9 +160,13 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheEntryListenerAtomicReplicatedTest.class);
suite.addTestSuite(IgniteCacheEntryListenerAtomicLocalTest.class);
suite.addTestSuite(IgniteCacheEntryListenerTxTest.class);
+ suite.addTestSuite(IgniteCacheEntryListenerTxOffheapTieredTest.class);
+ suite.addTestSuite(IgniteCacheEntryListenerTxOffheapValuesTest.class);
suite.addTestSuite(IgniteCacheEntryListenerTxReplicatedTest.class);
suite.addTestSuite(IgniteCacheEntryListenerTxLocalTest.class);
suite.addTestSuite(IgniteCacheEntryListenerEagerTtlDisabledTest.class);
+ suite.addTestSuite(IgniteCacheEntryListenerAtomicOffheapTieredTest.class);
+ suite.addTestSuite(IgniteCacheEntryListenerAtomicOffheapValuesTest.class);
suite.addTestSuite(IgniteClientAffinityAssignmentSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 359cdf3..3cd4579 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -70,11 +70,16 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
@@ -89,6 +94,8 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapValuesTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
@@ -181,9 +188,13 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheContinuousQueryPartitionedOnlySelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryPartitionedP2PDisabledSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryTxSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryTxOffheapTieredTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryTxOffheapValuesTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryAtomicOffheapTieredTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryAtomicOffheapValuesTest.class);
suite.addTestSuite(GridCacheContinuousQueryReplicatedTxOneNodeTest.class);
suite.addTestSuite(GridCacheContinuousQueryReplicatedAtomicOneNodeTest.class);
suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
@@ -195,6 +206,9 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
+ suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
// Reduce fields queries.
suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
[20/22] ignite git commit: IGNITE-2603
Posted by sb...@apache.org.
IGNITE-2603
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a32dfc41
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a32dfc41
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a32dfc41
Branch: refs/heads/ignite-2407
Commit: a32dfc41ea9301f8b98c6a666e4b72c65c892659
Parents: 725d6cb
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Feb 12 14:30:08 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Feb 12 14:30:08 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 8 +-
.../GridCacheReplicatedPreloadSelfTest.java | 121 ++++++++++++++-----
.../p2p/CacheDeploymentAffinityKeyMapper.java | 35 ++++++
.../CacheDeploymentAlwaysTruePredicate2.java | 30 +++++
...oymentCacheEntryEventSerializableFilter.java | 32 +++++
.../p2p/CacheDeploymentCacheEntryListener.java | 31 +++++
...CacheDeploymentCachePluginConfiguration.java | 74 ++++++++++++
...heDeploymentStoreSessionListenerFactory.java | 83 +++++++++++++
8 files changed, 383 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5acad6c..7a36e73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -3425,8 +3425,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
if (val.getCacheStoreFactory() != null) {
try {
- marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()),
- val.getCacheStoreFactory().getClass().getClassLoader());
+ ClassLoader ldr = ctx.config().getClassLoader();
+
+ if (ldr == null)
+ ldr = val.getCacheStoreFactory().getClass().getClassLoader();
+
+ marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()), ldr);
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to validate cache configuration. " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index 887fea4..1fae875 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -26,13 +26,18 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -43,6 +48,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -71,7 +78,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
private int poolSize = 2;
/** */
- private volatile boolean needStore = false;
+ private volatile boolean extClassloadingAtCfg = false;
/** */
private volatile boolean isClient = false;
@@ -136,21 +143,47 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
cacheCfg.setRebalanceBatchSize(batchSize);
cacheCfg.setRebalanceThreadPoolSize(poolSize);
- if (needStore) {
- Object sf = null;
+ if (extClassloadingAtCfg) {
+ loadExternalClassesToCfg(cacheCfg);
+ }
- try {
- sf = getExternalClassLoader().
- loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
+ return cacheCfg;
+ }
+
+ /**
+ *
+ * @param cacheCfg Configuration.
+ */
+ private void loadExternalClassesToCfg(CacheConfiguration cacheCfg) {
+ try {
+ Object sf = getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
cacheCfg.setCacheStoreFactory((Factory)sf);
- }
- return cacheCfg;
+ Object sslf = getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentStoreSessionListenerFactory").newInstance();
+
+ cacheCfg.setCacheStoreSessionListenerFactories((Factory)sslf);
+
+ Object cpc = getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCachePluginConfiguration").newInstance();
+
+ cacheCfg.setPluginConfigurations((CachePluginConfiguration)cpc);
+
+ Object akm = getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentAffinityKeyMapper").newInstance();
+
+ cacheCfg.setAffinityMapper((AffinityKeyMapper)akm);
+
+ Object pred = getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentAlwaysTruePredicate2").newInstance();
+
+ cacheCfg.setNodeFilter((IgnitePredicate)pred);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -299,9 +332,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If test failed.
*/
- public void testStore() throws Exception {
+ public void testExternalClassesAtConfiguration() throws Exception {
try {
- needStore = true;
+ extClassloadingAtCfg = true;
useExtClassLoader = true;
Ignite g1 = startGrid(1);
@@ -316,13 +349,47 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
IgniteCache<Integer, Object> cache2 = g2.cache(null);
IgniteCache<Integer, Object> cache3 = g3.cache(null);
+ final Class<CacheEntryListener> cls1 = (Class<CacheEntryListener>) getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryListener");
+ final Class<CacheEntryEventSerializableFilter> cls2 = (Class<CacheEntryEventSerializableFilter>) getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryEventSerializableFilter");
+
+ CacheEntryListenerConfiguration<Integer, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Integer, Object>>() {
+ @Override public CacheEntryListener<Integer, Object> create() {
+ try {
+ return cls1.newInstance();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ new Factory<CacheEntryEventSerializableFilter<Integer, Object>>() {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
+ try {
+
+ return cls2.newInstance();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ true,
+ true
+ );
+
+ cache1.registerCacheEntryListener(lsnrCfg);
+
cache1.put(1, 1);
assertEquals(1, cache2.get(1));
assertEquals(1, cache3.get(1));
}
finally {
- needStore = false;
+ extClassloadingAtCfg = false;
isClient = false;
useExtClassLoader = false;
}
@@ -331,9 +398,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If test failed.
*/
- public void testStoreDynamicStart() throws Exception {
+ public void testExternalClassesAtConfigurationDynamicStart() throws Exception {
try {
- needStore = false;
+ extClassloadingAtCfg = false;
useExtClassLoader = true;
Ignite g1 = startGrid(1);
@@ -343,12 +410,10 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
Ignite g3 = startGrid(3);
- Object sf = getExternalClassLoader().loadClass(
- "org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
-
CacheConfiguration cfg = defaultCacheConfiguration();
- cfg.setCacheStoreFactory((Factory)sf);
+ loadExternalClassesToCfg(cfg);
+
cfg.setName("customStore");
IgniteCache<Integer, Object> cache1 = g1.createCache(cfg);
@@ -362,7 +427,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
assertEquals(1, cache3.get(1));
}
finally {
- needStore = false;
+ extClassloadingAtCfg = false;
isClient = false;
useExtClassLoader = false;
}
@@ -371,9 +436,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If test failed.
*/
- public void testStoreDynamicStart2() throws Exception {
+ public void testExternalClassesAtConfigurationDynamicStart2() throws Exception {
try {
- needStore = false;
+ extClassloadingAtCfg = false;
useExtClassLoader = true;
Ignite g1 = startGrid(1);
@@ -383,12 +448,10 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
Ignite g3 = startGrid(3);
- Object sf = getExternalClassLoader().loadClass(
- "org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
-
CacheConfiguration cfg = defaultCacheConfiguration();
- cfg.setCacheStoreFactory((Factory)sf);
+ loadExternalClassesToCfg(cfg);
+
cfg.setName("customStore");
IgniteCache<Integer, Object> cache1 = g1.getOrCreateCache(cfg);
@@ -402,7 +465,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
assertEquals(1, cache3.get(1));
}
finally {
- needStore = false;
+ extClassloadingAtCfg = false;
isClient = false;
useExtClassLoader = false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java
new file mode 100644
index 0000000..fbb74d2
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+
+/**
+ * Test affinity key mapper for cache deployment tests; maps every key to itself.
+ */
+public class CacheDeploymentAffinityKeyMapper implements AffinityKeyMapper {
+ /** {@inheritDoc} */
+ @Override public Object affinityKey(Object key) {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java
new file mode 100644
index 0000000..d88c7bf
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Test predicate that returns {@code true} for any argument.
+ */
+public class CacheDeploymentAlwaysTruePredicate2 implements IgnitePredicate<Object> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(Object o) {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java
new file mode 100644
index 0000000..c29c1a4
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+
+/**
+ * Cache entry event filter that accepts every event.
+ */
+public class CacheDeploymentCacheEntryEventSerializableFilter implements CacheEntryEventSerializableFilter {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java
new file mode 100644
index 0000000..64c13fb
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Cache entry created listener that ignores every notification.
+ */
+public class CacheDeploymentCacheEntryListener implements CacheEntryCreatedListener {
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable iterable) throws CacheEntryListenerException {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
new file mode 100644
index 0000000..bb37c25
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test cache plugin configuration for cache deployment tests.
+ */
+public class CacheDeploymentCachePluginConfiguration<K, V> implements CachePluginConfiguration<K, V> {
+ /** {@inheritDoc} */
+ @Override public CachePluginProvider createProvider(CachePluginContext ctx) {
+ return new CacheDeploymentCachePluginProvider();
+ }
+
+ private static class CacheDeploymentCachePluginProvider implements CachePluginProvider {
+ /** {@inheritDoc} */
+ @Nullable @Override public Object createComponent(Class cls) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validate() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateRemote(CacheConfiguration locCfg, CachePluginConfiguration locPluginCcfg,
+ CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java
new file mode 100644
index 0000000..74d9d21
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.configuration.Factory;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cache.store.CacheStoreSessionListener;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+/**
+ * Test store session listener factory for cache deployment tests.
+ */
+public class CacheDeploymentStoreSessionListenerFactory implements Factory<CacheStoreSessionListener> {
+ /** Name handed to every listener this factory creates. */
+ private String name;
+
+ /**
+ * Creates a factory without assigning a listener name.
+ */
+ public CacheDeploymentStoreSessionListenerFactory() {
+ }
+
+ /**
+ * @param name Name.
+ */
+ public CacheDeploymentStoreSessionListenerFactory(String name) {
+ this.name = name;
+ }
+
+ @Override public CacheStoreSessionListener create() {
+ return new CacheDeploymentSessionListener(name);
+ }
+
+ /**
+ */
+ private static class CacheDeploymentSessionListener implements CacheStoreSessionListener, LifecycleAware {
+ /** Name. */
+ private final String name;
+
+ /**
+ * @param name Name.
+ */
+ private CacheDeploymentSessionListener(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
[18/22] ignite git commit: IGNITE-2509 - Fixed offheap metrics -
Fixes #470.
Posted by sb...@apache.org.
IGNITE-2509 - Fixed offheap metrics - Fixes #470.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/763bf578
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/763bf578
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/763bf578
Branch: refs/heads/ignite-2407
Commit: 763bf578e9f510e50bdfa6b9e51ea25348bfd2e9
Parents: 35b0e6b
Author: vershov <ve...@gridgain.com>
Authored: Fri Feb 12 12:51:50 2016 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Feb 12 12:51:50 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/CacheMemoryMode.java | 2 ++
.../processors/cache/GridCacheAdapter.java | 7 +++++++
.../processors/cache/GridCacheSwapManager.java | 3 ++-
.../internal/GridAffinityNoCacheSelfTest.java | 4 +++-
.../GridCacheOffHeapValuesEvictionSelfTest.java | 17 +++++++++++++++--
5 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
index a596824..0133327 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
@@ -56,6 +56,8 @@ public enum CacheMemoryMode {
* Entry keys will be stored on heap memory, and values will be stored in offheap memory. Note
* that in this mode entries can be evicted only to swap. The evictions will happen according
* to configured {@link EvictionPolicy}.
+ * <p>
+ * Size returned by {@link CachePeekMode#OFFHEAP} is always zero for this mode.
*/
OFFHEAP_VALUES,
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 84eb0b8..3fac207 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -54,6 +54,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -4121,6 +4122,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public long offHeapAllocatedSize() {
+ if (ctx.config().getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES) {
+ assert ctx.unsafeMemory() != null;
+
+ return ctx.unsafeMemory().allocatedSize();
+ }
+
GridCacheSwapManager swapMgr = ctx.swap();
return swapMgr != null ? swapMgr.offHeapAllocatedSize() : -1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 37b5e15..cbf09bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
@@ -85,7 +86,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** Flag to indicate if swap is enabled. */
private boolean swapEnabled;
- /** Flag to indicate if offheap is enabled. */
+ /** Flag to indicate if offheap is enabled. {@link CacheMemoryMode#OFFHEAP_VALUES} is treated as offheap disabled. */
private boolean offheapEnabled;
/** Swap listeners. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
index 6fb1280..5561f35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
@@ -101,13 +101,15 @@ public class GridAffinityNoCacheSelfTest extends GridCommonAbstractTest {
/**
* @param key Key.
*/
- private void checkAffinityImplCacheDeleted(Object key) {
+ private void checkAffinityImplCacheDeleted(Object key) throws InterruptedException{
IgniteEx grid = grid(0);
final String cacheName = "cacheToBeDeleted";
grid(1).getOrCreateCache(cacheName);
+ awaitPartitionMapExchange();
+
Affinity<Object> affinity = grid.affinity(cacheName);
assertTrue(affinity instanceof GridCacheAffinityImpl);
http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
index 9baab33..0efd89b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
@@ -33,12 +33,18 @@ import org.apache.ignite.testframework.GridTestUtils;
*/
public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSelfTest {
+ /** */
private static final int VAL_SIZE = 512 * 1024; // bytes
+ /** */
private static final int MAX_VALS_AMOUNT = 100;
+ /** */
private static final int MAX_MEMORY_SIZE = MAX_VALS_AMOUNT * VAL_SIZE;
+ /** */
private static final int VALS_AMOUNT = MAX_VALS_AMOUNT * 2;
+ /** */
private static final int THREAD_COUNT = 4;
+ /** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
}
@@ -46,7 +52,7 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
/**
* @throws Exception If failed.
*/
- public void testPutOnHeap() throws Exception {
+ public void testPutValuesOffHeap() throws Exception {
CacheConfiguration<Integer, Object> ccfg = cacheConfiguration(grid(0).name());
ccfg.setName("testPutOffHeapValues");
ccfg.setStatisticsEnabled(true);
@@ -70,6 +76,10 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
assertTrue(MAX_VALS_AMOUNT >= cache.size(CachePeekMode.ONHEAP));
assertTrue(MAX_VALS_AMOUNT - 5 <= cache.size(CachePeekMode.ONHEAP));
assertEquals(cache.size(CachePeekMode.ALL) - cache.size(CachePeekMode.ONHEAP), cache.size(CachePeekMode.SWAP));
+
+ assertTrue((MAX_VALS_AMOUNT + 5) * VAL_SIZE > cache.metrics().getOffHeapAllocatedSize());
+ assertTrue((MAX_VALS_AMOUNT - 5) * VAL_SIZE < cache.metrics().getOffHeapAllocatedSize());
+ assertTrue(cache.metrics().getOffHeapAllocatedSize() >= cache.size(CachePeekMode.ONHEAP) * VAL_SIZE);
}
/**
@@ -109,6 +119,7 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
assertTrue((MAX_VALS_AMOUNT + 5) * VAL_SIZE > cache.metrics().getOffHeapAllocatedSize());
assertTrue((MAX_VALS_AMOUNT - 5) * VAL_SIZE < cache.metrics().getOffHeapAllocatedSize());
+ assertTrue(cache.metrics().getOffHeapAllocatedSize() >= cache.size(CachePeekMode.OFFHEAP) * VAL_SIZE);
}
/**
@@ -146,12 +157,14 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
assertTrue((MAX_VALS_AMOUNT + 5) * VAL_SIZE > cache.metrics().getOffHeapAllocatedSize());
assertTrue((MAX_VALS_AMOUNT - 5) * VAL_SIZE < cache.metrics().getOffHeapAllocatedSize());
+ assertTrue(cache.metrics().getOffHeapAllocatedSize() >= cache.size(CachePeekMode.OFFHEAP) * VAL_SIZE);
}
+ /** Fill cache with values. */
private static void fillCache(final IgniteCache<Integer, Object> cache, long timeout) throws Exception{
final byte[] val = new byte[VAL_SIZE];
final AtomicInteger keyStart = new AtomicInteger(0);
- final CountDownLatch latch = new CountDownLatch(4);
+ final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
[09/22] ignite git commit: Added test.
Posted by sb...@apache.org.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16927abb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16927abb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16927abb
Branch: refs/heads/ignite-2407
Commit: 16927abbb1ed1a6c3b47831562d263dd38f495d1
Parents: fa3706f
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 10 15:05:56 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 10 15:06:31 2016 +0300
----------------------------------------------------------------------
.../cache/CacheQueryBuildValueTest.java | 144 +++++++++++++++++++
1 file changed, 144 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16927abb/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java
new file mode 100644
index 0000000..cb574bb
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that values produced by {@link BinaryObjectBuilder} can be put into a cache and returned by an SQL query.
+ */
+public class CacheQueryBuildValueTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every started node. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(null);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ QueryEntity entity = new QueryEntity();
+ entity.setKeyType(Integer.class.getName());
+ entity.setValueType(TestBuilderValue.class.getName());
+
+ ArrayList<QueryIndex> idxs = new ArrayList<>();
+
+ QueryIndex idx = new QueryIndex("iVal");
+ idxs.add(idx);
+
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+ fields.put("iVal", Integer.class.getName());
+
+ entity.setFields(fields);
+
+ entity.setIndexes(idxs);
+
+ ccfg.setQueryEntities(Collections.singleton(entity));
+
+ cfg.setCacheConfiguration(ccfg);
+
+ BinaryConfiguration binaryCfg = new BinaryConfiguration();
+
+ BinaryTypeConfiguration typeCfg = new BinaryTypeConfiguration();
+ typeCfg.setTypeName(TestBuilderValue.class.getName());
+
+ binaryCfg.setTypeConfigurations(Collections.singletonList(typeCfg));
+
+ cfg.setBinaryConfiguration(binaryCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBuilderAndQuery() throws Exception {
+ Ignite node = ignite(0);
+
+ final IgniteCache<Object, Object> cache = node.cache(null);
+
+ IgniteBinary binary = node.binary();
+
+ BinaryObjectBuilder builder = binary.builder(TestBuilderValue.class.getName());
+
+ cache.put(0, builder.build());
+
+ builder.setField("iVal", 1);
+
+ cache.put(1, builder.build());
+
+ List<Cache.Entry<Object, Object>> entries =
+ cache.query(new SqlQuery<>(TestBuilderValue.class, "true")).getAll();
+
+ assertEquals(2, entries.size());
+ }
+
+ /**
+ * Value type registered as the query entity value; its {@code iVal} field is indexed.
+ */
+ static class TestBuilderValue implements Serializable {
+ /** Integer value. */
+ private int iVal;
+
+ /**
+ * @param iVal Integer value.
+ */
+ public TestBuilderValue(int iVal) {
+ this.iVal = iVal;
+ }
+ }
+}
[14/22] ignite git commit: Improved KerberosHadoopFileSystemFactory
JavaDocs.
Posted by sb...@apache.org.
Improved KerberosHadoopFileSystemFactory JavaDocs.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9937a6e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9937a6e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9937a6e
Branch: refs/heads/ignite-2407
Commit: a9937a6e20407189b8bd67b2cc30a30ddc8dd6ce
Parents: d08a779
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 10 16:57:16 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 16:57:16 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a9937a6e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
index fc768d6..a78cabc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -106,6 +106,9 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
/**
* Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
+ * <p>
+ * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
+ * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well.
*
* @return The key tab file name.
*/
[17/22] ignite git commit: IGNITE-2555
Posted by sb...@apache.org.
IGNITE-2555
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35b0e6bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35b0e6bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35b0e6bf
Branch: refs/heads/ignite-2407
Commit: 35b0e6bf149bb86a3eefefcbc657c822e25681f3
Parents: 877be93
Author: ruskim <ru...@gmail.com>
Authored: Thu Feb 11 18:53:50 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Feb 11 18:53:50 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 14 ++-
.../internal/GridNodeMetricsLogSelfTest.java | 98 ++++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
3 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/35b0e6bf/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e3017ff..5d8daf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -989,6 +989,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
double avgCpuLoadPct = m.getAverageCpuLoad() * 100;
double gcPct = m.getCurrentGcCpuLoad() * 100;
+ //Heap params
long heapUsed = m.getHeapMemoryUsed();
long heapMax = m.getHeapMemoryMaximum();
@@ -997,6 +998,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
double freeHeapPct = heapMax > 0 ? ((double)((heapMax - heapUsed) * 100)) / heapMax : -1;
+ //Non heap params
+ long nonHeapUsed = m.getNonHeapMemoryUsed();
+ long nonHeapMax = m.getNonHeapMemoryMaximum();
+
+ long nonHeapUsedInMBytes = nonHeapUsed / 1024 / 1024;
+ long nonHeapCommInMBytes = m.getNonHeapMemoryCommitted() / 1024 / 1024;
+
+ double freeNonHeapPct = nonHeapMax > 0 ? ((double)((nonHeapMax - nonHeapUsed) * 100)) / nonHeapMax : -1;
+
int hosts = 0;
int nodes = 0;
int cpus = 0;
@@ -1046,12 +1056,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
String msg = NL +
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
- " ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
+ " ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
" ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +
dblFmt.format(freeHeapPct) + "%, comm=" + dblFmt.format(heapCommInMBytes) + "MB]" + NL +
+ " ^-- Non heap [used=" + dblFmt.format(nonHeapUsedInMBytes) + "MB, free=" +
+ dblFmt.format(freeNonHeapPct) + "%, comm=" + dblFmt.format(nonHeapCommInMBytes) + "MB]" + NL +
" ^-- Public thread pool [active=" + pubPoolActiveThreads + ", idle=" +
pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
" ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/35b0e6bf/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
new file mode 100644
index 0000000..fe5922e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+
+import java.io.StringWriter;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
+
+/**
+ * Checks that the periodic "Metrics for local node" log message is written and that it
+ * contains the uptime, non-heap memory and outbound messages queue sections.
+ */
+@SuppressWarnings({"ProhibitedExceptionDeclared"})
+@GridCommonTest(group = "Kernal")
+public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
+    /**
+     * Default constructor; passes {@code false} to the parent test constructor.
+     */
+    public GridNodeMetricsLogSelfTest() {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Enable frequent metrics logging so the message shows up while the test sleeps
+        // (NOTE(review): the value is presumably in milliseconds - confirm against the setter's Javadoc).
+        cfg.setMetricsLogFrequency(1000);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Starts two nodes, puts one entry into a cache on each of them, waits for the metrics
+     * to be logged and then verifies the content captured from the log4j root logger.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeMetricsLog() throws Exception {
+        // Log to string, to check log content.
+        Layout layout = new SimpleLayout();
+
+        StringWriter strWr = new StringWriter();
+
+        WriterAppender app = new WriterAppender(layout, strWr);
+
+        Logger rootLog = Logger.getRootLogger();
+
+        rootLog.addAppender(app);
+
+        String fullLog;
+
+        try {
+            Ignite g1 = startGrid(1);
+
+            IgniteCache<Integer, String> cache1 = g1.createCache("TestCache1");
+
+            cache1.put(1, "one");
+
+            Ignite g2 = startGrid(2);
+
+            IgniteCache<Integer, String> cache2 = g2.createCache("TestCache2");
+
+            cache2.put(2, "two");
+
+            // Give the nodes time to write the metrics message.
+            Thread.sleep(10000);
+
+            // Check that nodes are alive. Constant-first equals() turns a missing value into
+            // a failed assertion instead of a NullPointerException.
+            assert "one".equals(cache1.get(1));
+            assert "two".equals(cache2.get(2));
+
+            fullLog = strWr.toString();
+        }
+        finally {
+            // Always detach the appender: otherwise a failure above would leave it attached
+            // to the root logger for every test that runs afterwards in the same JVM.
+            rootLog.removeAppender(app);
+        }
+
+        assert fullLog.contains("Metrics for local node");
+        assert fullLog.contains("uptime=");
+        assert fullLog.contains("Non heap");
+        assert fullLog.contains("Outbound messages queue");
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35b0e6bf/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index c904ef4..3903910 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.ClusterGroupSelfTest;
import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest;
import org.apache.ignite.internal.GridLifecycleAwareSelfTest;
import org.apache.ignite.internal.GridLifecycleBeanSelfTest;
+import org.apache.ignite.internal.GridNodeMetricsLogSelfTest;
import org.apache.ignite.internal.GridProjectionForCachesSelfTest;
import org.apache.ignite.internal.GridReduceSelfTest;
import org.apache.ignite.internal.GridReleaseTypeSelfTest;
@@ -114,6 +115,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(IgniteSlowClientDetectionSelfTest.class);
GridTestUtils.addTestIfNeeded(suite, IgniteDaemonNodeMarshallerCacheTest.class, ignoredTests);
suite.addTestSuite(IgniteMarshallerCacheConcurrentReadWriteTest.class);
+ suite.addTestSuite(GridNodeMetricsLogSelfTest.class);
suite.addTestSuite(IgniteExceptionInNioWorkerSelfTest.class);
[08/22] ignite git commit: Merge remote-tracking branch
'origin/master'
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fa3706f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fa3706f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fa3706f8
Branch: refs/heads/ignite-2407
Commit: fa3706f866b42418abaa54c5f4e73f2dedb166b0
Parents: c3aa137 4c05fc0
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 10 15:01:00 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:01:00 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheLazyEntry.java | 3 +
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 118 +++-
.../binary/CacheObjectBinaryProcessorImpl.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 79 ++-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 85 ++-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 38 +-
.../cache/query/GridCacheQueryManager.java | 30 +-
.../continuous/CacheContinuousQueryHandler.java | 3 +-
.../CacheContinuousQueryListener.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 120 +++-
.../continuous/GridContinuousProcessor.java | 16 +-
.../IgniteCacheEntryListenerAbstractTest.java | 454 ++++++++----
...cheEntryListenerAtomicOffheapTieredTest.java | 32 +
...cheEntryListenerAtomicOffheapValuesTest.java | 32 +
...teCacheEntryListenerTxOffheapTieredTest.java | 32 +
...teCacheEntryListenerTxOffheapValuesTest.java | 32 +
.../cache/IgniteCacheEntryListenerTxTest.java | 1 +
...ContinuousQueryFailoverAbstractSelfTest.java | 10 +
...tomicPrimaryWriteOrderOffheapTieredTest.java | 33 +
...tinuousQueryFailoverTxOffheapTieredTest.java | 32 +
...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
...ridCacheContinuousQueryAbstractSelfTest.java | 19 +-
...eContinuousQueryAtomicOffheapTieredTest.java | 32 +
...eContinuousQueryAtomicOffheapValuesTest.java | 32 +
...CacheContinuousQueryTxOffheapTieredTest.java | 32 +
...CacheContinuousQueryTxOffheapValuesTest.java | 32 +
.../junits/common/GridCommonAbstractTest.java | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 8 +
.../IgniteCacheQuerySelfTestSuite.java | 14 +
.../commands/tasks/VisorTasksCommand.scala | 4 +-
.../scala/org/apache/ignite/visor/visor.scala | 4 +
32 files changed, 1749 insertions(+), 276 deletions(-)
----------------------------------------------------------------------
[07/22] ignite git commit: IGNITE-2564: CPP: Fixed a bug preventing
CPP memory reallocation from Java. This closes #460.
Posted by sb...@apache.org.
IGNITE-2564: CPP: Fixed a bug preventing CPP memory reallocation from Java. This closes #460.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3aa1375
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3aa1375
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3aa1375
Branch: refs/heads/ignite-2407
Commit: c3aa1375171b5f3a97cd50fcd209256e46579b0d
Parents: b7475f0
Author: isapego <is...@gridgain.com>
Authored: Wed Feb 10 15:00:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:00:42 2016 +0300
----------------------------------------------------------------------
modules/platforms/cpp/core-test/Makefile.am | 1 +
.../cpp/core-test/project/vs/core-test.vcxproj | 1 +
.../project/vs/core-test.vcxproj.filters | 3 +
.../platforms/cpp/core-test/src/cache_test.cpp | 12 +++
.../cpp/core-test/src/interop_memory_test.cpp | 95 ++++++++++++++++++++
.../include/ignite/impl/ignite_environment.h | 19 ++--
.../cpp/core/src/impl/cache/cache_impl.cpp | 2 +-
.../cpp/core/src/impl/ignite_environment.cpp | 30 +++++--
8 files changed, 149 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/Makefile.am b/modules/platforms/cpp/core-test/Makefile.am
index aa81c65..531fee0 100644
--- a/modules/platforms/cpp/core-test/Makefile.am
+++ b/modules/platforms/cpp/core-test/Makefile.am
@@ -29,6 +29,7 @@ ignite_tests_SOURCES = src/cache_test.cpp \
src/cache_query_test.cpp \
src/concurrent_test.cpp \
src/ignition_test.cpp \
+ src/interop_memory_test.cpp \
src/handle_registry_test.cpp \
src/binary_test_defs.cpp \
src/binary_reader_writer_raw_test.cpp \
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
index 422199a..d98d202 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
@@ -40,6 +40,7 @@
<ClCompile Include="..\..\src\binary_session_test.cpp" />
<ClCompile Include="..\..\src\binary_test_defs.cpp" />
<ClCompile Include="..\..\src\cache_query_test.cpp" />
+ <ClCompile Include="..\..\src\interop_memory_test.cpp" />
<ClCompile Include="..\..\src\teamcity_boost.cpp" />
<ClCompile Include="..\..\src\teamcity_messages.cpp" />
</ItemGroup>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
index 32737be..15b9c40 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
@@ -34,6 +34,9 @@
<ClCompile Include="..\..\src\binary_reader_writer_raw_test.cpp">
<Filter>Code</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\interop_memory_test.cpp">
+ <Filter>Code</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\teamcity_messages.h">
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/src/cache_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cache_test.cpp b/modules/platforms/cpp/core-test/src/cache_test.cpp
index 32c5bd6..a11865d 100644
--- a/modules/platforms/cpp/core-test/src/cache_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cache_test.cpp
@@ -476,4 +476,16 @@ BOOST_AUTO_TEST_CASE(TestGetOrCreateCache)
BOOST_REQUIRE(7 == cache2.Get(5));
}
+BOOST_AUTO_TEST_CASE(TestGetBigString)
+{
+    // Value ten times longer than the default memory block allocation size.
+    std::string expected(impl::IgniteEnvironment::DEFAULT_ALLOCATION_SIZE * 10, 'a');
+
+    // Get existing cache.
+    cache::Cache<int, std::string> strCache = grid0.GetOrCreateCache<int, std::string>("partitioned");
+
+    strCache.Put(5, expected);
+
+    // Read the value back and make sure it made the round trip unchanged.
+    std::string actual = strCache.Get(5);
+
+    BOOST_REQUIRE(expected == actual);
+}
+
BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/src/interop_memory_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/interop_memory_test.cpp b/modules/platforms/cpp/core-test/src/interop_memory_test.cpp
new file mode 100644
index 0000000..07e928c
--- /dev/null
+++ b/modules/platforms/cpp/core-test/src/interop_memory_test.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _MSC_VER
+ #define BOOST_TEST_DYN_LINK
+#endif
+
+#include <boost/test/unit_test.hpp>
+
+#include "ignite/ignite.h"
+#include "ignite/ignition.h"
+
+using namespace ignite;
+using namespace impl;
+using namespace boost::unit_test;
+
+BOOST_AUTO_TEST_SUITE(MemoryTestSuite)
+
+/**
+ * Allocates a default-sized interop memory block, grows it twice via Reallocate() and checks
+ * after each step that the capacity grew as requested, that previously written bytes are
+ * preserved and that the whole block is writable.
+ */
+BOOST_AUTO_TEST_CASE(MemoryReallocationTest)
+{
+    using impl::interop::InteropMemory;
+    using common::concurrent::SharedPointer;
+
+    IgniteEnvironment env;
+
+    SharedPointer<InteropMemory> mem = env.AllocateMemory();
+
+    BOOST_CHECK_EQUAL(mem.Get()->Capacity(), IgniteEnvironment::DEFAULT_ALLOCATION_SIZE);
+
+    BOOST_CHECK(mem.Get()->Data() != NULL);
+
+    // Checking memory for write access.
+    int32_t capBeforeReallocation = mem.Get()->Capacity();
+
+    // The data pointer is re-read after every Reallocate() call and never cached across one.
+    int8_t* data = mem.Get()->Data();
+
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+        data[i] = static_cast<int8_t>(i);
+
+    mem.Get()->Reallocate(mem.Get()->Capacity() * 3);
+
+    BOOST_CHECK(mem.Get()->Capacity() >= IgniteEnvironment::DEFAULT_ALLOCATION_SIZE * 3);
+
+    // Checking memory data.
+    data = mem.Get()->Data();
+
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+        BOOST_REQUIRE_EQUAL(data[i], static_cast<int8_t>(i));
+
+    // Checking memory for write access.
+    capBeforeReallocation = mem.Get()->Capacity();
+
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+        data[i] = static_cast<int8_t>(i + 42);
+
+    // Trying to reallocate memory once more.
+    mem.Get()->Reallocate(mem.Get()->Capacity() * 3);
+
+    // Checking memory data.
+    data = mem.Get()->Data();
+
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+        BOOST_REQUIRE_EQUAL(data[i], static_cast<int8_t>(i + 42));
+
+    BOOST_CHECK(mem.Get()->Capacity() >= IgniteEnvironment::DEFAULT_ALLOCATION_SIZE * 9);
+
+    // Checking memory for write access. memset() uses its value argument as a single byte,
+    // so pass the one-byte pattern explicitly rather than a 32-bit constant that does not
+    // even fit into the 'int' parameter.
+    memset(mem.Get()->Data(), 0xF0, mem.Get()->Capacity());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index 2fbdb44..02facfc 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -25,16 +25,21 @@
#include "binary/binary_type_manager.h"
namespace ignite
-{
+{
namespace impl
{
/**
* Defines environment in which Ignite operates.
*/
class IGNITE_IMPORT_EXPORT IgniteEnvironment
- {
+ {
public:
/**
+ * Default memory block allocation size.
+ */
+ enum { DEFAULT_ALLOCATION_SIZE = 1024 };
+
+ /**
* Default constructor.
*/
IgniteEnvironment();
@@ -51,7 +56,7 @@ namespace ignite
* @return JNI handlers.
*/
ignite::common::java::JniHandlers GetJniHandlers(ignite::common::concurrent::SharedPointer<IgniteEnvironment>* target);
-
+
/**
* Perform initialization on successful start.
*
@@ -64,8 +69,8 @@ namespace ignite
*
* @param memPtr Memory pointer.
*/
- void OnStartCallback(long long memPtr);
-
+ void OnStartCallback(long long memPtr);
+
/**
* Get name of Ignite instance.
*
@@ -120,11 +125,11 @@ namespace ignite
char* name;
/** Type manager. */
- binary::BinaryTypeManager* metaMgr;
+ binary::BinaryTypeManager* metaMgr;
IGNITE_NO_COPY_ASSIGNMENT(IgniteEnvironment);
};
- }
+ }
}
#endif
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index 08526b5..f66a228 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -379,7 +379,7 @@ namespace ignite
if (outPtr)
{
- env.Get()->Context()->TargetInStreamOutStream(javaRef, opType, WriteTo(outMem.Get(), inOp, err),
+ env.Get()->Context()->TargetInStreamOutStream(javaRef, opType, outPtr,
inMem.Get()->PointerLong(), &jniErr);
IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index 013a139..c9c57a0 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -53,7 +53,23 @@ namespace ignite
SharedPointer<IgniteEnvironment>* ptr = static_cast<SharedPointer<IgniteEnvironment>*>(target);
delete ptr;
- }
+ }
+
+        /**
+         * Memory reallocate callback: looks up the memory block by its pointer in the
+         * target environment and asks it to reallocate itself to the given capacity.
+         *
+         * @param target Target environment.
+         * @param memPtr Memory pointer.
+         * @param cap Required capacity.
+         */
+        void IGNITE_CALL MemoryReallocate(void* target, long long memPtr, int cap)
+        {
+            SharedPointer<IgniteEnvironment>& envRef = *static_cast<SharedPointer<IgniteEnvironment>*>(target);
+
+            SharedPointer<InteropMemory> block = envRef.Get()->GetMemory(memPtr);
+
+            block.Get()->Reallocate(cap);
+        }
IgniteEnvironment::IgniteEnvironment() : ctx(SharedPointer<JniContext>()), latch(new SingleLatch), name(NULL),
metaMgr(new BinaryTypeManager())
@@ -80,18 +96,20 @@ namespace ignite
hnds.onStart = OnStart;
hnds.onStop = OnStop;
+ hnds.memRealloc = MemoryReallocate;
+
hnds.error = NULL;
return hnds;
}
-
+
void IgniteEnvironment::Initialize(SharedPointer<JniContext> ctx)
{
this->ctx = ctx;
-
+
latch->CountDown();
}
-
+
const char* IgniteEnvironment::InstanceName() const
{
return name;
@@ -104,7 +122,7 @@ namespace ignite
SharedPointer<InteropMemory> IgniteEnvironment::AllocateMemory()
{
- SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(1024));
+ SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(DEFAULT_ALLOCATION_SIZE));
return ptr;
}
@@ -147,7 +165,7 @@ namespace ignite
InteropInputStream stream(&mem);
BinaryReaderImpl reader(&stream);
-
+
int32_t nameLen = reader.ReadString(NULL, 0);
if (nameLen >= 0)
[21/22] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-2407
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-2407
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/112e76e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/112e76e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/112e76e6
Branch: refs/heads/ignite-2407
Commit: 112e76e6e06b9cda1867dc46f6a8380ca4e2bcff
Parents: 570de20 a32dfc4
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 12 15:40:59 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 12 15:40:59 2016 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 4 +-
.../apache/ignite/cache/CacheMemoryMode.java | 2 +
.../internal/GridMessageListenHandler.java | 16 +
.../apache/ignite/internal/IgniteKernal.java | 14 +-
.../processors/cache/CacheLazyEntry.java | 3 +
.../processors/cache/GridCacheAdapter.java | 7 +
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 118 +++-
.../processors/cache/GridCacheProcessor.java | 8 +-
.../processors/cache/GridCacheSwapManager.java | 3 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 79 ++-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 85 ++-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 38 +-
.../cache/query/GridCacheQueryManager.java | 30 +-
.../continuous/CacheContinuousQueryHandler.java | 3 +-
.../CacheContinuousQueryListener.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 120 +++-
.../continuous/GridContinuousProcessor.java | 62 +-
.../internal/processors/igfs/IgfsProcessor.java | 14 +
.../internal/GridAffinityNoCacheSelfTest.java | 4 +-
.../internal/GridNodeMetricsLogSelfTest.java | 98 +++
...eClientReconnectContinuousProcessorTest.java | 32 +-
.../GridCacheOffHeapValuesEvictionSelfTest.java | 17 +-
.../IgniteCacheEntryListenerAbstractTest.java | 454 ++++++++----
...cheEntryListenerAtomicOffheapTieredTest.java | 32 +
...cheEntryListenerAtomicOffheapValuesTest.java | 32 +
...niteCacheEntryListenerExpiredEventsTest.java | 202 ++++++
...teCacheEntryListenerTxOffheapTieredTest.java | 32 +
...teCacheEntryListenerTxOffheapValuesTest.java | 32 +
.../cache/IgniteCacheEntryListenerTxTest.java | 1 +
.../GridCacheReplicatedPreloadSelfTest.java | 121 +++-
...ContinuousQueryFailoverAbstractSelfTest.java | 10 +
...tomicPrimaryWriteOrderOffheapTieredTest.java | 33 +
...tinuousQueryFailoverTxOffheapTieredTest.java | 32 +
...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
...ridCacheContinuousQueryAbstractSelfTest.java | 19 +-
...eContinuousQueryAtomicOffheapTieredTest.java | 32 +
...eContinuousQueryAtomicOffheapValuesTest.java | 32 +
...CacheContinuousQueryTxOffheapTieredTest.java | 32 +
...CacheContinuousQueryTxOffheapValuesTest.java | 32 +
...IgniteCacheContinuousQueryReconnectTest.java | 192 ++++++
.../igfs/IgfsProcessorValidationSelfTest.java | 27 +
.../tcp/TcpClientDiscoverySpiSelfTest.java | 2 +-
.../junits/common/GridCommonAbstractTest.java | 2 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 8 +
.../p2p/CacheDeploymentAffinityKeyMapper.java | 35 +
.../CacheDeploymentAlwaysTruePredicate2.java | 30 +
...oymentCacheEntryEventSerializableFilter.java | 32 +
.../p2p/CacheDeploymentCacheEntryListener.java | 31 +
...CacheDeploymentCachePluginConfiguration.java | 74 ++
...heDeploymentStoreSessionListenerFactory.java | 83 +++
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 22 +-
.../fs/CachingHadoopFileSystemFactory.java | 2 +-
.../fs/KerberosHadoopFileSystemFactory.java | 217 ++++++
...KerberosHadoopFileSystemFactorySelfTest.java | 121 ++++
.../testsuites/IgniteHadoopTestSuite.java | 3 +
.../cache/CacheQueryBuildValueTest.java | 144 ++++
.../IgniteCacheQuerySelfTestSuite.java | 16 +
modules/platforms/cpp/core-test/Makefile.am | 1 +
.../cpp/core-test/project/vs/core-test.vcxproj | 1 +
.../project/vs/core-test.vcxproj.filters | 3 +
.../platforms/cpp/core-test/src/cache_test.cpp | 12 +
.../cpp/core-test/src/interop_memory_test.cpp | 95 +++
.../include/ignite/impl/ignite_environment.h | 19 +-
.../cpp/core/src/impl/cache/cache_impl.cpp | 2 +-
.../cpp/core/src/impl/ignite_environment.cpp | 30 +-
.../Common/IgniteConfigurationXmlSerializer.cs | 7 +-
.../commands/tasks/VisorTasksCommand.scala | 4 +-
.../scala/org/apache/ignite/visor/visor.scala | 4 +
71 files changed, 3450 insertions(+), 352 deletions(-)
----------------------------------------------------------------------
[16/22] ignite git commit: Fix R# analysis warnings
Posted by sb...@apache.org.
Fix R# analysis warnings
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/877be93e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/877be93e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/877be93e
Branch: refs/heads/ignite-2407
Commit: 877be93ee35afddcc126a147cfd3cd1dda4a46ce
Parents: 0491a5f
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Feb 11 16:31:28 2016 +0300
Committer: Pavel Tupitsyn <pt...@gridgain.com>
Committed: Thu Feb 11 16:31:28 2016 +0300
----------------------------------------------------------------------
.../Impl/Common/IgniteConfigurationXmlSerializer.cs | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/877be93e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
index af25bfa..c27012a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
@@ -23,7 +23,6 @@ namespace Apache.Ignite.Core.Impl.Common
using System.ComponentModel;
using System.Configuration;
using System.Diagnostics;
- using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reflection;
using System.Xml;
@@ -99,14 +98,13 @@ namespace Apache.Ignite.Core.Impl.Common
/// <summary>
/// Writes the property of a basic type (primitives, strings, types).
/// </summary>
- [SuppressMessage("ReSharper", "AssignNullToNotNullAttribute")]
private static void WriteBasicProperty(object obj, XmlWriter writer, Type valueType, PropertyInfo property)
{
var converter = GetConverter(property, valueType);
var stringValue = converter.ConvertToInvariantString(obj);
- writer.WriteString(stringValue);
+ writer.WriteString(stringValue ?? "");
}
/// <summary>
@@ -125,7 +123,6 @@ namespace Apache.Ignite.Core.Impl.Common
/// <summary>
/// Writes the complex property (nested object).
/// </summary>
- [SuppressMessage("ReSharper", "AssignNullToNotNullAttribute")]
private static void WriteComplexProperty(object obj, XmlWriter writer, Type valueType)
{
var props = GetNonDefaultProperties(obj).ToList();
@@ -139,7 +136,7 @@ namespace Apache.Ignite.Core.Impl.Common
{
var converter = GetConverter(prop, prop.PropertyType);
var stringValue = converter.ConvertToInvariantString(prop.GetValue(obj, null));
- writer.WriteAttributeString(PropertyNameToXmlName(prop.Name), stringValue);
+ writer.WriteAttributeString(PropertyNameToXmlName(prop.Name), stringValue ?? "");
}
// Write elements
[03/22] ignite git commit: IGNITE-1507 Correct help text for 'tasks'
command in visorcmd.
Posted by sb...@apache.org.
IGNITE-1507 Correct help text for 'tasks' command in visorcmd.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b47d5cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b47d5cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b47d5cb
Branch: refs/heads/ignite-2407
Commit: 0b47d5cbc6a6bf87e2ab121a0e50d795e3f9ed17
Parents: c67e2ea
Author: Vasiliy Sisko <vs...@gridgain.com>
Authored: Wed Feb 10 17:13:50 2016 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Wed Feb 10 17:13:50 2016 +0700
----------------------------------------------------------------------
.../apache/ignite/visor/commands/tasks/VisorTasksCommand.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b47d5cb/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
index e158506..660c5f1 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
@@ -250,7 +250,7 @@ private case class VisorTask(
* | | of Event Storage SPI that is responsible for temporary storage of generated |
* | | events on each node can also affect the functionality of this command. |
* | | |
- * | | By default - all events are enabled and Ignite stores last 10,000 local |
+ * | | By default - all events are disabled and Ignite stores last 10,000 local |
* | | events on each node. Both of these defaults can be changed in configuration. |
* +---------------------------------------------------------------------------------------+
* }}}
@@ -1341,7 +1341,7 @@ object VisorTasksCommand {
"of Event Storage SPI that is responsible for temporary storage of generated",
"events on each node can also affect the functionality of this command.",
" ",
- "By default - all events are enabled and Ignite stores last 10,000 local",
+ "By default - all events are disabled and Ignite stores last 10,000 local",
"events on each node. Both of these defaults can be changed in configuration."
),
spec = List(
[12/22] ignite git commit: Fixed devnotes.txt for yarn and mesos
modules.
Posted by sb...@apache.org.
Fixed devnotes.txt for yarn and mesos modules.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/14824a19
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/14824a19
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/14824a19
Branch: refs/heads/ignite-2407
Commit: 14824a19847091d5599f41eeaf52aa2694b7da87
Parents: 5539cba
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Feb 10 15:46:27 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Feb 10 15:46:27 2016 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/14824a19/DEVNOTES.txt
----------------------------------------------------------------------
diff --git a/DEVNOTES.txt b/DEVNOTES.txt
index 3e19243..e920b79 100644
--- a/DEVNOTES.txt
+++ b/DEVNOTES.txt
@@ -110,7 +110,7 @@ cd to ./modules/mesos
mvn clean package
-Look for ignite-mesos-<version>-jar-with-dependencies.jar in ./target directory.
+Look for ignite-mesos-<version>.jar in ./target directory.
Ignite Yarn Maven Build Instructions
============================================
@@ -118,7 +118,7 @@ cd to ./modules/yarn
mvn clean package
-Look for ignite-yarn-<version>-jar-with-dependencies.jar in ./target directory.
+Look for ignite-yarn-<version>.jar in ./target directory.
Run tests
==========
[13/22] ignite git commit: Merge remote-tracking branch
'origin/master'
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d08a7790
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d08a7790
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d08a7790
Branch: refs/heads/ignite-2407
Commit: d08a779083634b2d332961c8ae098e2e2cf041ed
Parents: 008c8cd 14824a1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 10 15:46:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:46:36 2016 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 4 +-
...niteCacheEntryListenerExpiredEventsTest.java | 202 +++++++++++++++++++
.../cache/CacheQueryBuildValueTest.java | 144 +++++++++++++
3 files changed, 348 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[11/22] ignite git commit: IGNITE-2195: Implemented Hadoop FileSystem
factory capable of working with kerberized file systems. This closes #464.
Posted by sb...@apache.org.
IGNITE-2195: Implemented Hadoop FileSystem factory capable of working with kerberized file systems. This closes #464.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/008c8cd3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/008c8cd3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/008c8cd3
Branch: refs/heads/ignite-2407
Commit: 008c8cd3f33b9c2cace43a9d1f2b4e4542fb58fe
Parents: fa3706f
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Feb 10 15:45:58 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:45:58 2016 +0300
----------------------------------------------------------------------
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 22 +-
.../fs/CachingHadoopFileSystemFactory.java | 2 +-
.../fs/KerberosHadoopFileSystemFactory.java | 214 +++++++++++++++++++
...KerberosHadoopFileSystemFactorySelfTest.java | 104 +++++++++
.../testsuites/IgniteHadoopTestSuite.java | 3 +
5 files changed, 339 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
index c791e9a..01fe6c9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -66,7 +66,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
/** {@inheritDoc} */
@Override public FileSystem get(String usrName) throws IOException {
- return create0(IgfsUtils.fixUserName(usrName));
+ return get0(IgfsUtils.fixUserName(usrName));
}
/**
@@ -76,7 +76,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
* @return File system.
* @throws IOException If failed.
*/
- protected FileSystem create0(String usrName) throws IOException {
+ protected FileSystem get0(String usrName) throws IOException {
assert cfg != null;
try {
@@ -87,12 +87,12 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
ClassLoader clsLdr = getClass().getClassLoader();
if (ctxClsLdr == clsLdr)
- return FileSystem.get(fullUri, cfg, usrName);
+ return create(usrName);
else {
Thread.currentThread().setContextClassLoader(clsLdr);
try {
- return FileSystem.get(fullUri, cfg, usrName);
+ return create(usrName);
}
finally {
Thread.currentThread().setContextClassLoader(ctxClsLdr);
@@ -107,6 +107,18 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
}
/**
+ * Internal file system creation routine, invoked in correct class loader context.
+ *
+ * @param usrName User name.
+ * @return File system.
+ * @throws IOException If failed.
+ * @throws InterruptedException if the current thread is interrupted.
+ */
+ protected FileSystem create(String usrName) throws IOException, InterruptedException {
+ return FileSystem.get(fullUri, cfg, usrName);
+ }
+
+ /**
* Gets file system URI.
* <p>
* This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}.
@@ -152,7 +164,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
*
* @param cfgPaths Paths to file system configuration files.
*/
- public void setConfigPaths(String... cfgPaths) {
+ public void setConfigPaths(@Nullable String... cfgPaths) {
this.cfgPaths = cfgPaths;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
index 91f7777..e1b30c4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -47,7 +47,7 @@ public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory
private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
@Override public FileSystem createValue(String key) throws IOException {
- return create0(key);
+ return get0(key);
}
}
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
new file mode 100644
index 0000000..fc768d6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
+ * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user.
+ * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details.
+ * The principal and the key tab name to be used for Kerberos authentication are set explicitly
+ * in the factory configuration.
+ *
+ * <p>This factory does not cache any file system instances. Unless {@code "fs.[prefix].impl.disable.cache"} is set
+ * to {@code true}, file system instances will be cached by Hadoop.
+ */
+public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** The default interval used to re-login from the key tab, in milliseconds. */
+ public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L;
+
+ /** Keytab full file name. */
+ private String keyTab;
+
+ /** Keytab principal. */
+ private String keyTabPrincipal;
+
+ /** The re-login interval. See {@link #getReloginInterval()} for more information. */
+ private long reloginInterval = DFLT_RELOGIN_INTERVAL;
+
+ /** Time of last re-login attempt, in system milliseconds. */
+ private transient volatile long lastReloginTime;
+
+ /**
+ * Constructor.
+ */
+ public KerberosHadoopFileSystemFactory() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String userName) throws IOException {
+ reloginIfNeeded();
+
+ return super.get(userName);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
+ UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
+ UserGroupInformation.getLoginUser());
+
+ return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override public FileSystem run() throws Exception {
+ return FileSystem.get(fullUri, cfg);
+ }
+ });
+ }
+
+ /**
+ * Gets the key tab principal short name (e.g. "hdfs").
+ *
+ * @return The key tab principal.
+ */
+ @Nullable public String getKeyTabPrincipal() {
+ return keyTabPrincipal;
+ }
+
+ /**
+ * Sets the key tab principal name. See {@link #getKeyTabPrincipal()} for more information.
+ *
+ * @param keyTabPrincipal The key tab principal name.
+ */
+ public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) {
+ this.keyTabPrincipal = keyTabPrincipal;
+ }
+
+ /**
+ * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
+ *
+ * @return The key tab file name.
+ */
+ @Nullable public String getKeyTab() {
+ return keyTab;
+ }
+
+ /**
+ * Sets the key tab file name. See {@link #getKeyTab()} for more information.
+ *
+ * @param keyTab The key tab file name.
+ */
+ public void setKeyTab(@Nullable String keyTab) {
+ this.keyTab = keyTab;
+ }
+
+ /**
+ * The interval used to re-login from the key tab, in milliseconds.
+ * It is important that the value is not larger than the Kerberos ticket life time multiplied by 0.2. This is
+ * because the ticket renew window starts from {@code 0.8 * ticket life time}.
+ * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min)
+ * obeys this rule well.
+ *
+ * <p>Zero value means that re-login should be attempted on each file system operation.
+ * Negative values are not allowed.
+ *
+ * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow
+ * re-login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
+ * have passed since the time of the previous login.
+ * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
+ * more detail.
+ *
+ * @return The re-login interval, in milliseconds.
+ */
+ public long getReloginInterval() {
+ return reloginInterval;
+ }
+
+ /**
+ * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information.
+ *
+ * @param reloginInterval The re-login interval, in milliseconds.
+ */
+ public void setReloginInterval(long reloginInterval) {
+ this.reloginInterval = reloginInterval;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty.");
+ A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty.");
+ A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative.");
+
+ super.start();
+
+ try {
+ UserGroupInformation.setConfiguration(cfg);
+ UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
+ }
+ catch (IOException ioe) {
+ throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
+ ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
+ }
+ }
+
+ /**
+ * Re-logins the user if needed.
+ * First, the re-login interval defined in the factory is checked. Re-login attempts will be no more
+ * frequent than one attempt per {@code reloginInterval}.
+ * Second, the {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method is invoked, which gets the existing
+ * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
+ *
+ * <p>This operation is expected to be called upon each operation with the file system created with the factory.
+ * As long as {@link #get(String)} is invoked upon each {@link IgniteHadoopFileSystem} file operation, there
+ * is no need to invoke it separately.
+ *
+ * @throws IOException If login fails.
+ */
+ private void reloginIfNeeded() throws IOException {
+ long now = System.currentTimeMillis();
+
+ if (now >= lastReloginTime + reloginInterval) {
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+
+ lastReloginTime = now;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ U.writeString(out, keyTab);
+ U.writeString(out, keyTabPrincipal);
+ out.writeLong(reloginInterval);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ keyTab = U.readString(in);
+ keyTabPrincipal = U.readString(in);
+ reloginInterval = in.readLong();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
new file mode 100644
index 0000000..8fb1612
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
@@ -0,0 +1,104 @@
+package org.apache.ignite.hadoop.fs;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests KerberosHadoopFileSystemFactory.
+ */
+public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest {
+ /**
+ * Test parameters validation.
+ *
+ * @throws Exception If failed.
+ */
+ public void testParameters() throws Exception {
+ checkParameters(null, null, -1);
+
+ checkParameters(null, null, 100);
+ checkParameters(null, "b", -1);
+ checkParameters("a", null, -1);
+
+ checkParameters(null, "b", 100);
+ checkParameters("a", null, 100);
+ checkParameters("a", "b", -1);
+ }
+
+ /**
+ * Check parameters.
+ *
+ * @param keyTab Key tab.
+ * @param keyTabPrincipal Key tab principal.
+ * @param reloginInterval Re-login interval.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) {
+ final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+ fac.setKeyTab(keyTab);
+ fac.setKeyTabPrincipal(keyTabPrincipal);
+ fac.setReloginInterval(reloginInterval);
+
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ fac.start();
+
+ return null;
+ }
+ }, IllegalArgumentException.class, null);
+ }
+
+ /**
+ * Checks serialization and deserialization of the secure factory.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSerialization() throws Exception {
+ KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+ checkSerialization(fac);
+
+ fac = new KerberosHadoopFileSystemFactory();
+
+ fac.setUri("igfs://igfs@localhost:10500/");
+ fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml");
+ fac.setKeyTabPrincipal("foo");
+ fac.setKeyTab("/etc/krb5.keytab");
+ fac.setReloginInterval(30 * 60 * 1000L);
+
+ checkSerialization(fac);
+ }
+
+ /**
+ * Serializes and deserializes the factory, then checks that its properties are preserved.
+ *
+ * @param fac The factory to check.
+ * @throws Exception If failed.
+ */
+ private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ ObjectOutput oo = new ObjectOutputStream(baos);
+
+ oo.writeObject(fac);
+
+ ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+ KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject();
+
+ assertEquals(fac.getUri(), fac2.getUri());
+ Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths());
+ assertEquals(fac.getKeyTab(), fac2.getKeyTab());
+ assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal());
+ assertEquals(fac.getReloginInterval(), fac2.getReloginInterval());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 9092f32..acd255c 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest;
import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest;
+import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest;
import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest;
@@ -101,6 +102,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
+ suite.addTest(new TestSuite(ldr.loadClass(KerberosHadoopFileSystemFactorySelfTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));
[05/22] ignite git commit: ignite-2587 Fixed continuous query
notifications in offheap mode and BinaryObjectOffheapImpl usage.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 27edb0c..e6bfd87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -56,6 +57,7 @@ import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.CREATED;
import static javax.cache.event.EventType.EXPIRED;
import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -79,7 +82,7 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
*/
public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest {
/** */
- private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>> evts;
+ private static volatile List<CacheEntryEvent<?, ?>> evts;
/** */
private static volatile CountDownLatch evtsLatch;
@@ -91,7 +94,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
private Integer lastKey = 0;
/** */
- private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg;
+ private CacheEntryListenerConfiguration<Object, Object> lsnrCfg;
+
+ /** */
+ private boolean useObjects;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@@ -103,9 +109,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
cfg.setEagerTtl(eagerTtl());
+ cfg.setMemoryMode(memoryMode());
+
return cfg;
}
+ /**
+ * @return Cache memory mode.
+ */
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
@@ -129,9 +144,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
public void testExceptionIgnored() throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new ExceptionListener();
}
},
@@ -140,7 +155,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
false
);
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
cache.registerCacheEntryListener(lsnrCfg);
@@ -158,13 +173,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
},
- new Factory<CacheEntryEventSerializableFilter<? super Integer, ? super Integer>>() {
- @Override public CacheEntryEventSerializableFilter<? super Integer, ? super Integer> create() {
+ new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
+ @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
return new ExceptionFilter();
}
},
@@ -192,9 +207,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
public void testNoOldValue() throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
},
@@ -203,7 +218,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
true
);
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
try {
for (Integer key : keys()) {
@@ -222,21 +237,30 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
* @throws Exception If failed.
*/
+ public void testSynchronousEventsObjectKeyValue() throws Exception {
+ useObjects = true;
+
+ testSynchronousEvents();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSynchronousEvents() throws Exception {
- final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener() {
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ final CacheEntryCreatedListener<Object, Object> lsnr = new CreateUpdateRemoveExpireListener() {
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
super.onRemoved(evts);
awaitLatch();
}
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
super.onCreated(evts);
awaitLatch();
}
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
super.onUpdated(evts);
awaitLatch();
@@ -252,9 +276,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
};
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return lsnr;
}
},
@@ -263,7 +287,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
true
);
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
cache.registerCacheEntryListener(lsnrCfg);
@@ -299,7 +323,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
if (!eagerTtl()) {
U.sleep(1100);
- assertNull(primaryCache(key, cache.getName()).get(key));
+ assertNull(primaryCache(key, cache.getName()).get(key(key)));
evtsLatch.await(5000, MILLISECONDS);
@@ -378,13 +402,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
- final IgniteCache<Integer, Integer> cache = jcache(0);
+ final IgniteCache<Object, Object> cache = jcache(0);
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> cfg = new MutableCacheEntryListenerConfiguration<>(
- new Factory<CacheEntryListener<Integer, Integer>>() {
- @Override public CacheEntryListener<Integer, Integer> create() {
+ CacheEntryListenerConfiguration<Object, Object> cfg = new MutableCacheEntryListenerConfiguration<>(
+ new Factory<CacheEntryListener<Object, Object>>() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
},
@@ -441,9 +465,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @param expEvts Expected events number.
* @throws Exception If failed.
*/
- private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer> cache, int expEvts)
+ private void syncEvent(
+ Integer key,
+ Integer val,
+ IgniteCache<Object, Object> cache,
+ int expEvts)
throws Exception {
- evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+ evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
evtsLatch = new CountDownLatch(expEvts);
@@ -466,9 +494,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
});
if (val != null)
- cache.put(key, val);
+ cache.put(key(key), value(val));
else
- cache.remove(key);
+ cache.remove(key(key));
done.set(true);
@@ -480,15 +508,45 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/**
+ * @param key Integer key.
+ * @return Key instance.
+ */
+ private Object key(Integer key) {
+ assert key != null;
+
+ return useObjects ? new ListenerTestKey(key) : key;
+ }
+
+ /**
+ * @param val Integer value.
+ * @return Value instance.
+ */
+ private Object value(Integer val) {
+ if (val == null)
+ return null;
+
+ return useObjects ? new ListenerTestValue(val) : val;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEventsObjectKeyValue() throws Exception {
+ useObjects = true;
+
+ testEvents();
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testEvents() throws Exception {
- IgniteCache<Integer, Integer> cache = jcache();
+ IgniteCache<Object, Object> cache = jcache();
- Map<Integer, Integer> vals = new HashMap<>();
+ Map<Object, Object> vals = new HashMap<>();
for (int i = 0; i < 100; i++)
- vals.put(i + 2_000_000, i);
+ vals.put(key(i + 2_000_000), value(i));
cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries.
@@ -518,7 +576,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
checkEvents(cache, new CreateUpdateRemoveExpireListenerFactory(), key, true, true, true, true);
}
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
new CreateUpdateRemoveExpireListenerFactory(),
new TestFilterFactory(),
true,
@@ -551,7 +609,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception {
+ private void checkListenerOnStart(Map<Object, Object> vals) throws Exception {
lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
new CreateUpdateRemoveExpireListenerFactory(),
null,
@@ -564,7 +622,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
try {
awaitPartitionMapExchange();
- IgniteCache<Integer, Integer> cache = grid.cache(null);
+ IgniteCache<Object, Object> cache = grid.cache(null);
Integer key = Integer.MAX_VALUE;
@@ -588,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
try {
awaitPartitionMapExchange();
- IgniteCache<Integer, Integer> cache = grid.cache(null);
+ IgniteCache<Object, Object> cache = grid.cache(null);
log.info("Check filter for listener in configuration.");
@@ -613,14 +671,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
private void checkEvents(
- final IgniteCache<Integer, Integer> cache,
- final Factory<CacheEntryListener<Integer, Integer>> lsnrFactory,
+ final IgniteCache<Object, Object> cache,
+ final Factory<CacheEntryListener<Object, Object>> lsnrFactory,
Integer key,
boolean create,
boolean update,
boolean rmv,
boolean expire) throws Exception {
- CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
lsnrFactory,
null,
true,
@@ -642,8 +700,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @param vals Values in cache.
* @throws Exception If failed.
*/
- private void checkFilter(final IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception {
- evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+ private void checkFilter(final IgniteCache<Object, Object> cache, Map<Object, Object> vals) throws Exception {
+ evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries.
@@ -653,16 +711,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
cache.putAll(vals);
- final Map<Integer, Integer> newVals = new HashMap<>();
+ final Map<Object, Object> newVals = new HashMap<>();
- for (Integer key : vals.keySet())
- newVals.put(key, -1);
+ for (Object key : vals.keySet())
+ newVals.put(key, value(-1));
cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals);
+ U.sleep(1000);
+
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- for (Integer key : newVals.keySet()) {
+ for (Object key : newVals.keySet()) {
if (primaryCache(key, cache.getName()).get(key) != null)
return false;
}
@@ -675,13 +735,20 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
assertEquals(expEvts, evts.size());
- Set<Integer> rmvd = new HashSet<>();
- Set<Integer> created = new HashSet<>();
- Set<Integer> updated = new HashSet<>();
- Set<Integer> expired = new HashSet<>();
+ Set<Object> rmvd = new HashSet<>();
+ Set<Object> created = new HashSet<>();
+ Set<Object> updated = new HashSet<>();
+ Set<Object> expired = new HashSet<>();
+
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ Integer key;
+
+ if (useObjects)
+ key = ((ListenerTestKey)evt.getKey()).key;
+ else
+ key = (Integer)evt.getKey();
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
- assertTrue(evt.getKey() % 2 == 0);
+ assertTrue(key % 2 == 0);
assertTrue(vals.keySet().contains(evt.getKey()));
@@ -707,7 +774,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
break;
case UPDATED:
- assertEquals(-1, (int)evt.getValue());
+ assertEquals(value(-1), evt.getValue());
assertEquals(vals.get(evt.getKey()), evt.getOldValue());
@@ -722,7 +789,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
case EXPIRED:
assertNull(evt.getValue());
- assertEquals(-1, (int)evt.getOldValue());
+ assertEquals(value(-1), evt.getOldValue());
assertTrue(rmvd.contains(evt.getKey()));
@@ -757,8 +824,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @throws Exception If failed.
*/
private void checkEvents(
- final IgniteCache<Integer, Integer> cache,
- final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg,
+ final IgniteCache<Object, Object> cache,
+ final CacheEntryListenerConfiguration<Object, Object> lsnrCfg,
Integer key,
boolean create,
boolean update,
@@ -789,64 +856,64 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
if (expire)
expEvts += 2;
- evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+ evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
evtsLatch = new CountDownLatch(expEvts);
- cache.put(key, 0);
+ cache.put(key(key), value(0));
for (int i = 0; i < UPDATES; i++) {
if (i % 2 == 0)
- cache.put(key, i + 1);
+ cache.put(key(key), value(i + 1));
else
- cache.invoke(key, new EntrySetValueProcessor(i + 1));
+ cache.invoke(key(key), new EntrySetValueProcessor(value(i + 1)));
}
// Invoke processor does not update value, should not trigger event.
- assertEquals(String.valueOf(UPDATES), cache.invoke(key, new EntryToStringProcessor()));
+ assertEquals(String.valueOf(UPDATES), cache.invoke(key(key), new EntryToStringProcessor()));
- assertFalse(cache.putIfAbsent(key, -1));
+ assertFalse(cache.putIfAbsent(key(key), value(-1)));
- assertFalse(cache.remove(key, -1));
+ assertFalse(cache.remove(key(key), value(-1)));
- assertTrue(cache.remove(key));
+ assertTrue(cache.remove(key(key)));
- IgniteCache<Integer, Integer> expirePlcCache =
+ IgniteCache<Object, Object> expirePlcCache =
cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
- expirePlcCache.put(key, 10);
+ expirePlcCache.put(key(key), value(10));
U.sleep(700);
if (!eagerTtl())
- assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+ assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
- IgniteCache<Integer, Integer> cache1 = cache;
+ IgniteCache<Object, Object> cache1 = cache;
if (gridCount() > 1)
cache1 = jcache(1); // Do updates from another node.
- cache1.put(key, 1);
+ cache1.put(key(key), value(1));
- cache1.put(key, 2);
+ cache1.put(key(key), value(2));
- assertTrue(cache1.remove(key));
+ assertTrue(cache1.remove(key(key)));
- IgniteCache<Integer, Integer> expirePlcCache1 =
+ IgniteCache<Object, Object> expirePlcCache1 =
cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
- expirePlcCache1.put(key, 20);
+ expirePlcCache1.put(key(key), value(20));
U.sleep(200);
if (!eagerTtl())
- assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+ assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
evtsLatch.await(5000, MILLISECONDS);
assertEquals(expEvts, evts.size());
- Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator();
+ Iterator<CacheEntryEvent<?, ?>> iter = evts.iterator();
if (create)
checkEvent(iter, key, CREATED, 0, null);
@@ -886,11 +953,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
cache.deregisterCacheEntryListener(lsnrCfg);
- cache.put(key, 1);
+ cache.put(key(key), value(1));
- cache.put(key, 2);
+ cache.put(key(key), value(2));
- assertTrue(cache.remove(key));
+ assertTrue(cache.remove(key(key)));
U.sleep(500); // Sleep some time to ensure listener was really removed.
@@ -908,26 +975,26 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
* @param expVal Expected value.
* @param expOld Expected old value.
*/
- private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter,
+ private void checkEvent(Iterator<CacheEntryEvent<?, ?>> iter,
Integer expKey,
EventType expType,
@Nullable Integer expVal,
@Nullable Integer expOld) {
assertTrue(iter.hasNext());
- CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+ CacheEntryEvent<?, ?> evt = iter.next();
iter.remove();
assertTrue(evt.getSource() instanceof IgniteCacheProxy);
- assertEquals(expKey, evt.getKey());
+ assertEquals(key(expKey), evt.getKey());
assertEquals(expType, evt.getEventType());
- assertEquals(expVal, evt.getValue());
+ assertEquals(value(expVal), evt.getValue());
- assertEquals(expOld, evt.getOldValue());
+ assertEquals(value(expOld), evt.getOldValue());
if (expOld == null)
assertFalse(evt.isOldValueAvailable());
@@ -977,7 +1044,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
* @param evt Event.
*/
- private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+ private static void onEvent(CacheEntryEvent<?, ?> evt) {
// System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']');
assertNotNull(evt);
@@ -993,9 +1060,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateRemoveExpireListener();
}
}
@@ -1003,9 +1070,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new NoOpCreateUpdateListener();
}
}
@@ -1013,9 +1080,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateUpdateListener();
}
}
@@ -1023,9 +1090,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class CreateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new CreateListener();
}
}
@@ -1033,9 +1100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class RemoveListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class RemoveListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new RemoveListener();
}
}
@@ -1043,9 +1110,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class UpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class UpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new UpdateListener();
}
}
@@ -1053,9 +1120,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class ExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+ private static class ExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryListener<Integer, Integer> create() {
+ @Override public CacheEntryListener<Object, Object> create() {
return new ExpireListener();
}
}
@@ -1063,9 +1130,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Integer, Integer>> {
+ private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryEventSerializableFilter<Integer, Integer> create() {
+ @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
return new TestFilter();
}
}
@@ -1073,10 +1140,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class CreateListener implements CacheEntryCreatedListener<Integer, Integer> {
+ private static class CreateListener implements CacheEntryCreatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1084,10 +1151,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class UpdateListener implements CacheEntryUpdatedListener<Integer, Integer> {
+ private static class UpdateListener implements CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1095,10 +1162,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class RemoveListener implements CacheEntryRemovedListener<Integer, Integer> {
+ private static class RemoveListener implements CacheEntryRemovedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1106,10 +1173,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class ExpireListener implements CacheEntryExpiredListener<Integer, Integer> {
+ private static class ExpireListener implements CacheEntryExpiredListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1117,32 +1184,39 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+ private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
/** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
assert evt != null;
assert evt.getSource() != null : evt;
assert evt.getEventType() != null : evt;
assert evt.getKey() != null : evt;
- return evt.getKey() % 2 == 0;
+ Integer key;
+
+ if (evt.getKey() instanceof ListenerTestKey)
+ key = ((ListenerTestKey)evt.getKey()).key;
+ else
+ key = (Integer)evt.getKey();
+
+ return key % 2 == 0;
}
}
/**
*
*/
- private static class CreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
- CacheEntryUpdatedListener<Integer, Integer> {
+ private static class CreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+ CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1150,11 +1224,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
- CacheEntryUpdatedListener<Integer, Integer> {
+ private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+ CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
assertNotNull(evt);
assertNotNull(evt.getSource());
assertNotNull(evt.getEventType());
@@ -1163,8 +1237,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
assertNotNull(evt);
assertNotNull(evt.getSource());
assertNotNull(evt.getEventType());
@@ -1177,16 +1251,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
*
*/
private static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
- implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+ implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
/** {@inheritDoc} */
- @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
- for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+ @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
onEvent(evt);
}
}
@@ -1194,9 +1268,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+ private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Object, Object> {
/** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
throw new RuntimeException("Test filter error.");
}
}
@@ -1205,24 +1279,24 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
*
*/
private static class ExceptionListener extends CreateUpdateListener
- implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+ implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
/** {@inheritDoc} */
- @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
/** {@inheritDoc} */
- @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
/** {@inheritDoc} */
- @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
error();
}
@@ -1237,10 +1311,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- protected static class EntryToStringProcessor implements EntryProcessor<Integer, Integer, String> {
+ protected static class EntryToStringProcessor implements EntryProcessor<Object, Object, String> {
/** {@inheritDoc} */
- @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
- throws EntryProcessorException {
+ @Override public String process(MutableEntry<Object, Object> e, Object... args) {
+ if (e.getValue() instanceof ListenerTestValue)
+ return String.valueOf(((ListenerTestValue)e.getValue()).val1);
+
return String.valueOf(e.getValue());
}
@@ -1253,19 +1329,19 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- protected static class EntrySetValueProcessor implements EntryProcessor<Integer, Integer, String> {
+ protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, String> {
/** */
- private Integer val;
+ private Object val;
/**
* @param val Value to set.
*/
- public EntrySetValueProcessor(Integer val) {
+ public EntrySetValueProcessor(Object val) {
this.val = val;
}
/** {@inheritDoc} */
- @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
+ @Override public String process(MutableEntry<Object, Object> e, Object... args)
throws EntryProcessorException {
e.setValue(val);
@@ -1307,4 +1383,88 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
// No-op.
}
}
+
+ /**
+ *
+ */
+ static class ListenerTestKey implements Serializable {
+ /** */
+ private final Integer key;
+
+ /**
+ * @param key Key.
+ */
+ public ListenerTestKey(Integer key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ListenerTestKey that = (ListenerTestKey)o;
+
+ return key.equals(that.key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ListenerTestKey.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class ListenerTestValue implements Serializable {
+ /** */
+ private final Integer val1;
+
+ /** */
+ private final String val2;
+
+ /**
+ * @param val Value.
+ */
+ public ListenerTestValue(Integer val) {
+ this.val1 = val;
+ this.val2 = String.valueOf(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ListenerTestValue that = (ListenerTestValue) o;
+
+ return val1.equals(that.val1) && val2.equals(that.val2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = val1.hashCode();
+
+ res = 31 * res + val2.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ListenerTestValue.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..69efb84
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerAtomicOffheapTieredTest extends IgniteCacheEntryListenerAtomicTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..23b1bc0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerAtomicOffheapValuesTest extends IgniteCacheEntryListenerAtomicTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
new file mode 100644
index 0000000..d552195
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ *
+ */
+public class IgniteCacheEntryListenerTxOffheapTieredTest extends IgniteCacheEntryListenerTxTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
new file mode 100644
index 0000000..32555c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerTxTest} whose {@code memoryMode()} override returns {@code OFFHEAP_VALUES}.
+ */
+public class IgniteCacheEntryListenerTxOffheapValuesTest extends IgniteCacheEntryListenerTxTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index a9e43d4..41725e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -48,6 +48,7 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
return null;
}
+ /** {@inheritDoc} */
@Override public void testEvents(){
fail("https://issues.apache.org/jira/browse/IGNITE-1600");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1c65f9b..a42f056 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
@@ -97,6 +98,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -142,6 +144,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
ccfg.setBackups(backups);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setNearConfiguration(nearCacheConfiguration());
+ ccfg.setMemoryMode(memoryMode());
cfg.setCacheConfiguration(ccfg);
@@ -151,6 +154,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
}
/**
+ * @return Cache memory mode.
+ */
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
+ }
+
+ /**
* @return Near cache configuration.
*/
protected NearCacheConfiguration nearCacheConfiguration() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
new file mode 100644
index 0000000..cc8590d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest} whose {@code memoryMode()} override returns {@code OFFHEAP_TIERED}.
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest
+ extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
new file mode 100644
index 0000000..cae06c3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link CacheContinuousQueryFailoverTxSelfTest} whose {@code memoryMode()} override returns {@code OFFHEAP_TIERED}.
+ */
+public class CacheContinuousQueryFailoverTxOffheapTieredTest extends CacheContinuousQueryFailoverTxSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
new file mode 100644
index 0000000..d9b2091
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
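+ * Applies random cache operations through a client node and checks the continuous query events they produce.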
+ */
+public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node configuration. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Total number of nodes started; the last one is started after the client flag is set. */
+ private static final int NODES = 5;
+
+ /** Exclusive upper bound for randomly generated key ids. */
+ private static final int KEYS = 10;
+
+ /** Exclusive upper bound for randomly generated value ids. */
+ private static final int VALS = 10;
+
+ /** Client mode flag passed to {@code setClientMode} whenever a node configuration is built. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES - 1);
+
+ client = true;
+
+ startGrid(NODES - 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomic() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicOffheapValues() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ OFFHEAP_VALUES,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicOffheapTiered() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ OFFHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicNoBackups() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTx() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxOffheapValues() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_VALUES,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxOffheapTiered() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoBackups() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception {
+ ignite(0).createCache(ccfg);
+
+ try {
+ IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName());
+
+ long seed = System.currentTimeMillis();
+
+ Random rnd = new Random(seed);
+
+ log.info("Random seed: " + seed);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
+ new ArrayBlockingQueue<>(10_000);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ // System.out.println("Event: " + evt);
+
+ evtsQueue.add(evt);
+ }
+ }
+ });
+
+ QueryCursor<?> cur = cache.query(qry);
+
+ ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+ try {
+ for (int i = 0; i < 1000; i++) {
+ if (i % 100 == 0)
+ log.info("Iteration: " + i);
+
+ randomUpdate(rnd, evtsQueue, expData, cache);
+ }
+ }
+ finally {
+ cur.close();
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @param evtsQueue Events queue.
+ * @param expData Expected cache data.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void randomUpdate(
+ Random rnd,
+ BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+ ConcurrentMap<Object, Object> expData,
+ IgniteCache<Object, Object> cache)
+ throws Exception {
+ Object key = new QueryTestKey(rnd.nextInt(KEYS));
+ Object newVal = value(rnd);
+ Object oldVal = expData.get(key);
+
+ int op = rnd.nextInt(11);
+
+ // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+ switch (op) {
+ case 0: {
+ cache.put(key, newVal);
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 1: {
+ cache.getAndPut(key, newVal);
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 2: {
+ cache.remove(key);
+
+ waitEvent(evtsQueue, key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 3: {
+ cache.getAndRemove(key);
+
+ waitEvent(evtsQueue, key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 4: {
+ cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 5: {
+ cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+ waitEvent(evtsQueue, key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 6: {
+ cache.putIfAbsent(key, newVal);
+
+ if (oldVal == null) {
+ waitEvent(evtsQueue, key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 7: {
+ cache.getAndPutIfAbsent(key, newVal);
+
+ if (oldVal == null) {
+ waitEvent(evtsQueue, key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 8: {
+ cache.replace(key, newVal);
+
+ if (oldVal != null) {
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 9: {
+ cache.getAndReplace(key, newVal);
+
+ if (oldVal != null) {
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 10: {
+ if (oldVal != null) {
+ Object replaceVal = value(rnd);
+
+ boolean success = replaceVal.equals(oldVal);
+
+ if (success) {
+ cache.replace(key, replaceVal, newVal);
+
+ waitEvent(evtsQueue, key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else {
+ cache.replace(key, replaceVal, newVal);
+
+ checkNoEvent(evtsQueue);
+ }
+ }
+ else {
+ cache.replace(key, value(rnd), newVal);
+
+ checkNoEvent(evtsQueue);
+ }
+
+ break;
+ }
+
+ default:
+ fail();
+ }
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @return Cache value.
+ */
+ private static Object value(Random rnd) {
+ return new QueryTestValue(rnd.nextInt(VALS));
+ }
+
+ /**
+ * @param evtsQueue Event queue.
+ * @param key Key.
+ * @param val Value.
+ * @param oldVal Old value.
+ * @throws Exception If failed.
+ */
+ private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+ Object key, Object val, Object oldVal) throws Exception {
+ if (val == null && oldVal == null) {
+ checkNoEvent(evtsQueue);
+
+ return;
+ }
+
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+ assertNotNull("Failed to wait for event [key=" + key +
+ ", val=" + val +
+ ", oldVal=" + oldVal + ']', evt);
+ assertEquals(key, evt.getKey());
+ assertEquals(val, evt.getValue());
+ assertEquals(oldVal, evt.getOldValue());
+ }
+
+ /**
+ * @param evtsQueue Event queue.
+ * @throws Exception If failed.
+ */
+ private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+ assertNull(evt);
+ }
+
+ /**
+ * Builds a cache configuration with {@code FULL_SYNC} write synchronization and {@code PRIMARY} atomic write order.
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param atomicityMode Cache atomicity mode.
+ * @param memoryMode Cache memory mode.
+ * @param store If {@code true} configures dummy cache store.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode,
+ boolean store) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setMemoryMode(memoryMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ if (store) {
+ ccfg.setCacheStoreFactory(new TestStoreFactory());
+ ccfg.setReadThrough(true);
+ ccfg.setWriteThrough(true);
+ }
+
+ return ccfg;
+ }
+
+ /**
+ * Factory of a dummy cache store whose {@code load} returns {@code null} and whose {@code write} and {@code delete} do nothing.
+ */
+ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return null;
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ };
+ }
+ }
+
+ /**
+ * Serializable cache key wrapping an {@link Integer}; equality and hash code are derived from that integer only.
+ */
+ static class QueryTestKey implements Serializable {
+ /** Wrapped key value. */
+ private final Integer key;
+
+ /**
+ * @param key Key.
+ */
+ public QueryTestKey(Integer key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryTestKey that = (QueryTestKey)o;
+
+ return key.equals(that.key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryTestKey.class, this);
+ }
+ }
+
+ /**
+ * Serializable cache value holding an {@link Integer} and its string form; both fields take part in equality and hash code.
+ */
+ static class QueryTestValue implements Serializable {
+ /** Integer value passed to the constructor. */
+ private final Integer val1;
+
+ /** Result of {@code String.valueOf} applied to the constructor argument. */
+ private final String val2;
+
+ /**
+ * @param val Value.
+ */
+ public QueryTestValue(Integer val) {
+ this.val1 = val;
+ this.val2 = String.valueOf(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryTestValue that = (QueryTestValue) o;
+
+ return val1.equals(that.val1) && val2.equals(that.val2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = val1.hashCode();
+
+ res = 31 * res + val2.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryTestValue.class, this);
+ }
+ }
+ /**
+ * Entry processor that sets the entry to a given value, or removes the entry when that value is {@code null}.
+ */
+ protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+ /** Value to set; {@code null} makes {@code process} remove the entry instead. */
+ private Object val;
+
+ /** When {@code true}, {@code process} returns the value the entry held before the update; otherwise {@code null}. */
+ private boolean retOld;
+
+ /**
+ * @param val Value to set.
+ * @param retOld Return old value flag.
+ */
+ public EntrySetValueProcessor(Object val, boolean retOld) {
+ this.val = val;
+ this.retOld = retOld;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+ Object old = retOld ? e.getValue() : null;
+
+ if (val != null)
+ e.setValue(val);
+ else
+ e.remove();
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EntrySetValueProcessor.class, this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5abb98d..dbe282e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
@@ -73,9 +74,9 @@ import org.jsr166.ConcurrentHashMap8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -117,6 +118,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);
cacheCfg.setLoadPreviousValue(true);
+ cacheCfg.setMemoryMode(memoryMode());
cfg.setCacheConfiguration(cacheCfg);
}
@@ -135,6 +137,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
/**
+ * @return Cache memory mode.
+ */
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
+ }
+
+ /**
* @return Peer class loading enabled flag.
*/
protected boolean peerClassLoadingEnabled() {
@@ -393,8 +402,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
});
- try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
- QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+ try (QueryCursor<Cache.Entry<Integer, Integer>> qryCur2 = cache1.query(qry2);
+ QueryCursor<Cache.Entry<Integer, Integer>> qryCur1 = cache.query(qry1)) {
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Object, Object> cache0 = grid(i).cache(null);
@@ -448,7 +457,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
});
- QueryCursor<Cache.Entry<Integer, Integer>> query = cache.query(qry);
+ QueryCursor<Cache.Entry<Integer, Integer>> qryCur = cache.query(qry);
for (int key = 0; key < keyCnt; key++)
cache.put(key, key);
@@ -461,7 +470,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}, 2000L);
}
finally {
- query.close();
+ qryCur.close();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..d6948e2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryAtomicSelfTest} whose {@code memoryMode()} override returns {@code OFFHEAP_TIERED}.
+ */
+public class GridCacheContinuousQueryAtomicOffheapTieredTest extends GridCacheContinuousQueryAtomicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..4002435
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryAtomicSelfTest} whose {@code memoryMode()} returns {@code OFFHEAP_VALUES}.
+ */
+public class GridCacheContinuousQueryAtomicOffheapValuesTest extends GridCacheContinuousQueryAtomicSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
new file mode 100644
index 0000000..bcba7b6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryTxSelfTest} whose {@code memoryMode()} returns {@code OFFHEAP_TIERED}.
+ */
+public class GridCacheContinuousQueryTxOffheapTieredTest extends GridCacheContinuousQueryTxSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_TIERED;
+ }
+}
[22/22] ignite git commit: ignite-2407
Posted by sb...@apache.org.
ignite-2407
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c291043
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c291043
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c291043
Branch: refs/heads/ignite-2407
Commit: 6c29104396b29c2752855e3cd3c5f5b35e6a1304
Parents: 112e76e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 12 16:08:51 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 12 16:08:51 2016 +0300
----------------------------------------------------------------------
.../distributed/IgniteCachePrimarySyncTest.java | 45 +++++++++++++++++---
1 file changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c291043/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
index cef73fd..183d4bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
@@ -21,16 +21,26 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
*
@@ -94,21 +104,36 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testPutGet() throws Exception {
- checkPutGet(ignite(SRVS).cache("cache1"));
+ Ignite ignite = ignite(SRVS);
- checkPutGet(ignite(SRVS).cache("cache2"));
+ checkPutGet(ignite.cache("cache1"), null, null, null);
+
+ checkPutGet(ignite.cache("cache2"), null, null, null);
+
+ checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, REPEATABLE_READ);
+
+ checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, SERIALIZABLE);
+
+ checkPutGet(ignite.cache("cache2"), ignite.transactions(), PESSIMISTIC, READ_COMMITTED);
}
/**
* @param cache Cache.
+ * @param txs Transactions instance if explicit transaction should be used.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
*/
- private void checkPutGet(IgniteCache<Object, Object> cache) {
+ private void checkPutGet(IgniteCache<Object, Object> cache,
+ @Nullable IgniteTransactions txs,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation) {
log.info("Check cache: " + cache.getName());
final int KEYS = 50;
for (int iter = 0; iter < 100; iter++) {
- log.info("Iteration: " + iter);
+ if (iter % 10 == 0)
+ log.info("Iteration: " + iter);
for (int i = 0; i < KEYS; i++)
cache.remove(i);
@@ -118,12 +143,20 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest {
for (int i = 0; i < KEYS; i++)
putBatch.put(i, iter);
- cache.putAll(putBatch);
+ if (txs != null) {
+ try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ cache.putAll(putBatch);
+
+ tx.commit();
+ }
+ }
+ else
+ cache.putAll(putBatch);
Map<Object, Object> vals = cache.getAll(putBatch.keySet());
for (int i = 0; i < KEYS; i++)
- assertNotNull(vals.get(i));
+ assertEquals(iter, vals.get(i));
}
}
}
[19/22] ignite git commit: IGNITE-2468
Posted by sb...@apache.org.
IGNITE-2468
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/725d6cb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/725d6cb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/725d6cb5
Branch: refs/heads/ignite-2407
Commit: 725d6cb557684ac8f31dfde8f5fcb4ddb95a18dd
Parents: 763bf57
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Feb 12 14:08:25 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Feb 12 14:08:25 2016 +0300
----------------------------------------------------------------------
.../internal/GridMessageListenHandler.java | 16 ++
.../continuous/GridContinuousProcessor.java | 50 +++--
...eClientReconnectContinuousProcessorTest.java | 32 +++-
...IgniteCacheContinuousQueryReconnectTest.java | 192 +++++++++++++++++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 2 +-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
6 files changed, 279 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 13aeb54..bf81944 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -83,6 +83,22 @@ public class GridMessageListenHandler implements GridContinuousHandler {
this.pred = pred;
}
+ /**
+ * Copy constructor: takes clsName, depInfo, pred, predBytes, topic and topicBytes from {@code orig} and sets depEnabled to {@code false}.
+ * @param orig Handler to be copied.
+ */
+ public GridMessageListenHandler(GridMessageListenHandler orig) {
+ assert orig != null;
+
+ this.clsName = orig.clsName;
+ this.depInfo = orig.depInfo;
+ this.pred = orig.pred;
+ this.predBytes = orig.predBytes;
+ this.topic = orig.topic;
+ this.topicBytes = orig.topicBytes;
+ this.depEnabled = false;
+ }
+
/** {@inheritDoc} */
@Override public boolean isEvents() {
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0218897..496f820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridMessageListenHandler;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -428,11 +429,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.resource().injectGeneric(item.prjPred);
// Register handler only if local node passes projection predicate.
- if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
+ if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
+ !locInfos.containsKey(item.routineId)) {
if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
item.autoUnsubscribe, false))
item.hnd.onListenerRegistered(item.routineId, ctx);
}
+
+ if (!item.autoUnsubscribe)
+ // Register routine locally.
+ locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
+ item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to register continuous handler.", e);
@@ -854,6 +861,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
}
+ GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
+ new GridMessageListenHandler((GridMessageListenHandler)hnd) :
+ hnd;
+
if (node.isClient()) {
Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
@@ -866,7 +877,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
- hnd,
+ hnd0,
data.bufferSize(),
data.interval(),
data.autoUnsubscribe()));
@@ -881,10 +892,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (prjPred != null)
ctx.resource().injectGeneric(prjPred);
- if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
- registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
+ if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
+ !locInfos.containsKey(routineId)) {
+ registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
data.autoUnsubscribe(), false);
}
+
+ if (!data.autoUnsubscribe())
+ // Register routine locally.
+ locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
+ prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
}
catch (IgniteCheckedException e) {
err = e;
@@ -894,11 +911,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Load partition counters.
- if (hnd.isQuery()) {
+ if (hnd0.isQuery()) {
GridCacheProcessor proc = ctx.cache();
if (proc != null) {
- GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+ GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal()) {
Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
@@ -912,7 +929,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
req.addError(ctx.localNodeId(), err);
if (registered)
- hnd.onListenerRegistered(routineId, ctx);
+ hnd0.onListenerRegistered(routineId, ctx);
}
/**
@@ -1095,22 +1112,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
*/
@SuppressWarnings("TooBroadScope")
private void unregisterRemote(UUID routineId) {
- RemoteRoutineInfo info;
+ RemoteRoutineInfo remote;
+ LocalRoutineInfo loc;
stopLock.lock();
try {
- info = rmtInfos.remove(routineId);
+ remote = rmtInfos.remove(routineId);
- if (info == null)
+ loc = locInfos.remove(routineId);
+
+ if (remote == null)
stopped.add(routineId);
}
finally {
stopLock.unlock();
}
- if (info != null)
- unregisterHandler(routineId, info.hnd, false);
+ if (remote != null)
+ unregisterHandler(routineId, remote.hnd, false);
+ else {
+ assert loc != null;
+
+ // Removes routine at node started it when stopRoutine called from another node.
+ unregisterHandler(routineId, loc.hnd, false);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index dc94c96..4c44adc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -113,7 +113,21 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
/**
* @throws Exception If failed.
*/
- public void testMessageListenerReconnect() throws Exception {
+ public void testMessageListenerReconnectAndStopFromServer() throws Exception {
+ testMessageListenerReconnect(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMessageListenerReconnectAndStopFromClient() throws Exception {
+ testMessageListenerReconnect(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
Ignite client = grid(serverCount());
assertTrue(client.cluster().localNode().isClient());
@@ -166,7 +180,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
log.info("Stop listen, should not get remote messages anymore.");
- client.message().stopRemoteListen(opId);
+ (stopFromClient ? client : srv).message().stopRemoteListen(opId);
srv.message().send(topic, "msg3");
@@ -175,6 +189,20 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
assertFalse(latch.await(3000, MILLISECONDS));
+
+ log.info("New nodes should not register stopped listeners.");
+
+ startGrid(serverCount() + 1);
+
+ srv.message().send(topic, "msg4");
+
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(1);
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertFalse(latch.await(3000, MILLISECONDS));
+
+ stopGrid(serverCount() + 1);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
new file mode 100644
index 0000000..b1d8a49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks how many times a continuous query remote filter is evaluated per put while nodes are stopped and started.
+ */
+public class IgniteCacheContinuousQueryReconnectTest extends GridCommonAbstractTest implements Serializable {
+ /** Counter incremented by the remote filter on every evaluation; reset to zero in putAndCheck(). */
+ final private static AtomicInteger cnt = new AtomicInteger();
+
+ /** When {@code true}, getConfiguration() calls setClientMode(true) on the configuration it returns. */
+ private volatile boolean isClient = false;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(atomicMode());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ if (isClient)
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @return Atomic mode.
+ */
+ protected CacheAtomicityMode atomicMode() {
+ return ATOMIC;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectServer() throws Exception {
+ testReconnect(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectClient() throws Exception {
+ testReconnect(true);
+ }
+
+ /**
+ * Resets the counter, puts key 1 into {@code cache} and asserts that the counter then equals {@code diff}.
+ */
+ private void putAndCheck(IgniteCache<Object, Object> cache, int diff) {
+ cnt.set(0);
+
+ cache.put(1, "1");
+
+ assertEquals(diff, cnt.get());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testReconnect(boolean clientQuery) throws Exception {
+ Ignite srv1 = startGrid(0);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ // No-op.
+ }
+ });
+
+ qry.setAutoUnsubscribe(false);
+
+ qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Object, Object>() {
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+ cnt.incrementAndGet();
+
+ return true;
+ }
+ });
+
+ isClient = true;
+
+ Ignite client = startGrid(1);
+
+ isClient = false;
+
+ IgniteCache<Object, Object> cache1 = srv1.cache(null);
+ IgniteCache<Object, Object> clCache = client.cache(null);
+
+ putAndCheck(clCache, 0); // 0 remote listeners.
+
+ QueryCursor<Cache.Entry<Object, Object>> cur = (clientQuery ? clCache : cache1).query(qry);
+
+ putAndCheck(clCache, 1); // 1 remote listener.
+
+ final Ignite srv2 = startGrid(2);
+
+ putAndCheck(clCache, 2); // 2 remote listeners.
+
+ stopGrid(0);
+
+ while (true) {
+ try {
+ clCache.get(1);
+
+ break;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ e.reconnectFuture().get(); // Wait for reconnect.
+
+ }
+ catch (CacheException e) { // NOTE(review): a CacheException with any other cause is dropped here and the loop simply retries.
+ if (e.getCause() instanceof IgniteClientDisconnectedException)
+ ((IgniteClientDisconnectedException)e.getCause()).reconnectFuture().get(); // Wait for reconnect.
+ }
+ }
+
+ putAndCheck(clCache, 1); // 1 remote listener.
+
+ Ignite srv3 = startGrid(3);
+
+ putAndCheck(clCache, 2); // 2 remote listeners.
+
+ stopGrid(1); // Client node.
+
+ isClient = true;
+
+ client = startGrid(4);
+
+ isClient = false;
+
+ clCache = client.cache(null);
+
+ putAndCheck(clCache, 2); // 2 remote listeners.
+
+ Ignite srv4 = startGrid(5);
+
+ putAndCheck(clCache, 3); // 3 remote listeners.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 030c653..7debb41 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2050,7 +2050,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public boolean apply(UUID uuid, Object msg) {
- X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+ X.println(">>> Received [node=" + ignite.name() + ", msg=" + msg + ']');
msgLatch.countDown();
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..cecb8ad 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryReconnectTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
@@ -200,6 +201,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
suite.addTestSuite(GridCacheContinuousQueryPartitionAtomicOneNodeTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryReconnectTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.class);
[02/22] ignite git commit: IGNITE-864 Disabled version check for
visorcmd
Posted by sb...@apache.org.
IGNITE-864 Disabled version check for visorcmd
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c67e2ea5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c67e2ea5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c67e2ea5
Branch: refs/heads/ignite-2407
Commit: c67e2ea5bfb7b73d76744d54047eb1b692e0907a
Parents: b7475f0
Author: Andrey <an...@gridgain.com>
Authored: Wed Feb 10 16:56:23 2016 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Wed Feb 10 16:56:23 2016 +0700
----------------------------------------------------------------------
.../src/main/scala/org/apache/ignite/visor/visor.scala | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c67e2ea5/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
index a4eed69..335eb9f 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
@@ -17,6 +17,7 @@
package org.apache.ignite.visor
+import org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER
import org.apache.ignite._
import org.apache.ignite.cluster.{ClusterGroup, ClusterGroupEmptyException, ClusterMetrics, ClusterNode}
import org.apache.ignite.events.EventType._
@@ -303,6 +304,9 @@ object visor extends VisorTag {
}
})
+ // Make sure visor starts without version checker print.
+ System.setProperty(IGNITE_UPDATE_NOTIFIER, "false")
+
addHelp(
name = "mlist",
shortInfo = "Prints Visor console memory variables.",
[15/22] ignite git commit: added missing file header to
org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest
Posted by sb...@apache.org.
added missing file header to org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest
Signed-off-by: Anton Vinogradov <av...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0491a5f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0491a5f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0491a5f8
Branch: refs/heads/ignite-2407
Commit: 0491a5f814365723b53ff1820aad76f646307f13
Parents: a9937a6
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Feb 10 18:54:36 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Feb 10 19:31:41 2016 +0300
----------------------------------------------------------------------
.../KerberosHadoopFileSystemFactorySelfTest.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0491a5f8/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
index 8fb1612..ea7fa99 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.hadoop.fs;
import java.io.ByteArrayInputStream;
[06/22] ignite git commit: ignite-2587 Fixed continuous query
notifications in offheap mode and BinaryObjectOffheapImpl usage.
Posted by sb...@apache.org.
ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c05fc02
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c05fc02
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c05fc02
Branch: refs/heads/ignite-2407
Commit: 4c05fc0254f446ef040f5d22a066a0d4916a589e
Parents: 0b47d5c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 10 14:07:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 10 14:07:40 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheLazyEntry.java | 3 +
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 118 +++-
.../binary/CacheObjectBinaryProcessorImpl.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 79 ++-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 85 ++-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 38 +-
.../cache/query/GridCacheQueryManager.java | 30 +-
.../continuous/CacheContinuousQueryHandler.java | 3 +-
.../CacheContinuousQueryListener.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 120 +++-
.../continuous/GridContinuousProcessor.java | 16 +-
.../IgniteCacheEntryListenerAbstractTest.java | 454 ++++++++----
...cheEntryListenerAtomicOffheapTieredTest.java | 32 +
...cheEntryListenerAtomicOffheapValuesTest.java | 32 +
...teCacheEntryListenerTxOffheapTieredTest.java | 32 +
...teCacheEntryListenerTxOffheapValuesTest.java | 32 +
.../cache/IgniteCacheEntryListenerTxTest.java | 1 +
...ContinuousQueryFailoverAbstractSelfTest.java | 10 +
...tomicPrimaryWriteOrderOffheapTieredTest.java | 33 +
...tinuousQueryFailoverTxOffheapTieredTest.java | 32 +
...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
...ridCacheContinuousQueryAbstractSelfTest.java | 19 +-
...eContinuousQueryAtomicOffheapTieredTest.java | 32 +
...eContinuousQueryAtomicOffheapValuesTest.java | 32 +
...CacheContinuousQueryTxOffheapTieredTest.java | 32 +
...CacheContinuousQueryTxOffheapValuesTest.java | 32 +
.../junits/common/GridCommonAbstractTest.java | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 8 +
.../IgniteCacheQuerySelfTestSuite.java | 14 +
30 files changed, 1743 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index 05a6fef..30933e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -50,6 +50,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
* @param cctx Cache context.
* @param keyObj Key cache object.
* @param valObj Cache object value.
+ * @param keepBinary Keep binary flag.
*/
public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, CacheObject valObj, boolean keepBinary) {
this.cctx = cctx;
@@ -61,6 +62,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
/**
* @param keyObj Key cache object.
* @param val Value.
+ * @param keepBinary Keep binary flag.
* @param cctx Cache context.
*/
public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, V val, boolean keepBinary) {
@@ -75,6 +77,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
* @param keyObj Key cache object.
* @param key Key value.
* @param valObj Cache object
+ * @param keepBinary Keep binary flag.
* @param val Cache value.
*/
public CacheLazyEntry(GridCacheContext<K, V> ctx,
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index e875df0..5729959 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1729,10 +1729,10 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Heap-based object.
*/
@Nullable public <T> T unwrapTemporary(@Nullable Object obj) {
- if (!offheapTiered())
+ if (!useOffheapEntry())
return (T)obj;
- return (T) cacheObjects().unwrapTemporary(this, obj);
+ return (T)cacheObjects().unwrapTemporary(this, obj);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ae40295..9336e0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -1122,7 +1124,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert newVer != null : "Failed to get write version for tx: " + tx;
- old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val;
+ boolean internal = isInternal() || !context().userCache();
+
+ Map<UUID, CacheContinuousQueryListener> lsnrCol =
+ notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+ old = (retval || intercept || lsnrCol != null) ?
+ rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val;
if (intercept) {
val0 = CU.value(val, cctx, false);
@@ -1206,10 +1214,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
keepBinary);
}
- if (cctx.isLocal() || cctx.isReplicated() ||
- (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
- cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
- partition(), tx.local(), false, updateCntr0, topVer);
+ if (lsnrCol != null) {
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrCol,
+ key,
+ val,
+ old,
+ internal,
+ partition(),
+ tx.local(),
+ false,
+ updateCntr0,
+ topVer);
+ }
cctx.dataStructures().onEntryUpdated(key, false, keepBinary);
}
@@ -1304,7 +1321,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
- old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ boolean internal = isInternal() || !context().userCache();
+
+ Map<UUID, CacheContinuousQueryListener> lsnrCol =
+ notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+ old = (retval || intercept || lsnrCol != null) ?
+ rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
if (intercept) {
entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
@@ -1388,10 +1411,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
keepBinary);
}
- if (cctx.isLocal() || cctx.isReplicated() ||
- (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
- cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal()
- || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer);
+ if (lsnrCol != null) {
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrCol,
+ key,
+ null,
+ old,
+ internal,
+ partition(),
+ tx.local(),
+ false,
+ updateCntr0,
+ topVer);
+ }
cctx.dataStructures().onEntryUpdated(key, true, keepBinary);
@@ -1440,6 +1472,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return new GridCacheUpdateTxResult(false, null);
}
+ /**
+ * @param tx Transaction.
+ * @return {@code True} if should notify continuous query manager.
+ */
+ private boolean notifyContinuousQueries(@Nullable IgniteInternalTx tx) {
+ return cctx.isLocal() ||
+ cctx.isReplicated() ||
+ (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()));
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
@@ -1470,7 +1512,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
EntryProcessorResult<Object> invokeRes = null;
synchronized (this) {
- boolean needVal = retval || intercept || op == GridCacheOperation.TRANSFORM || !F.isEmpty(filter);
+ boolean internal = isInternal() || !context().userCache();
+
+ Map<UUID, CacheContinuousQueryListener> lsnrCol =
+ cctx.continuousQueries().updateListeners(internal, false);
+
+ boolean needVal = retval ||
+ intercept ||
+ op == GridCacheOperation.TRANSFORM ||
+ !F.isEmpty(filter) ||
+ lsnrCol != null;
checkObsolete();
@@ -1479,7 +1530,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
unswap(retval);
// Possibly get old value form store.
- old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ old = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
boolean readFromStore = false;
@@ -1731,11 +1782,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
- if (!isNear()) {
+ if (lsnrCol != null) {
long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
- cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
- partition(), true, false, updateCntr, AffinityTopologyVersion.NONE);
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrCol,
+ key,
+ val,
+ old,
+ internal,
+ partition(),
+ true,
+ false,
+ updateCntr,
+ AffinityTopologyVersion.NONE);
}
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
@@ -1997,8 +2057,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (updateCntr != null)
updateCntr0 = updateCntr;
- cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal()
- || !context().userCache(), partition(), primary, false, updateCntr0, topVer);
+ cctx.continuousQueries().onEntryUpdated(
+ key,
+ evtVal,
+ prevVal,
+ isInternal() || !context().userCache(),
+ partition(),
+ primary,
+ false,
+ updateCntr0,
+ topVer);
}
return new GridCacheUpdateAtomicResult(false,
@@ -2019,7 +2087,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
// Prepare old value and value bytes.
- oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
// Possibly read value from store.
boolean readFromStore = false;
@@ -2937,7 +3005,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @return {@code True} if values should be stored off-heap.
*/
- protected boolean isOffHeapValuesOnly() {
+ protected final boolean isOffHeapValuesOnly() {
return cctx.config().getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES;
}
@@ -3236,8 +3304,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
drReplicate(drType, val, ver);
if (!skipQryNtf) {
- cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal()
- || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer);
+ cctx.continuousQueries().onEntryUpdated(
+ key,
+ val,
+ null,
+ this.isInternal() || !this.context().userCache(),
+ this.partition(),
+ true,
+ preload,
+ updateCntr,
+ topVer);
cctx.dataStructures().onEntryUpdated(key, false, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 0fef6f8..f091fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -822,10 +822,10 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
Object val = unmarshal(valPtr, !tmp);
- if (val instanceof BinaryObjectOffheapImpl)
- return (BinaryObjectOffheapImpl)val;
+ if (val instanceof CacheObject)
+ return (CacheObject)val;
- return new CacheObjectImpl(val, null);
+ return toCacheObject(ctx.cacheObjectContext(), val, false);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6c7bac5..fec61df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -1992,6 +1993,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ boolean initLsnrs = false;
+ Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+ boolean internal = false;
+
// Avoid iterator creation.
for (int i = 0; i < keys.size(); i++) {
KeyCacheObject k = keys.get(i);
@@ -2006,6 +2011,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (entry == null)
continue;
+ if (!initLsnrs) {
+ internal = entry.isInternal() || !context().userCache();
+
+ lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+ initLsnrs = true;
+ }
+
GridCacheVersion newConflictVer = req.conflictVersion(i);
long newConflictTtl = req.conflictTtl(i);
long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2034,7 +2047,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.invokeArguments(),
primary && writeThrough() && !req.skipStore(),
!req.skipStore(),
- sndPrevVal || req.returnValue(),
+ lsnrs != null || sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
true,
@@ -2061,6 +2074,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
+ dhtFut.listeners(lsnrs);
+
if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2097,10 +2112,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
"[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
}
}
- else if (!entry.isNear() && updRes.success()) {
- ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(),
- entry.isInternal() || !context().userCache(), entry.partition(), primary, false,
- updRes.updateCounter(), topVer);
+ else if (lsnrs != null && updRes.success()) {
+ ctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ entry.key(),
+ updRes.newValue(),
+ updRes.oldValue(),
+ internal,
+ entry.partition(),
+ primary,
+ false,
+ updRes.updateCounter(),
+ topVer);
}
if (hasNear) {
@@ -2275,6 +2298,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ boolean initLsnrs = false;
+ Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry entry = entries.get(i);
@@ -2308,6 +2334,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
}
+ if (!initLsnrs) {
+ lsnrs = ctx.continuousQueries().updateListeners(
+ entry.isInternal() || !context().userCache(),
+ false);
+
+ initLsnrs = true;
+ }
+
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
node.id(),
@@ -2317,7 +2351,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/sndPrevVal,
+ /*retval*/sndPrevVal || lsnrs != null,
req.keepBinary(),
expiry,
/*event*/true,
@@ -2366,6 +2400,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
+ dhtFut.listeners(lsnrs);
+
EntryProcessor<Object, Object, Object> entryProcessor =
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
@@ -2763,6 +2799,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ boolean initLsnrs = false;
+ Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+ boolean internal = false;
+
for (int i = 0; i < req.size(); i++) {
KeyCacheObject key = req.key(i);
@@ -2785,6 +2825,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
long ttl = req.ttl(i);
long expireTime = req.conflictExpireTime(i);
+ if (!initLsnrs) {
+ internal = entry.isInternal() || !context().userCache();
+
+ lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+ initLsnrs = true;
+ }
+
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
nodeId,
@@ -2794,7 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/false,
+ /*retval*/lsnrs != null,
req.keepBinary(),
/*expiry policy*/null,
/*event*/true,
@@ -2817,10 +2865,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
- if (updRes.success() && !entry.isNear())
- ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(),
- updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(),
- false, false, updRes.updateCounter(), req.topologyVersion());
+ if (lsnrs != null && updRes.success()) {
+ ctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ entry.key(),
+ updRes.newValue(),
+ updRes.oldValue(),
+ internal,
+ entry.partition(),
+ false,
+ false,
+ updRes.updateCounter(),
+ req.topologyVersion());
+ }
entry.onUnlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 06c8441..58d704d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -102,6 +103,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Response count. */
private volatile int resCnt;
+ /** */
+ private Map<UUID, CacheContinuousQueryListener> lsnrs;
+
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -136,6 +140,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
waitForExchange = !topLocked;
}
+ /**
+ * @param lsnrs Continuous query listeners.
+ */
+ void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
+ this.lsnrs = lsnrs;
+ }
+
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futVer.asGridUuid();
@@ -215,6 +226,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param addPrevVal If {@code true} sends previous value to backups.
+ * @param prevVal Previous value.
* @param updateCntr Partition update counter.
*/
public void addWriteEntry(GridDhtCacheEntry entry,
@@ -270,13 +283,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
addPrevVal,
entry.partition(),
prevVal,
- updateCntr);
+ updateCntr,
+ lsnrs != null);
}
- else if (dhtNodes.size() == 1) {
+ else if (lsnrs != null && dhtNodes.size() == 1) {
try {
- cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal,
- entry.key().internal() || !cctx.userCache(), entry.partition(), true, false,
- updateCntr, updateReq.topologyVersion());
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ entry.key(),
+ val,
+ prevVal,
+ entry.key().internal() || !cctx.userCache(),
+ entry.partition(),
+ true,
+ false,
+ updateCntr,
+ updateReq.topologyVersion());
}
catch (IgniteCheckedException e) {
U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
@@ -352,7 +374,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
cctx.mvcc().removeAtomicFuture(version());
if (err != null) {
- if (!mappings.isEmpty()) {
+ if (!mappings.isEmpty() && lsnrs != null) {
Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
@@ -362,7 +384,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
if (!hndKeys.contains(key)) {
updateRes.addFailedKey(key, err);
- cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
+ cctx.continuousQueries().skipUpdateEvent(
+ lsnrs,
+ key,
+ req.partitionId(i),
+ req.updateCounter(i),
updateReq.topologyVersion());
hndKeys.add(key);
@@ -378,27 +404,38 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
updateRes.addFailedKey(key, err);
}
else {
- Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+ if (lsnrs != null) {
+ Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
- exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject key = req.key(i);
+ exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ for (int i = 0; i < req.size(); i++) {
+ KeyCacheObject key = req.key(i);
- if (!hndKeys.contains(key)) {
- try {
- cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
- key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
- req.updateCounter(i), updateReq.topologyVersion());
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
- + req.value(i) + ", err=" + e + "]");
- }
+ if (!hndKeys.contains(key)) {
+ try {
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ key,
+ req.value(i),
+ req.localPreviousValue(i),
+ key.internal() || !cctx.userCache(),
+ req.partitionId(i),
+ true,
+ false,
+ req.updateCounter(i),
+ updateReq.topologyVersion());
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to send continuous query message. [key=" + key +
+ ", newVal=" + req.value(i) +
+ ", err=" + e + "]");
+ }
- hndKeys.add(key);
+ hndKeys.add(key);
- if (hndKeys.size() == keys.size())
- break exit;
+ if (hndKeys.size() == keys.size())
+ break exit;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7cc276f..e417cdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -49,6 +50,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Lite dht cache backup update request.
*/
+@IgniteCodeGeneratingFail // Need add 'cleanup' call in 'writeTo' method.
public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
@@ -215,7 +217,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
keys = new ArrayList<>();
partIds = new ArrayList<>();
- locPrevVals = new ArrayList<>();
if (forceTransformBackups) {
entryProcessors = new ArrayList<>();
@@ -240,7 +241,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
* @param addPrevVal If {@code true} adds previous value.
+ * @param partId Partition.
* @param prevVal Previous value.
+ * @param updateCntr Update counter.
+ * @param storeLocPrevVal If {@code true} stores previous value.
*/
public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
@@ -251,12 +255,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
boolean addPrevVal,
int partId,
@Nullable CacheObject prevVal,
- @Nullable Long updateIdx) {
+ @Nullable Long updateCntr,
+ boolean storeLocPrevVal) {
keys.add(key);
partIds.add(partId);
- locPrevVals.add(prevVal);
+ if (storeLocPrevVal) {
+ if (locPrevVals == null)
+ locPrevVals = new ArrayList<>();
+
+ locPrevVals.add(prevVal);
+ }
if (forceTransformBackups) {
assert entryProcessor != null;
@@ -273,11 +283,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
prevVals.add(prevVal);
}
- if (updateIdx != null) {
+ if (updateCntr != null) {
if (updateCntrs == null)
updateCntrs = new GridLongList();
- updateCntrs.add(updateIdx);
+ updateCntrs.add(updateCntr);
}
// In case there is no conflict, do not create the list.
@@ -521,6 +531,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @return Value.
*/
@Nullable public CacheObject localPreviousValue(int idx) {
+ assert locPrevVals != null;
+
return locPrevVals.get(idx);
}
@@ -849,6 +861,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
+ cleanup();
+
return true;
}
@@ -1048,6 +1062,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
}
+ /**
+ * Cleanup values not needed after message was sent.
+ */
+ private void cleanup() {
+ nearVals = null;
+ prevVals = null;
+
+ // Do not keep values if they are not needed for continuous query notification.
+ if (locPrevVals == null) {
+ vals = null;
+ locPrevVals = null;
+ }
+ }
+
/** {@inheritDoc} */
@Override public byte directType() {
return 38;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 8f0cab7..0d8f795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1107,7 +1107,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
next = null;
while (it.hasNext()) {
- final LazySwapEntry e = new LazySwapEntry(it.next(), keepBinary);
+ final LazySwapEntry e = new LazySwapEntry(it.next());
if (filter != null) {
K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary);
@@ -2524,15 +2524,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/** */
private final Map.Entry<byte[], byte[]> e;
- /** */
- private boolean keepBinary;
-
/**
* @param e Entry with
*/
- LazySwapEntry(Map.Entry<byte[], byte[]> e, boolean keepBinary) {
+ LazySwapEntry(Map.Entry<byte[], byte[]> e) {
this.e = e;
- this.keepBinary = keepBinary;
}
/** {@inheritDoc} */
@@ -2545,9 +2541,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
@Override protected V unmarshalValue() throws IgniteCheckedException {
IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
- CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
-
- return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(obj, keepBinary);
+ return (V)cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
}
/** {@inheritDoc} */
@@ -2597,13 +2591,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
@Override protected V unmarshalValue() throws IgniteCheckedException {
long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
- CacheObject obj = cctx.fromOffheap(ptr, false);
-
- V val = CU.value(obj, cctx, false);
-
- assert val != null;
-
- return val;
+ return (V)cctx.fromOffheap(ptr, false);
}
/** {@inheritDoc} */
@@ -2661,7 +2649,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (!filter.apply(key, val))
return null;
- return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
+ if (key instanceof CacheObject)
+ ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
+
+ val = (V)cctx.unwrapTemporary(e.value());
+
+ if (val instanceof CacheObject)
+ ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext());
+
+ return new IgniteBiTuple<>(e.key(), val);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7e66ad3..cf9b439 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -882,8 +882,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @return Continuous query entry.
*/
private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
- if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
-
+ if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
e.markFiltered();
return e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 86abbef..dce04de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
* Continuous query listener.
*/
-interface CacheContinuousQueryListener<K, V> {
+public interface CacheContinuousQueryListener<K, V> {
/**
* Query execution callback.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0e4cb40..cc59989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static javax.cache.event.EventType.CREATED;
@@ -155,37 +156,102 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param lsnrs Listeners to notify.
+ * @param key Entry key.
* @param partId Partition id.
* @param updCntr Updated counter.
* @param topVer Topology version.
*/
- public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
- if (lsnrCnt.get() > 0) {
- for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
- CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
- cctx.cacheId(),
- UPDATED,
- key,
- null,
- null,
- lsnr.keepBinary(),
- partId,
- updCntr,
- topVer);
+ public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+ KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+ assert lsnrs != null;
- CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
- cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+ for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ UPDATED,
+ key,
+ null,
+ null,
+ lsnr.keepBinary(),
+ partId,
+ updCntr,
+ topVer);
- lsnr.skipUpdateEvent(evt, topVer);
- }
+ CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+ cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+ lsnr.skipUpdateEvent(evt, topVer);
}
}
/**
+ * @param internal Internal entry flag (internal key or not user cache); selects the internal listener map.
+ * @param preload Whether update happened during preloading; such updates are reported for internal entries only.
+ * @return Listeners to notify, or {@code null} if the update is not reported or no listeners are registered.
+ */
+ @Nullable public Map<UUID, CacheContinuousQueryListener> updateListeners(
+ boolean internal,
+ boolean preload) {
+ if (preload && !internal)
+ return null;
+
+ ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
+
+ if (internal)
+ lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
+ else
+ lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
+
+ return F.isEmpty(lsnrCol) ? null : lsnrCol;
+ }
+
+ /** Resolves listeners via {@link #updateListeners(boolean, boolean)} and, if any, delegates to the overload taking them.
+ * @param key Key.
+ * @param newVal New value.
+ * @param oldVal Old value.
+ * @param internal Internal entry (internal key or not user cache).
+ * @param partId Partition.
+ * @param primary {@code True} if called on primary node.
+ * @param preload Whether update happened during preloading.
+ * @param updateCntr Update counter.
+ * @param topVer Topology version.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void onEntryUpdated(
+ KeyCacheObject key,
+ CacheObject newVal,
+ CacheObject oldVal,
+ boolean internal,
+ int partId,
+ boolean primary,
+ boolean preload,
+ long updateCntr,
+ AffinityTopologyVersion topVer) throws IgniteCheckedException {
+ Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
+
+ if (lsnrCol != null) {
+ onEntryUpdated(
+ lsnrCol,
+ key,
+ newVal,
+ oldVal,
+ internal,
+ partId,
+ primary,
+ preload,
+ updateCntr,
+ topVer);
+ }
+ }
+
+ /**
+ * @param lsnrCol Listeners to notify.
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
* @param internal Internal entry (internal key or not user cache),
+ * @param partId Partition.
* @param primary {@code True} if called on primary node.
* @param preload Whether update happened during preloading.
* @param updateCntr Update counter.
@@ -193,6 +259,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @throws IgniteCheckedException In case of error.
*/
public void onEntryUpdated(
+ Map<UUID, CacheContinuousQueryListener> lsnrCol,
KeyCacheObject key,
CacheObject newVal,
CacheObject oldVal,
@@ -205,25 +272,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
throws IgniteCheckedException
{
assert key != null;
-
- if (preload && !internal)
- return;
-
- ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
-
- if (internal)
- lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
- else
- lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
-
- if (F.isEmpty(lsnrCol))
- return;
+ assert lsnrCol != null;
boolean hasNewVal = newVal != null;
boolean hasOldVal = oldVal != null;
- if (!hasNewVal && !hasOldVal)
+ if (!hasNewVal && !hasOldVal) {
+ skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer);
+
return;
+ }
EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7c7e3e3..0218897 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -893,11 +894,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Load partition counters.
- if (hnd.isQuery() && ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
- Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName())
- .context().topology().updateCounters();
+ if (hnd.isQuery()) {
+ GridCacheProcessor proc = ctx.cache();
- req.addUpdateCounters(cntrs);
+ if (proc != null) {
+ GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+
+ if (cache != null && !cache.isLocal()) {
+ Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
+
+ req.addUpdateCounters(cntrs);
+ }
+ }
}
if (err != null)
[10/22] ignite git commit: Added test.
Posted by sb...@apache.org.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5539cbad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5539cbad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5539cbad
Branch: refs/heads/ignite-2407
Commit: 5539cbadb81195c35889b71486b1e449b637d8c0
Parents: 16927ab
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 10 15:32:03 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 10 15:32:03 2016 +0300
----------------------------------------------------------------------
...niteCacheEntryListenerExpiredEventsTest.java | 202 +++++++++++++++++++
1 file changed, 202 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5539cbad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java
new file mode 100644
index 0000000..d9fdfac
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that an expired cache entry results in exactly one event delivered to a {@link CacheEntryExpiredListener}.
+ */
+public class IgniteCacheEntryListenerExpiredEventsTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every grid configured by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Count of expired events seen by {@link ExpiredListener}; re-created at the start of every check. */
+ private static AtomicInteger evtCntr;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExpiredEventAtomic() throws Exception {
+ checkExpiredEvents(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExpiredEventAtomicOffheap() throws Exception {
+ checkExpiredEvents(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExpiredEventTx() throws Exception {
+ checkExpiredEvents(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExpiredEventTxOffheap() throws Exception {
+ checkExpiredEvents(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED));
+ }
+
+ /**
+ * @param ccfg Configuration of the cache that is created, checked for a single expired event and destroyed.
+ * @throws Exception If failed.
+ */
+ private void checkExpiredEvents(CacheConfiguration<Object, Object> ccfg) throws Exception {
+ IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg);
+
+ try {
+ evtCntr = new AtomicInteger();
+
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+ new ExpiredListenerFactory(),
+ null, // No event filter factory.
+ true, // Old value is required.
+ false // Listener is not synchronous.
+ );
+
+ cache.registerCacheEntryListener(lsnrCfg);
+
+ IgniteCache<Object, Object> expiryCache =
+ cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500)));
+
+ expiryCache.put(1, 1);
+
+ for (int i = 0; i < 10; i++)
+ cache.get(i);
+
+ boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return evtCntr.get() > 0;
+ }
+ }, 5000);
+
+ assertTrue(wait);
+
+ U.sleep(100); // NOTE(review): presumably gives a duplicate event time to arrive before the exact-count check.
+
+ assertEquals(1, evtCntr.get());
+ }
+ finally {
+ ignite(0).destroyCache(cache.getName());
+ }
+ }
+
+ /**
+ * Builds a FULL_SYNC configuration with PRIMARY atomic write order; a PARTITIONED cache gets one backup.
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Cache atomicity mode.
+ * @param memoryMode Cache memory mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(
+ CacheMode cacheMode,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setMemoryMode(memoryMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(1);
+
+ return ccfg;
+ }
+
+ /**
+ * Factory that creates a new {@link ExpiredListener} on every call.
+ */
+ private static class ExpiredListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryListener<Object, Object> create() {
+ return new ExpiredListener();
+ }
+ }
+
+ /**
+ * Listener that increments the shared event counter once for every expired event it receives.
+ */
+ private static class ExpiredListener implements CacheEntryExpiredListener<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtCntr.incrementAndGet();
+ }
+ }
+}