You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/12 14:09:07 UTC

[01/22] ignite git commit: IGNITE-2575: Added validation of IGFS endpoint port value. This closes #469.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2407 570de20d4 -> 6c2910439


IGNITE-2575: Added validation of IGFS endpoint port value. This closes #469.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7475f09
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7475f09
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7475f09

Branch: refs/heads/ignite-2407
Commit: b7475f09b1727e5cc93681ee229b60ad8e188732
Parents: a4d8a04
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Feb 10 12:38:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 12:38:43 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsProcessor.java | 14 ++++++++++
 .../igfs/IgfsProcessorValidationSelfTest.java   | 27 ++++++++++++++++++++
 2 files changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7475f09/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 21446e1..1b60252 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -67,6 +67,12 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
     /** Null IGFS name. */
     private static final String NULL_NAME = UUID.randomUUID().toString();
 
+    /** Min available TCP port. */
+    private static final int MIN_TCP_PORT = 1;
+
+    /** Max available TCP port. */
+    private static final int MAX_TCP_PORT = 0xFFFF;
+
     /** Converts context to IGFS. */
     private static final IgniteClosure<IgfsContext,IgniteFileSystem> CTX_TO_IGFS = new C1<IgfsContext, IgniteFileSystem>() {
         @Override public IgniteFileSystem apply(IgfsContext igfsCtx) {
@@ -307,6 +313,14 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
                 throw new IgniteCheckedException("Invalid IGFS data cache configuration (key affinity mapper class should be " +
                     IgfsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg);
 
+            if (cfg.getIpcEndpointConfiguration() != null) {
+                final int tcpPort = cfg.getIpcEndpointConfiguration().getPort();
+
+                if (!(tcpPort >= MIN_TCP_PORT && tcpPort <= MAX_TCP_PORT))
+                    throw new IgniteCheckedException("IGFS endpoint TCP port is out of range [" + MIN_TCP_PORT +
+                        ".." + MAX_TCP_PORT + "]: " + tcpPort);
+            }
+
             long maxSpaceSize = cfg.getMaxSpaceSize();
 
             if (maxSpaceSize > 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7475f09/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
index 11a80af..27f47e8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
 import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -442,6 +443,32 @@ public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testInvalidEndpointTcpPort() throws Exception {
+        final String failMsg = "IGFS endpoint TCP port is out of range";
+        g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class));
+
+        final String igfsCfgName = "igfs-cfg";
+        final IgfsIpcEndpointConfiguration igfsEndpointCfg = new IgfsIpcEndpointConfiguration();
+        igfsEndpointCfg.setPort(0);
+        g1IgfsCfg1.setName(igfsCfgName);
+        g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+        checkGridStartFails(g1Cfg, failMsg, true);
+
+        igfsEndpointCfg.setPort(-1);
+        g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+        checkGridStartFails(g1Cfg, failMsg, true);
+
+        igfsEndpointCfg.setPort(65536);
+        g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+        checkGridStartFails(g1Cfg, failMsg, true);
+    }
+
+    /**
      * Checks that the given grid configuration will lead to {@link IgniteCheckedException} upon grid startup.
      *
      * @param cfg Grid configuration to check.


[04/22] ignite git commit: ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java
new file mode 100644
index 0000000..2994af6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryTxOffheapValuesTest extends GridCacheContinuousQueryTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_VALUES;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 949290e..4fcc1ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -941,7 +941,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      * @param cacheName Cache name.
      * @return Near cache for key.
      */
-    protected IgniteCache<Integer, Integer> primaryCache(Integer key, String cacheName) {
+    protected <K, V> IgniteCache<K, V> primaryCache(Object key, String cacheName) {
         return primaryNode(key, cacheName).cache(cacheName);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 90125b1..5af37a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -86,10 +86,14 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPrimaryWrite
 import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicStopBusySelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicLocalTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicReplicatedTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerEagerTtlDisabledTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxLocalTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxReplicatedTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorCallTest;
@@ -156,9 +160,13 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheEntryListenerAtomicReplicatedTest.class);
         suite.addTestSuite(IgniteCacheEntryListenerAtomicLocalTest.class);
         suite.addTestSuite(IgniteCacheEntryListenerTxTest.class);
+        suite.addTestSuite(IgniteCacheEntryListenerTxOffheapTieredTest.class);
+        suite.addTestSuite(IgniteCacheEntryListenerTxOffheapValuesTest.class);
         suite.addTestSuite(IgniteCacheEntryListenerTxReplicatedTest.class);
         suite.addTestSuite(IgniteCacheEntryListenerTxLocalTest.class);
         suite.addTestSuite(IgniteCacheEntryListenerEagerTtlDisabledTest.class);
+        suite.addTestSuite(IgniteCacheEntryListenerAtomicOffheapTieredTest.class);
+        suite.addTestSuite(IgniteCacheEntryListenerAtomicOffheapValuesTest.class);
 
         suite.addTestSuite(IgniteClientAffinityAssignmentSelfTest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 359cdf3..3cd4579 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -70,11 +70,16 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
@@ -89,6 +94,8 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapTieredTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
@@ -181,9 +188,13 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryPartitionedOnlySelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryPartitionedP2PDisabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryTxSelfTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryTxOffheapTieredTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryTxOffheapValuesTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryAtomicOffheapTieredTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryAtomicOffheapValuesTest.class);
         suite.addTestSuite(GridCacheContinuousQueryReplicatedTxOneNodeTest.class);
         suite.addTestSuite(GridCacheContinuousQueryReplicatedAtomicOneNodeTest.class);
         suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
@@ -195,6 +206,9 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
+        suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);


[20/22] ignite git commit: IGNITE-2603

Posted by sb...@apache.org.
IGNITE-2603


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a32dfc41
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a32dfc41
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a32dfc41

Branch: refs/heads/ignite-2407
Commit: a32dfc41ea9301f8b98c6a666e4b72c65c892659
Parents: 725d6cb
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Feb 12 14:30:08 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Feb 12 14:30:08 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |   8 +-
 .../GridCacheReplicatedPreloadSelfTest.java     | 121 ++++++++++++++-----
 .../p2p/CacheDeploymentAffinityKeyMapper.java   |  35 ++++++
 .../CacheDeploymentAlwaysTruePredicate2.java    |  30 +++++
 ...oymentCacheEntryEventSerializableFilter.java |  32 +++++
 .../p2p/CacheDeploymentCacheEntryListener.java  |  31 +++++
 ...CacheDeploymentCachePluginConfiguration.java |  74 ++++++++++++
 ...heDeploymentStoreSessionListenerFactory.java |  83 +++++++++++++
 8 files changed, 383 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5acad6c..7a36e73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -3425,8 +3425,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         try {
             if (val.getCacheStoreFactory() != null) {
                 try {
-                    marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()),
-                        val.getCacheStoreFactory().getClass().getClassLoader());
+                    ClassLoader ldr = ctx.config().getClassLoader();
+
+                    if (ldr == null)
+                        ldr = val.getCacheStoreFactory().getClass().getClassLoader();
+
+                    marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()), ldr);
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteCheckedException("Failed to validate cache configuration. " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index 887fea4..1fae875 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -26,13 +26,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -43,6 +48,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.CachePluginConfiguration;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -71,7 +78,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     private int poolSize = 2;
 
     /** */
-    private volatile boolean needStore = false;
+    private volatile boolean extClassloadingAtCfg = false;
 
     /** */
     private volatile boolean isClient = false;
@@ -136,21 +143,47 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
         cacheCfg.setRebalanceBatchSize(batchSize);
         cacheCfg.setRebalanceThreadPoolSize(poolSize);
 
-        if (needStore) {
-            Object sf = null;
+        if (extClassloadingAtCfg) {
+            loadExternalClassesToCfg(cacheCfg);
+        }
 
-            try {
-                sf = getExternalClassLoader().
-                    loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
-            }
-            catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+        return cacheCfg;
+    }
+
+    /**
+     *
+     * @param cacheCfg Configuration.
+     */
+    private void loadExternalClassesToCfg(CacheConfiguration cacheCfg) {
+        try {
+            Object sf = getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
 
             cacheCfg.setCacheStoreFactory((Factory)sf);
-        }
 
-        return cacheCfg;
+            Object sslf = getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentStoreSessionListenerFactory").newInstance();
+
+            cacheCfg.setCacheStoreSessionListenerFactories((Factory)sslf);
+
+            Object cpc = getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCachePluginConfiguration").newInstance();
+
+            cacheCfg.setPluginConfigurations((CachePluginConfiguration)cpc);
+
+            Object akm = getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentAffinityKeyMapper").newInstance();
+
+            cacheCfg.setAffinityMapper((AffinityKeyMapper)akm);
+
+            Object pred = getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentAlwaysTruePredicate2").newInstance();
+
+            cacheCfg.setNodeFilter((IgnitePredicate)pred);
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
@@ -299,9 +332,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If test failed.
      */
-    public void testStore() throws Exception {
+    public void testExternalClassesAtConfiguration() throws Exception {
         try {
-            needStore = true;
+            extClassloadingAtCfg = true;
             useExtClassLoader = true;
 
             Ignite g1 = startGrid(1);
@@ -316,13 +349,47 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             IgniteCache<Integer, Object> cache2 = g2.cache(null);
             IgniteCache<Integer, Object> cache3 = g3.cache(null);
 
+            final Class<CacheEntryListener> cls1 = (Class<CacheEntryListener>) getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryListener");
+            final Class<CacheEntryEventSerializableFilter> cls2 = (Class<CacheEntryEventSerializableFilter>) getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryEventSerializableFilter");
+
+            CacheEntryListenerConfiguration<Integer, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                new Factory<CacheEntryListener<Integer, Object>>() {
+                    @Override public CacheEntryListener<Integer, Object> create() {
+                        try {
+                            return cls1.newInstance();
+                        }
+                        catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                },
+                new Factory<CacheEntryEventSerializableFilter<Integer, Object>>() {
+                    /** {@inheritDoc} */
+                    @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
+                        try {
+
+                            return cls2.newInstance();
+                        }
+                        catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                },
+                true,
+                true
+            );
+
+            cache1.registerCacheEntryListener(lsnrCfg);
+
             cache1.put(1, 1);
 
             assertEquals(1, cache2.get(1));
             assertEquals(1, cache3.get(1));
         }
         finally {
-            needStore = false;
+            extClassloadingAtCfg = false;
             isClient = false;
             useExtClassLoader = false;
         }
@@ -331,9 +398,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If test failed.
      */
-    public void testStoreDynamicStart() throws Exception {
+    public void testExternalClassesAtConfigurationDynamicStart() throws Exception {
         try {
-            needStore = false;
+            extClassloadingAtCfg = false;
             useExtClassLoader = true;
 
             Ignite g1 = startGrid(1);
@@ -343,12 +410,10 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
 
             Ignite g3 = startGrid(3);
 
-            Object sf = getExternalClassLoader().loadClass(
-                "org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
-
             CacheConfiguration cfg = defaultCacheConfiguration();
 
-            cfg.setCacheStoreFactory((Factory)sf);
+            loadExternalClassesToCfg(cfg);
+
             cfg.setName("customStore");
 
             IgniteCache<Integer, Object> cache1 = g1.createCache(cfg);
@@ -362,7 +427,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             assertEquals(1, cache3.get(1));
         }
         finally {
-            needStore = false;
+            extClassloadingAtCfg = false;
             isClient = false;
             useExtClassLoader = false;
         }
@@ -371,9 +436,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If test failed.
      */
-    public void testStoreDynamicStart2() throws Exception {
+    public void testExternalClassesAtConfigurationDynamicStart2() throws Exception {
         try {
-            needStore = false;
+            extClassloadingAtCfg = false;
             useExtClassLoader = true;
 
             Ignite g1 = startGrid(1);
@@ -383,12 +448,10 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
 
             Ignite g3 = startGrid(3);
 
-            Object sf = getExternalClassLoader().loadClass(
-                "org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
-
             CacheConfiguration cfg = defaultCacheConfiguration();
 
-            cfg.setCacheStoreFactory((Factory)sf);
+            loadExternalClassesToCfg(cfg);
+
             cfg.setName("customStore");
 
             IgniteCache<Integer, Object> cache1 = g1.getOrCreateCache(cfg);
@@ -402,7 +465,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             assertEquals(1, cache3.get(1));
         }
         finally {
-            needStore = false;
+            extClassloadingAtCfg = false;
             isClient = false;
             useExtClassLoader = false;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java
new file mode 100644
index 0000000..fbb74d2
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAffinityKeyMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+
+/**
+ * Test affinity key mapper for cache deployment tests.
+ */
+public class CacheDeploymentAffinityKeyMapper implements AffinityKeyMapper {
+    /** {@inheritDoc} */
+    @Override public Object affinityKey(Object key) {
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java
new file mode 100644
index 0000000..d88c7bf
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate2.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Test predicate that always returns {@code true}, for cache deployment tests.
+ */
+public class CacheDeploymentAlwaysTruePredicate2 implements IgnitePredicate<Object> {
+    /** {@inheritDoc} */
+    @Override public boolean apply(Object o) {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java
new file mode 100644
index 0000000..c29c1a4
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryEventSerializableFilter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+
+/**
+ * Test cache entry event filter that accepts every event, for cache deployment tests.
+ */
+public class CacheDeploymentCacheEntryEventSerializableFilter implements CacheEntryEventSerializableFilter {
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java
new file mode 100644
index 0000000..64c13fb
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCacheEntryListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Test cache entry created listener that does nothing, for cache deployment tests.
+ */
+public class CacheDeploymentCacheEntryListener implements CacheEntryCreatedListener {
+    /** {@inheritDoc} */
+    @Override public void onCreated(Iterable iterable) throws CacheEntryListenerException {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
new file mode 100644
index 0000000..bb37c25
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test cache plugin configuration for cache deployment tests.
+ */
+public class CacheDeploymentCachePluginConfiguration<K, V> implements CachePluginConfiguration<K, V> {
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider createProvider(CachePluginContext ctx) {
+        return new CacheDeploymentCachePluginProvider();
+    }
+
+    private static class CacheDeploymentCachePluginProvider implements CachePluginProvider {
+        /** {@inheritDoc} */
+        @Nullable @Override public Object createComponent(Class cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteCheckedException {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop(boolean cancel) {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStart() throws IgniteCheckedException {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStop(boolean cancel) {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validate() throws IgniteCheckedException {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validateRemote(CacheConfiguration locCfg, CachePluginConfiguration locPluginCcfg,
+            CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a32dfc41/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java
new file mode 100644
index 0000000..74d9d21
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentStoreSessionListenerFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.configuration.Factory;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cache.store.CacheStoreSessionListener;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+/**
+ * Test store session listener factory for cache deployment tests.
+ */
+public class CacheDeploymentStoreSessionListenerFactory implements Factory<CacheStoreSessionListener> {
+    /** */
+    private String name;
+
+    /**
+     *
+     */
+    public CacheDeploymentStoreSessionListenerFactory() {
+    }
+
+    /**
+     * @param name Name.
+     */
+    public CacheDeploymentStoreSessionListenerFactory(String name) {
+        this.name = name;
+    }
+
+    @Override public CacheStoreSessionListener create() {
+        return new CacheDeploymentSessionListener(name);
+    }
+
+    /**
+     */
+    private static class CacheDeploymentSessionListener implements CacheStoreSessionListener, LifecycleAware {
+        /** */
+        private final String name;
+
+        /**
+         * @param name Name.
+         */
+        private CacheDeploymentSessionListener(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteException {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() throws IgniteException {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionStart(CacheStoreSession ses) {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+
+        }
+    }
+}
\ No newline at end of file


[18/22] ignite git commit: IGNITE-2509 - Fixed offheap metrics - Fixes #470.

Posted by sb...@apache.org.
IGNITE-2509 - Fixed offheap metrics - Fixes #470.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/763bf578
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/763bf578
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/763bf578

Branch: refs/heads/ignite-2407
Commit: 763bf578e9f510e50bdfa6b9e51ea25348bfd2e9
Parents: 35b0e6b
Author: vershov <ve...@gridgain.com>
Authored: Fri Feb 12 12:51:50 2016 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Feb 12 12:51:50 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheMemoryMode.java   |  2 ++
 .../processors/cache/GridCacheAdapter.java         |  7 +++++++
 .../processors/cache/GridCacheSwapManager.java     |  3 ++-
 .../internal/GridAffinityNoCacheSelfTest.java      |  4 +++-
 .../GridCacheOffHeapValuesEvictionSelfTest.java    | 17 +++++++++++++++--
 5 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
index a596824..0133327 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMemoryMode.java
@@ -56,6 +56,8 @@ public enum CacheMemoryMode {
      * Entry keys will be stored on heap memory, and values will be stored in offheap memory. Note
      * that in this mode entries can be evicted only to swap. The evictions will happen according
      * to configured {@link EvictionPolicy}.
+     * <p/>
+     * Size returned by {@link CachePeekMode#OFFHEAP} is always zero for this mode.
      */
     OFFHEAP_VALUES,
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 84eb0b8..3fac207 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -54,6 +54,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheEntry;
 import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -4121,6 +4122,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public long offHeapAllocatedSize() {
+        if (ctx.config().getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES) {
+            assert ctx.unsafeMemory() != null;
+
+            return ctx.unsafeMemory().allocatedSize();
+        }
+
         GridCacheSwapManager swapMgr = ctx.swap();
 
         return swapMgr != null ? swapMgr.offHeapAllocatedSize() : -1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 37b5e15..cbf09bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
@@ -85,7 +86,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     /** Flag to indicate if swap is enabled. */
     private boolean swapEnabled;
 
-    /** Flag to indicate if offheap is enabled. */
+    /** Flag to indicate if offheap is enabled. {@link CacheMemoryMode#OFFHEAP_VALUES} is treated as offheap disabled. */
     private boolean offheapEnabled;
 
     /** Swap listeners. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
index 6fb1280..5561f35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
@@ -101,13 +101,15 @@ public class GridAffinityNoCacheSelfTest extends GridCommonAbstractTest {
     /**
      * @param key Key.
      */
-    private void checkAffinityImplCacheDeleted(Object key) {
+    private void checkAffinityImplCacheDeleted(Object key) throws InterruptedException{
         IgniteEx grid = grid(0);
 
         final String cacheName = "cacheToBeDeleted";
 
         grid(1).getOrCreateCache(cacheName);
 
+        awaitPartitionMapExchange();
+
         Affinity<Object> affinity = grid.affinity(cacheName);
 
         assertTrue(affinity instanceof GridCacheAffinityImpl);

http://git-wip-us.apache.org/repos/asf/ignite/blob/763bf578/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
index 9baab33..0efd89b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapValuesEvictionSelfTest.java
@@ -33,12 +33,18 @@ import org.apache.ignite.testframework.GridTestUtils;
  */
 public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSelfTest {
 
+    /** */
     private static final int VAL_SIZE = 512 * 1024; // bytes
+    /** */
     private static final int MAX_VALS_AMOUNT = 100;
+    /** */
     private static final int MAX_MEMORY_SIZE = MAX_VALS_AMOUNT * VAL_SIZE;
+    /** */
     private static final int VALS_AMOUNT = MAX_VALS_AMOUNT * 2;
+    /** */
     private static final int THREAD_COUNT = 4;
 
+    /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 1;
     }
@@ -46,7 +52,7 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
     /**
      * @throws Exception If failed.
      */
-    public void testPutOnHeap() throws Exception {
+    public void testPutValuesOffHeap() throws Exception {
         CacheConfiguration<Integer, Object> ccfg = cacheConfiguration(grid(0).name());
         ccfg.setName("testPutOffHeapValues");
         ccfg.setStatisticsEnabled(true);
@@ -70,6 +76,10 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
         assertTrue(MAX_VALS_AMOUNT >= cache.size(CachePeekMode.ONHEAP));
         assertTrue(MAX_VALS_AMOUNT - 5 <= cache.size(CachePeekMode.ONHEAP));
         assertEquals(cache.size(CachePeekMode.ALL) - cache.size(CachePeekMode.ONHEAP), cache.size(CachePeekMode.SWAP));
+
+        assertTrue((MAX_VALS_AMOUNT + 5) * VAL_SIZE > cache.metrics().getOffHeapAllocatedSize());
+        assertTrue((MAX_VALS_AMOUNT - 5) * VAL_SIZE < cache.metrics().getOffHeapAllocatedSize());
+        assertTrue(cache.metrics().getOffHeapAllocatedSize() >= cache.size(CachePeekMode.ONHEAP) * VAL_SIZE);
     }
 
     /**
@@ -109,6 +119,7 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
 
         assertTrue((MAX_VALS_AMOUNT + 5) * VAL_SIZE > cache.metrics().getOffHeapAllocatedSize());
         assertTrue((MAX_VALS_AMOUNT - 5) * VAL_SIZE < cache.metrics().getOffHeapAllocatedSize());
+        assertTrue(cache.metrics().getOffHeapAllocatedSize() >= cache.size(CachePeekMode.OFFHEAP) * VAL_SIZE);
     }
 
     /**
@@ -146,12 +157,14 @@ public class GridCacheOffHeapValuesEvictionSelfTest extends GridCacheAbstractSel
 
         assertTrue((MAX_VALS_AMOUNT + 5) * VAL_SIZE > cache.metrics().getOffHeapAllocatedSize());
         assertTrue((MAX_VALS_AMOUNT - 5) * VAL_SIZE < cache.metrics().getOffHeapAllocatedSize());
+        assertTrue(cache.metrics().getOffHeapAllocatedSize() >= cache.size(CachePeekMode.OFFHEAP) * VAL_SIZE);
     }
 
+    /** Fill cache with values. */
     private static void fillCache(final IgniteCache<Integer, Object> cache, long timeout) throws Exception{
         final byte[] val = new byte[VAL_SIZE];
         final AtomicInteger keyStart = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(4);
+        final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
 
         GridTestUtils.runMultiThreaded(new Callable<Void>() {
             @Override public Void call() throws Exception {


[09/22] ignite git commit: Added test.

Posted by sb...@apache.org.
Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16927abb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16927abb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16927abb

Branch: refs/heads/ignite-2407
Commit: 16927abbb1ed1a6c3b47831562d263dd38f495d1
Parents: fa3706f
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 10 15:05:56 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 10 15:06:31 2016 +0300

----------------------------------------------------------------------
 .../cache/CacheQueryBuildValueTest.java         | 144 +++++++++++++++++++
 1 file changed, 144 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16927abb/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java
new file mode 100644
index 0000000..cb574bb
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryBuildValueTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests SQL query over cache values created with {@link BinaryObjectBuilder}.
+ */
+public class CacheQueryBuildValueTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(null);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        QueryEntity entity = new QueryEntity();
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(TestBuilderValue.class.getName());
+
+        ArrayList<QueryIndex> idxs = new ArrayList<>();
+
+        QueryIndex idx = new QueryIndex("iVal");
+        idxs.add(idx);
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+        fields.put("iVal", Integer.class.getName());
+
+        entity.setFields(fields);
+
+        entity.setIndexes(idxs);
+
+        ccfg.setQueryEntities(Collections.singleton(entity));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        BinaryConfiguration binaryCfg = new BinaryConfiguration();
+
+        BinaryTypeConfiguration typeCfg = new BinaryTypeConfiguration();
+        typeCfg.setTypeName(TestBuilderValue.class.getName());
+
+        binaryCfg.setTypeConfigurations(Collections.singletonList(typeCfg));
+
+        cfg.setBinaryConfiguration(binaryCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBuilderAndQuery() throws Exception {
+        Ignite node = ignite(0);
+
+        final IgniteCache<Object, Object> cache = node.cache(null);
+
+        IgniteBinary binary = node.binary();
+
+        BinaryObjectBuilder builder = binary.builder(TestBuilderValue.class.getName());
+
+        cache.put(0, builder.build());
+
+        builder.setField("iVal", 1);
+
+        cache.put(1, builder.build());
+
+        List<Cache.Entry<Object, Object>> entries =
+            cache.query(new SqlQuery<>(TestBuilderValue.class, "true")).getAll();
+
+        assertEquals(2, entries.size());
+    }
+
+    /**
+     *
+     */
+    static class TestBuilderValue implements Serializable {
+        /** */
+        private int iVal;
+
+        /**
+         * @param iVal Integer value.
+         */
+        public TestBuilderValue(int iVal) {
+            this.iVal = iVal;
+        }
+    }
+}


[14/22] ignite git commit: Improved KerberosHadoopFileSystemFactory JavaDocs.

Posted by sb...@apache.org.
Improved KerberosHadoopFileSystemFactory JavaDocs.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9937a6e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9937a6e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9937a6e

Branch: refs/heads/ignite-2407
Commit: a9937a6e20407189b8bd67b2cc30a30ddc8dd6ce
Parents: d08a779
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 10 16:57:16 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 16:57:16 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java  | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a9937a6e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
index fc768d6..a78cabc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -106,6 +106,9 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
 
     /**
      * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
+     * <p>
+     * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
+     * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well.
      *
      * @return The key tab file name.
      */


[17/22] ignite git commit: IGNITE-2555

Posted by sb...@apache.org.
 IGNITE-2555


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35b0e6bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35b0e6bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35b0e6bf

Branch: refs/heads/ignite-2407
Commit: 35b0e6bf149bb86a3eefefcbc657c822e25681f3
Parents: 877be93
Author: ruskim <ru...@gmail.com>
Authored: Thu Feb 11 18:53:50 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Feb 11 18:53:50 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    | 14 ++-
 .../internal/GridNodeMetricsLogSelfTest.java    | 98 ++++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |  2 +
 3 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/35b0e6bf/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e3017ff..5d8daf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -989,6 +989,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                             double avgCpuLoadPct = m.getAverageCpuLoad() * 100;
                             double gcPct = m.getCurrentGcCpuLoad() * 100;
 
+                            //Heap params
                             long heapUsed = m.getHeapMemoryUsed();
                             long heapMax = m.getHeapMemoryMaximum();
 
@@ -997,6 +998,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
                             double freeHeapPct = heapMax > 0 ? ((double)((heapMax - heapUsed) * 100)) / heapMax : -1;
 
+                            //Non heap params
+                            long nonHeapUsed = m.getNonHeapMemoryUsed();
+                            long nonHeapMax = m.getNonHeapMemoryMaximum();
+
+                            long nonHeapUsedInMBytes = nonHeapUsed / 1024 / 1024;
+                            long nonHeapCommInMBytes = m.getNonHeapMemoryCommitted() / 1024 / 1024;
+
+                            double freeNonHeapPct = nonHeapMax > 0 ? ((double)((nonHeapMax - nonHeapUsed) * 100)) / nonHeapMax : -1;
+
                             int hosts = 0;
                             int nodes = 0;
                             int cpus = 0;
@@ -1046,12 +1056,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
                             String msg = NL +
                                 "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
-                                "    ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
+                                "    ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL +
                                 "    ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
                                 "    ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
                                 dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
                                 "    ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +
                                 dblFmt.format(freeHeapPct) + "%, comm=" + dblFmt.format(heapCommInMBytes) + "MB]" + NL +
+                                "    ^-- Non heap [used=" + dblFmt.format(nonHeapUsedInMBytes) + "MB, free=" +
+                                dblFmt.format(freeNonHeapPct) + "%, comm=" + dblFmt.format(nonHeapCommInMBytes) + "MB]" + NL +
                                 "    ^-- Public thread pool [active=" + pubPoolActiveThreads + ", idle=" +
                                 pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
                                 "    ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/35b0e6bf/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
new file mode 100644
index 0000000..fe5922e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+
+import java.io.StringWriter;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
+
+/**
+ * Checks that local node metrics are periodically written to the log.
+ */
+@SuppressWarnings({"ProhibitedExceptionDeclared"})
+@GridCommonTest(group = "Kernal")
+public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
+    /** Default constructor. */
+    public GridNodeMetricsLogSelfTest() {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked"})
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMetricsLogFrequency(1000);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeMetricsLog() throws Exception {
+        // Capture root logger output in a string to check the log content.
+        Layout layout = new SimpleLayout();
+        StringWriter strWr = new StringWriter();
+        WriterAppender app = new WriterAppender(layout, strWr);
+        Logger.getRootLogger().addAppender(app);
+
+        try {
+            Ignite g1 = startGrid(1);
+
+            IgniteCache<Integer, String> cache1 = g1.createCache("TestCache1");
+
+            cache1.put(1, "one");
+
+            Ignite g2 = startGrid(2);
+
+            IgniteCache<Integer, String> cache2 = g2.createCache("TestCache2");
+
+            cache2.put(2, "two");
+
+            Thread.sleep(10000);
+
+            // Check that nodes are alive.
+            assertEquals("one", cache1.get(1));
+            assertEquals("two", cache2.get(2));
+        }
+        finally {
+            // Detach the appender even on failure so it does not leak into other tests.
+            Logger.getRootLogger().removeAppender(app);
+        }
+
+        String fullLog = strWr.toString();
+        assertTrue(fullLog.contains("Metrics for local node"));
+        assertTrue(fullLog.contains("uptime="));
+        assertTrue(fullLog.contains("Non heap"));
+        assertTrue(fullLog.contains("Outbound messages queue"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/35b0e6bf/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index c904ef4..3903910 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.ClusterGroupSelfTest;
 import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest;
 import org.apache.ignite.internal.GridLifecycleAwareSelfTest;
 import org.apache.ignite.internal.GridLifecycleBeanSelfTest;
+import org.apache.ignite.internal.GridNodeMetricsLogSelfTest;
 import org.apache.ignite.internal.GridProjectionForCachesSelfTest;
 import org.apache.ignite.internal.GridReduceSelfTest;
 import org.apache.ignite.internal.GridReleaseTypeSelfTest;
@@ -114,6 +115,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(IgniteSlowClientDetectionSelfTest.class);
         GridTestUtils.addTestIfNeeded(suite, IgniteDaemonNodeMarshallerCacheTest.class, ignoredTests);
         suite.addTestSuite(IgniteMarshallerCacheConcurrentReadWriteTest.class);
+        suite.addTestSuite(GridNodeMetricsLogSelfTest.class);
 
         suite.addTestSuite(IgniteExceptionInNioWorkerSelfTest.class);
 


[08/22] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fa3706f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fa3706f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fa3706f8

Branch: refs/heads/ignite-2407
Commit: fa3706f866b42418abaa54c5f4e73f2dedb166b0
Parents: c3aa137 4c05fc0
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 10 15:01:00 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:01:00 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheLazyEntry.java        |   3 +
 .../processors/cache/GridCacheContext.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     | 118 +++-
 .../binary/CacheObjectBinaryProcessorImpl.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  79 ++-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  85 ++-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  38 +-
 .../cache/query/GridCacheQueryManager.java      |  30 +-
 .../continuous/CacheContinuousQueryHandler.java |   3 +-
 .../CacheContinuousQueryListener.java           |   2 +-
 .../continuous/CacheContinuousQueryManager.java | 120 +++-
 .../continuous/GridContinuousProcessor.java     |  16 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 454 ++++++++----
 ...cheEntryListenerAtomicOffheapTieredTest.java |  32 +
 ...cheEntryListenerAtomicOffheapValuesTest.java |  32 +
 ...teCacheEntryListenerTxOffheapTieredTest.java |  32 +
 ...teCacheEntryListenerTxOffheapValuesTest.java |  32 +
 .../cache/IgniteCacheEntryListenerTxTest.java   |   1 +
 ...ContinuousQueryFailoverAbstractSelfTest.java |  10 +
 ...tomicPrimaryWriteOrderOffheapTieredTest.java |  33 +
 ...tinuousQueryFailoverTxOffheapTieredTest.java |  32 +
 ...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
 ...ridCacheContinuousQueryAbstractSelfTest.java |  19 +-
 ...eContinuousQueryAtomicOffheapTieredTest.java |  32 +
 ...eContinuousQueryAtomicOffheapValuesTest.java |  32 +
 ...CacheContinuousQueryTxOffheapTieredTest.java |  32 +
 ...CacheContinuousQueryTxOffheapValuesTest.java |  32 +
 .../junits/common/GridCommonAbstractTest.java   |   2 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   8 +
 .../IgniteCacheQuerySelfTestSuite.java          |  14 +
 .../commands/tasks/VisorTasksCommand.scala      |   4 +-
 .../scala/org/apache/ignite/visor/visor.scala   |   4 +
 32 files changed, 1749 insertions(+), 276 deletions(-)
----------------------------------------------------------------------



[07/22] ignite git commit: IGNITE-2564: CPP: Fixed a bug preventing CPP memory reallocation from Java. This closes #460.

Posted by sb...@apache.org.
IGNITE-2564: CPP: Fixed a bug preventing CPP memory reallocation from Java. This closes #460.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3aa1375
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3aa1375
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3aa1375

Branch: refs/heads/ignite-2407
Commit: c3aa1375171b5f3a97cd50fcd209256e46579b0d
Parents: b7475f0
Author: isapego <is...@gridgain.com>
Authored: Wed Feb 10 15:00:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:00:42 2016 +0300

----------------------------------------------------------------------
 modules/platforms/cpp/core-test/Makefile.am     |  1 +
 .../cpp/core-test/project/vs/core-test.vcxproj  |  1 +
 .../project/vs/core-test.vcxproj.filters        |  3 +
 .../platforms/cpp/core-test/src/cache_test.cpp  | 12 +++
 .../cpp/core-test/src/interop_memory_test.cpp   | 95 ++++++++++++++++++++
 .../include/ignite/impl/ignite_environment.h    | 19 ++--
 .../cpp/core/src/impl/cache/cache_impl.cpp      |  2 +-
 .../cpp/core/src/impl/ignite_environment.cpp    | 30 +++++--
 8 files changed, 149 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/Makefile.am b/modules/platforms/cpp/core-test/Makefile.am
index aa81c65..531fee0 100644
--- a/modules/platforms/cpp/core-test/Makefile.am
+++ b/modules/platforms/cpp/core-test/Makefile.am
@@ -29,6 +29,7 @@ ignite_tests_SOURCES = src/cache_test.cpp \
                          src/cache_query_test.cpp \
                          src/concurrent_test.cpp \
                          src/ignition_test.cpp \
+                         src/interop_memory_test.cpp \
                          src/handle_registry_test.cpp \
                          src/binary_test_defs.cpp \
                          src/binary_reader_writer_raw_test.cpp \

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
index 422199a..d98d202 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
@@ -40,6 +40,7 @@
     <ClCompile Include="..\..\src\binary_session_test.cpp" />
     <ClCompile Include="..\..\src\binary_test_defs.cpp" />
     <ClCompile Include="..\..\src\cache_query_test.cpp" />
+    <ClCompile Include="..\..\src\interop_memory_test.cpp" />
     <ClCompile Include="..\..\src\teamcity_boost.cpp" />
     <ClCompile Include="..\..\src\teamcity_messages.cpp" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
index 32737be..15b9c40 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
@@ -34,6 +34,9 @@
     <ClCompile Include="..\..\src\binary_reader_writer_raw_test.cpp">
       <Filter>Code</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\interop_memory_test.cpp">
+      <Filter>Code</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\teamcity_messages.h">

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/src/cache_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cache_test.cpp b/modules/platforms/cpp/core-test/src/cache_test.cpp
index 32c5bd6..a11865d 100644
--- a/modules/platforms/cpp/core-test/src/cache_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cache_test.cpp
@@ -476,4 +476,16 @@ BOOST_AUTO_TEST_CASE(TestGetOrCreateCache)
     BOOST_REQUIRE(7 == cache2.Get(5));
 }
 
+BOOST_AUTO_TEST_CASE(TestGetBigString)
+{
+    // Value ten times larger than the default memory block allocation size.
+    std::string bigVal(impl::IgniteEnvironment::DEFAULT_ALLOCATION_SIZE * 10, 'a');
+
+    cache::Cache<int, std::string> strCache = grid0.GetOrCreateCache<int, std::string>("partitioned");
+
+    strCache.Put(5, bigVal);
+
+    BOOST_REQUIRE(bigVal == strCache.Get(5));
+}
+
 BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core-test/src/interop_memory_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/interop_memory_test.cpp b/modules/platforms/cpp/core-test/src/interop_memory_test.cpp
new file mode 100644
index 0000000..07e928c
--- /dev/null
+++ b/modules/platforms/cpp/core-test/src/interop_memory_test.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _MSC_VER
+    #define BOOST_TEST_DYN_LINK
+#endif
+
+#include <boost/test/unit_test.hpp>
+
+#include "ignite/ignite.h"
+#include "ignite/ignition.h"
+
+using namespace ignite;
+using namespace impl;
+using namespace boost::unit_test;
+
+BOOST_AUTO_TEST_SUITE(MemoryTestSuite)
+
+BOOST_AUTO_TEST_CASE(MemoryReallocationTest)
+{
+    using impl::interop::InteropMemory;
+    using common::concurrent::SharedPointer;
+
+    IgniteEnvironment env;
+
+    SharedPointer<InteropMemory> mem = env.AllocateMemory();
+
+    BOOST_CHECK_EQUAL(mem.Get()->Capacity(), IgniteEnvironment::DEFAULT_ALLOCATION_SIZE);
+
+    BOOST_REQUIRE(mem.Get()->Data() != NULL);
+
+    // Fill the whole initial block to verify it is writable.
+    int32_t capBeforeReallocation = mem.Get()->Capacity();
+
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+    {
+        int8_t *data = mem.Get()->Data();
+
+        data[i] = static_cast<int8_t>(i);
+    }
+
+    mem.Get()->Reallocate(mem.Get()->Capacity() * 3);
+
+    BOOST_CHECK(mem.Get()->Capacity() >= IgniteEnvironment::DEFAULT_ALLOCATION_SIZE * 3);
+
+    // Data written before the reallocation must be preserved.
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+    {
+        int8_t *data = mem.Get()->Data();
+
+        BOOST_REQUIRE_EQUAL(data[i], static_cast<int8_t>(i));
+    }
+
+    // Fill the whole enlarged block to verify it is writable.
+    capBeforeReallocation = mem.Get()->Capacity();
+
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+    {
+        int8_t *data = mem.Get()->Data();
+
+        data[i] = static_cast<int8_t>(i + 42);
+    }
+
+    // Reallocate once more.
+    mem.Get()->Reallocate(mem.Get()->Capacity() * 3);
+
+    // Data written before the second reallocation must be preserved.
+    for (int32_t i = 0; i < capBeforeReallocation; ++i)
+    {
+        int8_t *data = mem.Get()->Data();
+
+        BOOST_REQUIRE_EQUAL(data[i], static_cast<int8_t>(i + 42));
+    }
+
+    BOOST_CHECK(mem.Get()->Capacity() >= IgniteEnvironment::DEFAULT_ALLOCATION_SIZE * 9);
+
+    // Verify the final block is writable; memset uses only the low byte of its fill value.
+    memset(mem.Get()->Data(), 0xF0, mem.Get()->Capacity());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index 2fbdb44..02facfc 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -25,16 +25,21 @@
 #include "binary/binary_type_manager.h"
 
 namespace ignite 
-{    
+{
     namespace impl 
     {
         /**
          * Defines environment in which Ignite operates.
          */
         class IGNITE_IMPORT_EXPORT IgniteEnvironment
-        {                
+        {
         public:
             /**
+             * Default memory block allocation size.
+             */
+            enum { DEFAULT_ALLOCATION_SIZE = 1024 };
+
+            /**
              * Default constructor.
              */
             IgniteEnvironment();
@@ -51,7 +56,7 @@ namespace ignite
              * @return JNI handlers.
              */
             ignite::common::java::JniHandlers GetJniHandlers(ignite::common::concurrent::SharedPointer<IgniteEnvironment>* target);
-                
+
             /**
              * Perform initialization on successful start.
              *
@@ -64,8 +69,8 @@ namespace ignite
              *
              * @param memPtr Memory pointer.
              */
-            void OnStartCallback(long long memPtr);        
-            
+            void OnStartCallback(long long memPtr);
+
             /**
              * Get name of Ignite instance.
              *
@@ -120,11 +125,11 @@ namespace ignite
             char* name;
 
             /** Type manager. */
-            binary::BinaryTypeManager* metaMgr;       
+            binary::BinaryTypeManager* metaMgr;
 
             IGNITE_NO_COPY_ASSIGNMENT(IgniteEnvironment);
         };
-    }    
+    }
 }
 
 #endif
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index 08526b5..f66a228 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -379,7 +379,7 @@ namespace ignite
 
                 if (outPtr)
                 {
-                    env.Get()->Context()->TargetInStreamOutStream(javaRef, opType, WriteTo(outMem.Get(), inOp, err), 
+                    env.Get()->Context()->TargetInStreamOutStream(javaRef, opType, outPtr,
                         inMem.Get()->PointerLong(), &jniErr);
 
                     IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3aa1375/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index 013a139..c9c57a0 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -53,7 +53,23 @@ namespace ignite
             SharedPointer<IgniteEnvironment>* ptr = static_cast<SharedPointer<IgniteEnvironment>*>(target);
 
             delete ptr;
-        } 
+        }
+
+        /**
+         * Memory reallocate callback: looks up the memory chunk by pointer and reallocates it.
+         *
+         * @param target Pointer to the shared pointer holding the target environment.
+         * @param memPtr Memory pointer.
+         * @param cap Required capacity.
+         */
+        void IGNITE_CALL MemoryReallocate(void* target, long long memPtr, int cap)
+        {
+            SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
+
+            SharedPointer<InteropMemory> mem = env->Get()->GetMemory(memPtr);
+
+            mem.Get()->Reallocate(cap);
+        }
 
         IgniteEnvironment::IgniteEnvironment() : ctx(SharedPointer<JniContext>()), latch(new SingleLatch), name(NULL),
             metaMgr(new BinaryTypeManager())
@@ -80,18 +96,20 @@ namespace ignite
             hnds.onStart = OnStart;
             hnds.onStop = OnStop;
 
+            hnds.memRealloc = MemoryReallocate;
+
             hnds.error = NULL;
 
             return hnds;
         }
-            
+
         void IgniteEnvironment::Initialize(SharedPointer<JniContext> ctx)
         {
             this->ctx = ctx;
-                
+
             latch->CountDown();
         }
-        
+
         const char* IgniteEnvironment::InstanceName() const
         {
             return name;
@@ -104,7 +122,7 @@ namespace ignite
 
         SharedPointer<InteropMemory> IgniteEnvironment::AllocateMemory()
         {
-            SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(1024));
+            SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(DEFAULT_ALLOCATION_SIZE));
 
             return ptr;
         }
@@ -147,7 +165,7 @@ namespace ignite
             InteropInputStream stream(&mem);
 
             BinaryReaderImpl reader(&stream);
-            
+
             int32_t nameLen = reader.ReadString(NULL, 0);
 
             if (nameLen >= 0)


[21/22] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-2407

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-2407


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/112e76e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/112e76e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/112e76e6

Branch: refs/heads/ignite-2407
Commit: 112e76e6e06b9cda1867dc46f6a8380ca4e2bcff
Parents: 570de20 a32dfc4
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 12 15:40:59 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 12 15:40:59 2016 +0300

----------------------------------------------------------------------
 DEVNOTES.txt                                    |   4 +-
 .../apache/ignite/cache/CacheMemoryMode.java    |   2 +
 .../internal/GridMessageListenHandler.java      |  16 +
 .../apache/ignite/internal/IgniteKernal.java    |  14 +-
 .../processors/cache/CacheLazyEntry.java        |   3 +
 .../processors/cache/GridCacheAdapter.java      |   7 +
 .../processors/cache/GridCacheContext.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     | 118 +++-
 .../processors/cache/GridCacheProcessor.java    |   8 +-
 .../processors/cache/GridCacheSwapManager.java  |   3 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  79 ++-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  85 ++-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  38 +-
 .../cache/query/GridCacheQueryManager.java      |  30 +-
 .../continuous/CacheContinuousQueryHandler.java |   3 +-
 .../CacheContinuousQueryListener.java           |   2 +-
 .../continuous/CacheContinuousQueryManager.java | 120 +++-
 .../continuous/GridContinuousProcessor.java     |  62 +-
 .../internal/processors/igfs/IgfsProcessor.java |  14 +
 .../internal/GridAffinityNoCacheSelfTest.java   |   4 +-
 .../internal/GridNodeMetricsLogSelfTest.java    |  98 +++
 ...eClientReconnectContinuousProcessorTest.java |  32 +-
 .../GridCacheOffHeapValuesEvictionSelfTest.java |  17 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 454 ++++++++----
 ...cheEntryListenerAtomicOffheapTieredTest.java |  32 +
 ...cheEntryListenerAtomicOffheapValuesTest.java |  32 +
 ...niteCacheEntryListenerExpiredEventsTest.java | 202 ++++++
 ...teCacheEntryListenerTxOffheapTieredTest.java |  32 +
 ...teCacheEntryListenerTxOffheapValuesTest.java |  32 +
 .../cache/IgniteCacheEntryListenerTxTest.java   |   1 +
 .../GridCacheReplicatedPreloadSelfTest.java     | 121 +++-
 ...ContinuousQueryFailoverAbstractSelfTest.java |  10 +
 ...tomicPrimaryWriteOrderOffheapTieredTest.java |  33 +
 ...tinuousQueryFailoverTxOffheapTieredTest.java |  32 +
 ...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
 ...ridCacheContinuousQueryAbstractSelfTest.java |  19 +-
 ...eContinuousQueryAtomicOffheapTieredTest.java |  32 +
 ...eContinuousQueryAtomicOffheapValuesTest.java |  32 +
 ...CacheContinuousQueryTxOffheapTieredTest.java |  32 +
 ...CacheContinuousQueryTxOffheapValuesTest.java |  32 +
 ...IgniteCacheContinuousQueryReconnectTest.java | 192 ++++++
 .../igfs/IgfsProcessorValidationSelfTest.java   |  27 +
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |   2 +-
 .../junits/common/GridCommonAbstractTest.java   |   2 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |   8 +
 .../p2p/CacheDeploymentAffinityKeyMapper.java   |  35 +
 .../CacheDeploymentAlwaysTruePredicate2.java    |  30 +
 ...oymentCacheEntryEventSerializableFilter.java |  32 +
 .../p2p/CacheDeploymentCacheEntryListener.java  |  31 +
 ...CacheDeploymentCachePluginConfiguration.java |  74 ++
 ...heDeploymentStoreSessionListenerFactory.java |  83 +++
 .../hadoop/fs/BasicHadoopFileSystemFactory.java |  22 +-
 .../fs/CachingHadoopFileSystemFactory.java      |   2 +-
 .../fs/KerberosHadoopFileSystemFactory.java     | 217 ++++++
 ...KerberosHadoopFileSystemFactorySelfTest.java | 121 ++++
 .../testsuites/IgniteHadoopTestSuite.java       |   3 +
 .../cache/CacheQueryBuildValueTest.java         | 144 ++++
 .../IgniteCacheQuerySelfTestSuite.java          |  16 +
 modules/platforms/cpp/core-test/Makefile.am     |   1 +
 .../cpp/core-test/project/vs/core-test.vcxproj  |   1 +
 .../project/vs/core-test.vcxproj.filters        |   3 +
 .../platforms/cpp/core-test/src/cache_test.cpp  |  12 +
 .../cpp/core-test/src/interop_memory_test.cpp   |  95 +++
 .../include/ignite/impl/ignite_environment.h    |  19 +-
 .../cpp/core/src/impl/cache/cache_impl.cpp      |   2 +-
 .../cpp/core/src/impl/ignite_environment.cpp    |  30 +-
 .../Common/IgniteConfigurationXmlSerializer.cs  |   7 +-
 .../commands/tasks/VisorTasksCommand.scala      |   4 +-
 .../scala/org/apache/ignite/visor/visor.scala   |   4 +
 71 files changed, 3450 insertions(+), 352 deletions(-)
----------------------------------------------------------------------



[16/22] ignite git commit: Fix R# analysis warnings

Posted by sb...@apache.org.
Fix R# analysis warnings


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/877be93e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/877be93e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/877be93e

Branch: refs/heads/ignite-2407
Commit: 877be93ee35afddcc126a147cfd3cd1dda4a46ce
Parents: 0491a5f
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Feb 11 16:31:28 2016 +0300
Committer: Pavel Tupitsyn <pt...@gridgain.com>
Committed: Thu Feb 11 16:31:28 2016 +0300

----------------------------------------------------------------------
 .../Impl/Common/IgniteConfigurationXmlSerializer.cs           | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/877be93e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
index af25bfa..c27012a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/IgniteConfigurationXmlSerializer.cs
@@ -23,7 +23,6 @@ namespace Apache.Ignite.Core.Impl.Common
     using System.ComponentModel;
     using System.Configuration;
     using System.Diagnostics;
-    using System.Diagnostics.CodeAnalysis;
     using System.Linq;
     using System.Reflection;
     using System.Xml;
@@ -99,14 +98,13 @@ namespace Apache.Ignite.Core.Impl.Common
         /// <summary>
         /// Writes the property of a basic type (primitives, strings, types).
         /// </summary>
-        [SuppressMessage("ReSharper", "AssignNullToNotNullAttribute")]
         private static void WriteBasicProperty(object obj, XmlWriter writer, Type valueType, PropertyInfo property)
         {
             var converter = GetConverter(property, valueType);
 
             var stringValue = converter.ConvertToInvariantString(obj);
 
-            writer.WriteString(stringValue);
+            writer.WriteString(stringValue ?? "");
         }
 
         /// <summary>
@@ -125,7 +123,6 @@ namespace Apache.Ignite.Core.Impl.Common
         /// <summary>
         /// Writes the complex property (nested object).
         /// </summary>
-        [SuppressMessage("ReSharper", "AssignNullToNotNullAttribute")]
         private static void WriteComplexProperty(object obj, XmlWriter writer, Type valueType)
         {
             var props = GetNonDefaultProperties(obj).ToList();
@@ -139,7 +136,7 @@ namespace Apache.Ignite.Core.Impl.Common
             {
                 var converter = GetConverter(prop, prop.PropertyType);
                 var stringValue = converter.ConvertToInvariantString(prop.GetValue(obj, null));
-                writer.WriteAttributeString(PropertyNameToXmlName(prop.Name), stringValue);
+                writer.WriteAttributeString(PropertyNameToXmlName(prop.Name), stringValue ?? "");
             }
 
             // Write elements


[03/22] ignite git commit: IGNITE-1507 Correct help text for 'tasks' command in visorcmd.

Posted by sb...@apache.org.
IGNITE-1507 Correct help text for 'tasks' command in visorcmd.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b47d5cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b47d5cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b47d5cb

Branch: refs/heads/ignite-2407
Commit: 0b47d5cbc6a6bf87e2ab121a0e50d795e3f9ed17
Parents: c67e2ea
Author: Vasiliy Sisko <vs...@gridgain.com>
Authored: Wed Feb 10 17:13:50 2016 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Wed Feb 10 17:13:50 2016 +0700

----------------------------------------------------------------------
 .../apache/ignite/visor/commands/tasks/VisorTasksCommand.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0b47d5cb/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
index e158506..660c5f1 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala
@@ -250,7 +250,7 @@ private case class VisorTask(
  * |       | of Event Storage SPI that is responsible for temporary storage of generated   |
  * |       | events on each node can also affect the functionality of this command.        |
  * |       |                                                                               |
- * |       | By default - all events are enabled and Ignite stores last 10,000 local       |
+ * |       | By default - all events are disabled and Ignite stores last 10,000 local      |
  * |       | events on each node. Both of these defaults can be changed in configuration.  |
  * +---------------------------------------------------------------------------------------+
  * }}}
@@ -1341,7 +1341,7 @@ object VisorTasksCommand {
             "of Event Storage SPI that is responsible for temporary storage of generated",
             "events on each node can also affect the functionality of this command.",
             " ",
-            "By default - all events are enabled and Ignite stores last 10,000 local",
+            "By default - all events are disabled and Ignite stores last 10,000 local",
             "events on each node. Both of these defaults can be changed in configuration."
         ),
         spec = List(


[12/22] ignite git commit: Fixed devnotes.txt for yarn and mesos modules.

Posted by sb...@apache.org.
Fixed devnotes.txt for yarn and mesos modules.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/14824a19
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/14824a19
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/14824a19

Branch: refs/heads/ignite-2407
Commit: 14824a19847091d5599f41eeaf52aa2694b7da87
Parents: 5539cba
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Feb 10 15:46:27 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Feb 10 15:46:27 2016 +0300

----------------------------------------------------------------------
 DEVNOTES.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/14824a19/DEVNOTES.txt
----------------------------------------------------------------------
diff --git a/DEVNOTES.txt b/DEVNOTES.txt
index 3e19243..e920b79 100644
--- a/DEVNOTES.txt
+++ b/DEVNOTES.txt
@@ -110,7 +110,7 @@ cd to ./modules/mesos
 
 mvn clean package
 
-Look for ignite-mesos-<version>-jar-with-dependencies.jar in ./target directory.
+Look for ignite-mesos-<version>.jar in ./target directory.
 
 Ignite Yarn Maven Build Instructions
 ============================================
@@ -118,7 +118,7 @@ cd to ./modules/yarn
 
 mvn clean package
 
-Look for ignite-yarn-<version>-jar-with-dependencies.jar in ./target directory.
+Look for ignite-yarn-<version>.jar in ./target directory.
 
 Run tests
 ==========


[13/22] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d08a7790
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d08a7790
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d08a7790

Branch: refs/heads/ignite-2407
Commit: d08a779083634b2d332961c8ae098e2e2cf041ed
Parents: 008c8cd 14824a1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 10 15:46:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:46:36 2016 +0300

----------------------------------------------------------------------
 DEVNOTES.txt                                    |   4 +-
 ...niteCacheEntryListenerExpiredEventsTest.java | 202 +++++++++++++++++++
 .../cache/CacheQueryBuildValueTest.java         | 144 +++++++++++++
 3 files changed, 348 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[11/22] ignite git commit: IGNITE-2195: Implemented Hadoop FileSystem factory capable of working with kerberized file systems. This closes #464.

Posted by sb...@apache.org.
IGNITE-2195: Implemented Hadoop FileSystem factory capable of working with kerberized file systems. This closes #464.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/008c8cd3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/008c8cd3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/008c8cd3

Branch: refs/heads/ignite-2407
Commit: 008c8cd3f33b9c2cace43a9d1f2b4e4542fb58fe
Parents: fa3706f
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Feb 10 15:45:58 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:45:58 2016 +0300

----------------------------------------------------------------------
 .../hadoop/fs/BasicHadoopFileSystemFactory.java |  22 +-
 .../fs/CachingHadoopFileSystemFactory.java      |   2 +-
 .../fs/KerberosHadoopFileSystemFactory.java     | 214 +++++++++++++++++++
 ...KerberosHadoopFileSystemFactorySelfTest.java | 104 +++++++++
 .../testsuites/IgniteHadoopTestSuite.java       |   3 +
 5 files changed, 339 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
index c791e9a..01fe6c9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -66,7 +66,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
 
     /** {@inheritDoc} */
     @Override public FileSystem get(String usrName) throws IOException {
-        return create0(IgfsUtils.fixUserName(usrName));
+        return get0(IgfsUtils.fixUserName(usrName));
     }
 
     /**
@@ -76,7 +76,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
      * @return File system.
      * @throws IOException If failed.
      */
-    protected FileSystem create0(String usrName) throws IOException {
+    protected FileSystem get0(String usrName) throws IOException {
         assert cfg != null;
 
         try {
@@ -87,12 +87,12 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
             ClassLoader clsLdr = getClass().getClassLoader();
 
             if (ctxClsLdr == clsLdr)
-                return FileSystem.get(fullUri, cfg, usrName);
+                return create(usrName);
             else {
                 Thread.currentThread().setContextClassLoader(clsLdr);
 
                 try {
-                    return FileSystem.get(fullUri, cfg, usrName);
+                    return create(usrName);
                 }
                 finally {
                     Thread.currentThread().setContextClassLoader(ctxClsLdr);
@@ -107,6 +107,18 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
     }
 
     /**
+     * Internal file system creation routine, invoked in correct class loader context.
+     *
+     * @param usrName User name.
+     * @return File system.
+     * @throws IOException If failed.
+     * @throws InterruptedException if the current thread is interrupted.
+     */
+    protected FileSystem create(String usrName) throws IOException, InterruptedException {
+        return FileSystem.get(fullUri, cfg, usrName);
+    }
+
+    /**
      * Gets file system URI.
      * <p>
      * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}.
@@ -152,7 +164,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
      *
      * @param cfgPaths Paths to file system configuration files.
      */
-    public void setConfigPaths(String... cfgPaths) {
+    public void setConfigPaths(@Nullable String... cfgPaths) {
         this.cfgPaths = cfgPaths;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
index 91f7777..e1b30c4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -47,7 +47,7 @@ public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory
     private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
         new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
             @Override public FileSystem createValue(String key) throws IOException {
-                return create0(key);
+                return get0(key);
             }
         }
     );

http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
new file mode 100644
index 0000000..fc768d6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
+ * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user.
+ * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details.
+ * The principal and the key tab name to be used for Kerberos authentication are set explicitly
+ * in the factory configuration.
+ *
+ * <p>This factory does not cache any file system instances. Unless {@code "fs.[prefix].impl.disable.cache"} is set
+ * to {@code true}, file system instances will be cached by Hadoop.
+ */
+public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The default interval used to re-login from the key tab, in milliseconds. */
+    public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L;
+
+    /** Keytab full file name. */
+    private String keyTab;
+
+    /** Keytab principal. */
+    private String keyTabPrincipal;
+
+    /** The re-login interval. See {@link #getReloginInterval()} for more information. */
+    private long reloginInterval = DFLT_RELOGIN_INTERVAL;
+
+    /** Time of last re-login attempt, in system milliseconds. */
+    private transient volatile long lastReloginTime;
+
+    /**
+     * Constructor.
+     */
+    public KerberosHadoopFileSystemFactory() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileSystem get(String userName) throws IOException {
+        reloginIfNeeded();
+
+        return super.get(userName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
+            UserGroupInformation.getLoginUser());
+
+        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+            @Override public FileSystem run() throws Exception {
+                return FileSystem.get(fullUri, cfg);
+            }
+        });
+    }
+
+    /**
+     * Gets the key tab principal short name (e.g. "hdfs").
+     *
+     * @return The key tab principal.
+     */
+    @Nullable public String getKeyTabPrincipal() {
+        return keyTabPrincipal;
+    }
+
+    /**
+     * Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information.
+     *
+     * @param keyTabPrincipal The key tab principal name.
+     */
+    public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) {
+        this.keyTabPrincipal = keyTabPrincipal;
+    }
+
+    /**
+     * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
+     *
+     * @return The key tab file name.
+     */
+    @Nullable public String getKeyTab() {
+        return keyTab;
+    }
+
+    /**
+     * Sets the key tab file name. See {@link #getKeyTab()} for more information.
+     *
+     * @param keyTab The key tab file name.
+     */
+    public void setKeyTab(@Nullable String keyTab) {
+        this.keyTab = keyTab;
+    }
+
+    /**
+     * The interval used to re-login from the key tab, in milliseconds.
+     * Importantly, the value should not be larger than the Kerberos ticket life time multiplied by 0.2. This is
+     * because the ticket renew window starts from {@code 0.8 * ticket life time}.
+     * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min)
+     * obeys this rule well.
+     *
+     * <p>Zero value means that re-login should be attempted on each file system operation.
+     * Negative values are not allowed.
+     *
+     * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to
+     * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
+     * have passed since the time of the previous login.
+     * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
+     * more detail.
+     *
+     * @return The re-login interval, in milliseconds.
+     */
+    public long getReloginInterval() {
+        return reloginInterval;
+    }
+
+    /**
+     * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information.
+     *
+     * @param reloginInterval The re-login interval, in milliseconds.
+     */
+    public void setReloginInterval(long reloginInterval) {
+        this.reloginInterval = reloginInterval;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty.");
+        A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty.");
+        A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative.");
+
+        super.start();
+
+        try {
+            UserGroupInformation.setConfiguration(cfg);
+            UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
+        }
+        catch (IOException ioe) {
+            throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
+                ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
+        }
+    }
+
+    /**
+     * Re-logins the user if needed.
+     * First, the re-login interval defined in the factory is checked. Re-login attempts will be no more
+     * frequent than one attempt per {@code reloginInterval}.
+     * Second, the {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method is invoked, which gets the
+     * existing TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
+     *
+     * <p>This operation is expected to be called upon each operation with the file system created with the factory.
+     * As long as {@link #get(String)} is invoked upon each {@link IgniteHadoopFileSystem} file operation, there
+     * is no need to invoke it separately.
+     *
+     * @throws IOException If login fails.
+     */
+    private void reloginIfNeeded() throws IOException {
+        long now = System.currentTimeMillis();
+
+        if (now >= lastReloginTime + reloginInterval) {
+            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+
+            lastReloginTime = now;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        U.writeString(out, keyTab);
+        U.writeString(out, keyTabPrincipal);
+        out.writeLong(reloginInterval);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        keyTab = U.readString(in);
+        keyTabPrincipal = U.readString(in);
+        reloginInterval = in.readLong();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
new file mode 100644
index 0000000..8fb1612
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
@@ -0,0 +1,104 @@
+package org.apache.ignite.hadoop.fs;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests KerberosHadoopFileSystemFactory.
+ */
+public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest {
+    /**
+     * Test parameters validation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testParameters() throws Exception {
+        checkParameters(null, null, -1);
+
+        checkParameters(null, null, 100);
+        checkParameters(null, "b", -1);
+        checkParameters("a", null, -1);
+
+        checkParameters(null, "b", 100);
+        checkParameters("a", null, 100);
+        checkParameters("a", "b", -1);
+    }
+
+    /**
+     * Check parameters.
+     *
+     * @param keyTab Key tab.
+     * @param keyTabPrincipal Key tab principal.
+     * @param reloginInterval Re-login interval.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) {
+        final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+        fac.setKeyTab(keyTab);
+        fac.setKeyTabPrincipal(keyTabPrincipal);
+        fac.setReloginInterval(reloginInterval);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fac.start();
+
+                return null;
+            }
+        }, IllegalArgumentException.class, null);
+    }
+
+    /**
+     * Checks serialization and deserialization of the secure factory.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSerialization() throws Exception {
+        KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+        checkSerialization(fac);
+
+        fac = new KerberosHadoopFileSystemFactory();
+
+        fac.setUri("igfs://igfs@localhost:10500/");
+        fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml");
+        fac.setKeyTabPrincipal("foo");
+        fac.setKeyTab("/etc/krb5.keytab");
+        fac.setReloginInterval(30 * 60 * 1000L);
+
+        checkSerialization(fac);
+    }
+
+    /**
+     * Serializes and deserializes the factory, then checks that the copy has the same properties.
+     *
+     * @param fac The factory to check.
+     * @throws Exception If failed.
+     */
+    private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        ObjectOutput oo = new ObjectOutputStream(baos);
+
+        oo.writeObject(fac);
+
+        ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+        KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject();
+
+        assertEquals(fac.getUri(), fac2.getUri());
+        Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths());
+        assertEquals(fac.getKeyTab(), fac2.getKeyTab());
+        assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal());
+        assertEquals(fac.getReloginInterval(), fac2.getReloginInterval());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 9092f32..acd255c 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest;
 import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest;
+import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest;
 import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest;
 import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest;
 import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest;
@@ -101,6 +102,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
 
+        suite.addTest(new TestSuite(ldr.loadClass(KerberosHadoopFileSystemFactorySelfTest.class.getName())));
+
         suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));
 


[05/22] ignite git commit: ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 27edb0c..e6bfd87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,6 +57,7 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.CREATED;
 import static javax.cache.event.EventType.EXPIRED;
 import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -79,7 +82,7 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
  */
 public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest {
     /** */
-    private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>> evts;
+    private static volatile List<CacheEntryEvent<?, ?>> evts;
 
     /** */
     private static volatile CountDownLatch evtsLatch;
@@ -91,7 +94,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     private Integer lastKey = 0;
 
     /** */
-    private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg;
+    private CacheEntryListenerConfiguration<Object, Object> lsnrCfg;
+
+    /** */
+    private boolean useObjects;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
@@ -103,9 +109,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cfg.setEagerTtl(eagerTtl());
 
+        cfg.setMemoryMode(memoryMode());
+
         return cfg;
     }
 
+    /**
+     * @return Cache memory mode.
+     */
+    protected CacheMemoryMode memoryMode() {
+        return ONHEAP_TIERED;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -129,9 +144,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     public void testExceptionIgnored() throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new ExceptionListener();
                 }
             },
@@ -140,7 +155,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             false
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         cache.registerCacheEntryListener(lsnrCfg);
 
@@ -158,13 +173,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
 
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
-            new Factory<CacheEntryEventSerializableFilter<? super Integer, ? super Integer>>() {
-                @Override public CacheEntryEventSerializableFilter<? super Integer, ? super Integer> create() {
+            new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
+                @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
                     return new ExceptionFilter();
                 }
             },
@@ -192,9 +207,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     public void testNoOldValue() throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
@@ -203,7 +218,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             true
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         try {
             for (Integer key : keys()) {
@@ -222,21 +237,30 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @throws Exception If failed.
      */
+    public void testSynchronousEventsObjectKeyValue() throws Exception {
+        useObjects = true;
+
+        testSynchronousEvents();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSynchronousEvents() throws Exception {
-        final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener() {
-            @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        final CacheEntryCreatedListener<Object, Object> lsnr = new CreateUpdateRemoveExpireListener() {
+            @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
                 super.onRemoved(evts);
 
                 awaitLatch();
             }
 
-            @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+            @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
                 super.onCreated(evts);
 
                 awaitLatch();
             }
 
-            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                 super.onUpdated(evts);
 
                 awaitLatch();
@@ -252,9 +276,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             }
         };
 
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
-            new Factory<CacheEntryListener<Integer, Integer>>() {
-                @Override public CacheEntryListener<Integer, Integer> create() {
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Object, Object>>() {
+                @Override public CacheEntryListener<Object, Object> create() {
                     return lsnr;
                 }
             },
@@ -263,7 +287,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             true
         );
 
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
         cache.registerCacheEntryListener(lsnrCfg);
 
@@ -299,7 +323,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                 if (!eagerTtl()) {
                     U.sleep(1100);
 
-                    assertNull(primaryCache(key, cache.getName()).get(key));
+                    assertNull(primaryCache(key, cache.getName()).get(key(key)));
 
                     evtsLatch.await(5000, MILLISECONDS);
 
@@ -378,13 +402,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         final CyclicBarrier barrier = new CyclicBarrier(THREADS);
 
-        final IgniteCache<Integer, Integer> cache = jcache(0);
+        final IgniteCache<Object, Object> cache = jcache(0);
 
         GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
-                CacheEntryListenerConfiguration<Integer, Integer> cfg = new MutableCacheEntryListenerConfiguration<>(
-                    new Factory<CacheEntryListener<Integer, Integer>>() {
-                        @Override public CacheEntryListener<Integer, Integer> create() {
+                CacheEntryListenerConfiguration<Object, Object> cfg = new MutableCacheEntryListenerConfiguration<>(
+                    new Factory<CacheEntryListener<Object, Object>>() {
+                        @Override public CacheEntryListener<Object, Object> create() {
                             return new CreateUpdateRemoveExpireListener();
                         }
                     },
@@ -441,9 +465,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param expEvts Expected events number.
      * @throws Exception If failed.
      */
-    private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer> cache, int expEvts)
+    private void syncEvent(
+        Integer key,
+        Integer val,
+        IgniteCache<Object, Object> cache,
+        int expEvts)
         throws Exception {
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
@@ -466,9 +494,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         });
 
         if (val != null)
-            cache.put(key, val);
+            cache.put(key(key), value(val));
         else
-            cache.remove(key);
+            cache.remove(key(key));
 
         done.set(true);
 
@@ -480,15 +508,45 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     * @param key Integer key.
+     * @return Key instance.
+     */
+    private Object key(Integer key) {
+        assert key != null;
+
+        return useObjects ? new ListenerTestKey(key) : key;
+    }
+
+    /**
+     * @param val Integer value.
+     * @return Value instance.
+     */
+    private Object value(Integer val) {
+        if (val == null)
+            return null;
+
+        return useObjects ? new ListenerTestValue(val) : val;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventsObjectKeyValue() throws Exception {
+        useObjects = true;
+
+        testEvents();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testEvents() throws Exception {
-        IgniteCache<Integer, Integer> cache = jcache();
+        IgniteCache<Object, Object> cache = jcache();
 
-        Map<Integer, Integer> vals = new HashMap<>();
+        Map<Object, Object> vals = new HashMap<>();
 
         for (int i = 0; i < 100; i++)
-            vals.put(i + 2_000_000, i);
+            vals.put(key(i + 2_000_000), value(i));
 
         cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries.
 
@@ -518,7 +576,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             checkEvents(cache, new CreateUpdateRemoveExpireListenerFactory(), key, true, true, true, true);
         }
 
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new CreateUpdateRemoveExpireListenerFactory(),
             new TestFilterFactory(),
             true,
@@ -551,7 +609,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception {
+    private void checkListenerOnStart(Map<Object, Object> vals) throws Exception {
         lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new CreateUpdateRemoveExpireListenerFactory(),
             null,
@@ -564,7 +622,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         try {
             awaitPartitionMapExchange();
 
-            IgniteCache<Integer, Integer> cache = grid.cache(null);
+            IgniteCache<Object, Object> cache = grid.cache(null);
 
             Integer key = Integer.MAX_VALUE;
 
@@ -588,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         try {
             awaitPartitionMapExchange();
 
-            IgniteCache<Integer, Integer> cache = grid.cache(null);
+            IgniteCache<Object, Object> cache = grid.cache(null);
 
             log.info("Check filter for listener in configuration.");
 
@@ -613,14 +671,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     private void checkEvents(
-        final IgniteCache<Integer, Integer> cache,
-        final Factory<CacheEntryListener<Integer, Integer>> lsnrFactory,
+        final IgniteCache<Object, Object> cache,
+        final Factory<CacheEntryListener<Object, Object>> lsnrFactory,
         Integer key,
         boolean create,
         boolean update,
         boolean rmv,
         boolean expire) throws Exception {
-        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             lsnrFactory,
             null,
             true,
@@ -642,8 +700,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param vals Values in cache.
      * @throws Exception If failed.
      */
-    private void checkFilter(final IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception {
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+    private void checkFilter(final IgniteCache<Object, Object> cache, Map<Object, Object> vals) throws Exception {
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries.
 
@@ -653,16 +711,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cache.putAll(vals);
 
-        final Map<Integer, Integer> newVals = new HashMap<>();
+        final Map<Object, Object> newVals = new HashMap<>();
 
-        for (Integer key : vals.keySet())
-            newVals.put(key, -1);
+        for (Object key : vals.keySet())
+            newVals.put(key, value(-1));
 
         cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals);
 
+        U.sleep(1000);
+
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                for (Integer key : newVals.keySet()) {
+                for (Object key : newVals.keySet()) {
                     if (primaryCache(key, cache.getName()).get(key) != null)
                         return false;
                 }
@@ -675,13 +735,20 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         assertEquals(expEvts, evts.size());
 
-        Set<Integer> rmvd = new HashSet<>();
-        Set<Integer> created = new HashSet<>();
-        Set<Integer> updated = new HashSet<>();
-        Set<Integer> expired = new HashSet<>();
+        Set<Object> rmvd = new HashSet<>();
+        Set<Object> created = new HashSet<>();
+        Set<Object> updated = new HashSet<>();
+        Set<Object> expired = new HashSet<>();
+
+        for (CacheEntryEvent<?, ?> evt : evts) {
+            Integer key;
+
+            if (useObjects)
+                key = ((ListenerTestKey)evt.getKey()).key;
+            else
+                key = (Integer)evt.getKey();
 
-        for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
-            assertTrue(evt.getKey() % 2 == 0);
+            assertTrue(key % 2 == 0);
 
             assertTrue(vals.keySet().contains(evt.getKey()));
 
@@ -707,7 +774,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                     break;
 
                 case UPDATED:
-                    assertEquals(-1, (int)evt.getValue());
+                    assertEquals(value(-1), evt.getValue());
 
                     assertEquals(vals.get(evt.getKey()), evt.getOldValue());
 
@@ -722,7 +789,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                 case EXPIRED:
                     assertNull(evt.getValue());
 
-                    assertEquals(-1, (int)evt.getOldValue());
+                    assertEquals(value(-1), evt.getOldValue());
 
                     assertTrue(rmvd.contains(evt.getKey()));
 
@@ -757,8 +824,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @throws Exception If failed.
      */
     private void checkEvents(
-        final IgniteCache<Integer, Integer> cache,
-        final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg,
+        final IgniteCache<Object, Object> cache,
+        final CacheEntryListenerConfiguration<Object, Object> lsnrCfg,
         Integer key,
         boolean create,
         boolean update,
@@ -789,64 +856,64 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         if (expire)
             expEvts += 2;
 
-        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>());
+        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
         evtsLatch = new CountDownLatch(expEvts);
 
-        cache.put(key, 0);
+        cache.put(key(key), value(0));
 
         for (int i = 0; i < UPDATES; i++) {
             if (i % 2 == 0)
-                cache.put(key, i + 1);
+                cache.put(key(key), value(i + 1));
             else
-                cache.invoke(key, new EntrySetValueProcessor(i + 1));
+                cache.invoke(key(key), new EntrySetValueProcessor(value(i + 1)));
         }
 
         // Invoke processor does not update value, should not trigger event.
-        assertEquals(String.valueOf(UPDATES), cache.invoke(key, new EntryToStringProcessor()));
+        assertEquals(String.valueOf(UPDATES), cache.invoke(key(key), new EntryToStringProcessor()));
 
-        assertFalse(cache.putIfAbsent(key, -1));
+        assertFalse(cache.putIfAbsent(key(key), value(-1)));
 
-        assertFalse(cache.remove(key, -1));
+        assertFalse(cache.remove(key(key), value(-1)));
 
-        assertTrue(cache.remove(key));
+        assertTrue(cache.remove(key(key)));
 
-        IgniteCache<Integer, Integer> expirePlcCache =
+        IgniteCache<Object, Object> expirePlcCache =
             cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
 
-        expirePlcCache.put(key, 10);
+        expirePlcCache.put(key(key), value(10));
 
         U.sleep(700);
 
         if (!eagerTtl())
-            assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+            assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
 
-        IgniteCache<Integer, Integer> cache1 = cache;
+        IgniteCache<Object, Object> cache1 = cache;
 
         if (gridCount() > 1)
             cache1 = jcache(1); // Do updates from another node.
 
-        cache1.put(key, 1);
+        cache1.put(key(key), value(1));
 
-        cache1.put(key, 2);
+        cache1.put(key(key), value(2));
 
-        assertTrue(cache1.remove(key));
+        assertTrue(cache1.remove(key(key)));
 
-        IgniteCache<Integer, Integer> expirePlcCache1 =
+        IgniteCache<Object, Object> expirePlcCache1 =
             cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
 
-        expirePlcCache1.put(key, 20);
+        expirePlcCache1.put(key(key), value(20));
 
         U.sleep(200);
 
         if (!eagerTtl())
-            assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.
+            assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.
 
         evtsLatch.await(5000, MILLISECONDS);
 
         assertEquals(expEvts, evts.size());
 
-        Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator();
+        Iterator<CacheEntryEvent<?, ?>> iter = evts.iterator();
 
         if (create)
             checkEvent(iter, key, CREATED, 0, null);
@@ -886,11 +953,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         cache.deregisterCacheEntryListener(lsnrCfg);
 
-        cache.put(key, 1);
+        cache.put(key(key), value(1));
 
-        cache.put(key, 2);
+        cache.put(key(key), value(2));
 
-        assertTrue(cache.remove(key));
+        assertTrue(cache.remove(key(key)));
 
         U.sleep(500); // Sleep some time to ensure listener was really removed.
 
@@ -908,26 +975,26 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      * @param expVal Expected value.
      * @param expOld Expected old value.
      */
-    private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter,
+    private void checkEvent(Iterator<CacheEntryEvent<?, ?>> iter,
         Integer expKey,
         EventType expType,
         @Nullable Integer expVal,
         @Nullable Integer expOld) {
         assertTrue(iter.hasNext());
 
-        CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next();
+        CacheEntryEvent<?, ?> evt = iter.next();
 
         iter.remove();
 
         assertTrue(evt.getSource() instanceof IgniteCacheProxy);
 
-        assertEquals(expKey, evt.getKey());
+        assertEquals(key(expKey), evt.getKey());
 
         assertEquals(expType, evt.getEventType());
 
-        assertEquals(expVal, evt.getValue());
+        assertEquals(value(expVal), evt.getValue());
 
-        assertEquals(expOld, evt.getOldValue());
+        assertEquals(value(expOld), evt.getOldValue());
 
         if (expOld == null)
             assertFalse(evt.isOldValueAvailable());
@@ -977,7 +1044,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @param evt Event.
      */
-    private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+    private static void onEvent(CacheEntryEvent<?, ?> evt) {
         // System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']');
 
         assertNotNull(evt);
@@ -993,9 +1060,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateUpdateRemoveExpireListener();
         }
     }
@@ -1003,9 +1070,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new NoOpCreateUpdateListener();
         }
     }
@@ -1013,9 +1080,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateUpdateListener();
         }
     }
@@ -1023,9 +1090,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class CreateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new CreateListener();
         }
     }
@@ -1033,9 +1100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class RemoveListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class RemoveListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new RemoveListener();
         }
     }
@@ -1043,9 +1110,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class UpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class UpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new UpdateListener();
         }
     }
@@ -1053,9 +1120,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> {
+    private static class ExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryListener<Integer, Integer> create() {
+        @Override public CacheEntryListener<Object, Object> create() {
             return new ExpireListener();
         }
     }
@@ -1063,9 +1130,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Integer, Integer>> {
+    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryEventSerializableFilter<Integer, Integer> create() {
+        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
             return new TestFilter();
         }
     }
@@ -1073,10 +1140,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class CreateListener implements CacheEntryCreatedListener<Integer, Integer> {
+    private static class CreateListener implements CacheEntryCreatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1084,10 +1151,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class UpdateListener implements CacheEntryUpdatedListener<Integer, Integer> {
+    private static class UpdateListener implements CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1095,10 +1162,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class RemoveListener implements CacheEntryRemovedListener<Integer, Integer> {
+    private static class RemoveListener implements CacheEntryRemovedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1106,10 +1173,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExpireListener implements CacheEntryExpiredListener<Integer, Integer> {
+    private static class ExpireListener implements CacheEntryExpiredListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1117,32 +1184,39 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+    private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             assert evt != null;
             assert evt.getSource() != null : evt;
             assert evt.getEventType() != null : evt;
             assert evt.getKey() != null : evt;
 
-            return evt.getKey() % 2 == 0;
+            Integer key;
+
+            if (evt.getKey() instanceof ListenerTestKey)
+                key = ((ListenerTestKey)evt.getKey()).key;
+            else
+                key = (Integer)evt.getKey();
+
+            return key % 2 == 0;
         }
     }
 
     /**
      *
      */
-    private static class CreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
-        CacheEntryUpdatedListener<Integer, Integer> {
+    private static class CreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+        CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1150,11 +1224,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>,
-        CacheEntryUpdatedListener<Integer, Integer> {
+    private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
+        CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts) {
                 assertNotNull(evt);
                 assertNotNull(evt.getSource());
                 assertNotNull(evt.getEventType());
@@ -1163,8 +1237,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts) {
                 assertNotNull(evt);
                 assertNotNull(evt.getSource());
                 assertNotNull(evt.getEventType());
@@ -1177,16 +1251,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      *
      */
     private static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
-        implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
 
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
-            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
                 onEvent(evt);
         }
     }
@@ -1194,9 +1268,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+    private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             throw new RuntimeException("Test filter error.");
         }
     }
@@ -1205,24 +1279,24 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
      *
      */
     private static class ExceptionListener extends CreateUpdateListener
-        implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
+        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
         /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
         /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
             error();
         }
 
@@ -1237,10 +1311,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    protected static class EntryToStringProcessor implements EntryProcessor<Integer, Integer, String> {
+    protected static class EntryToStringProcessor implements EntryProcessor<Object, Object, String> {
         /** {@inheritDoc} */
-        @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
-            throws EntryProcessorException {
+        @Override public String process(MutableEntry<Object, Object> e, Object... args) {
+            if (e.getValue() instanceof ListenerTestValue)
+                return String.valueOf(((ListenerTestValue)e.getValue()).val1);
+
             return String.valueOf(e.getValue());
         }
 
@@ -1253,19 +1329,19 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    protected static class EntrySetValueProcessor implements EntryProcessor<Integer, Integer, String> {
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, String> {
         /** */
-        private Integer val;
+        private Object val;
 
         /**
          * @param val Value to set.
          */
-        public EntrySetValueProcessor(Integer val) {
+        public EntrySetValueProcessor(Object val) {
             this.val = val;
         }
 
         /** {@inheritDoc} */
-        @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments)
+        @Override public String process(MutableEntry<Object, Object> e, Object... args)
             throws EntryProcessorException {
             e.setValue(val);
 
@@ -1307,4 +1383,88 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             // No-op.
         }
     }
+
+    /**
+     * Serializable test key wrapping an {@link Integer}; equality and hash code are defined by the wrapped value.
+     */
+    static class ListenerTestKey implements Serializable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public ListenerTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ListenerTestKey that = (ListenerTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ListenerTestKey.class, this);
+        }
+    }
+
+    /**
+     * Serializable test value: an {@link Integer} plus its string form, both used by equals and hashCode.
+     */
+    static class ListenerTestValue implements Serializable {
+        /** Integer value passed to the constructor. */
+        private final Integer val1;
+
+        /** String representation of {@link #val1}, computed in the constructor. */
+        private final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public ListenerTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ListenerTestValue that = (ListenerTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ListenerTestValue.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..69efb84
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerAtomicTest} that uses {@code OFFHEAP_TIERED} as its memory mode.
+ */
+public class IgniteCacheEntryListenerAtomicOffheapTieredTest extends IgniteCacheEntryListenerAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..23b1bc0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerAtomicTest} that uses {@code OFFHEAP_VALUES} as its memory mode.
+ */
+public class IgniteCacheEntryListenerAtomicOffheapValuesTest extends IgniteCacheEntryListenerAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_VALUES;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
new file mode 100644
index 0000000..d552195
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerTxTest} that uses {@code OFFHEAP_TIERED} as its memory mode.
+ */
+public class IgniteCacheEntryListenerTxOffheapTieredTest extends IgniteCacheEntryListenerTxTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
new file mode 100644
index 0000000..32555c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Variant of {@link IgniteCacheEntryListenerTxTest} that uses {@code OFFHEAP_VALUES} as its memory mode.
+ */
+public class IgniteCacheEntryListenerTxOffheapValuesTest extends IgniteCacheEntryListenerTxTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_VALUES;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index a9e43d4..41725e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -48,6 +48,7 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
         return null;
     }
 
+    /** {@inheritDoc} */
     @Override public void testEvents(){
         fail("https://issues.apache.org/jira/browse/IGNITE-1600");
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1c65f9b..a42f056 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
@@ -97,6 +98,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -142,6 +144,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         ccfg.setBackups(backups);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
         ccfg.setNearConfiguration(nearCacheConfiguration());
+        ccfg.setMemoryMode(memoryMode());
 
         cfg.setCacheConfiguration(ccfg);
 
@@ -151,6 +154,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * @return Cache memory mode.
+     */
+    protected CacheMemoryMode memoryMode() {
+        return ONHEAP_TIERED;
+    }
+
+    /**
      * @return Near cache configuration.
      */
     protected NearCacheConfiguration nearCacheConfiguration() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
new file mode 100644
index 0000000..cc8590d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * {@link CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest} with {@code OFFHEAP_TIERED} memory mode.
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest
+    extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
new file mode 100644
index 0000000..cae06c3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Variant of {@link CacheContinuousQueryFailoverTxSelfTest} that uses {@code OFFHEAP_TIERED} as its memory mode.
+ */
+public class CacheContinuousQueryFailoverTxOffheapTieredTest extends CacheContinuousQueryFailoverTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
new file mode 100644
index 0000000..d9b2091
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 10;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValues() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValues() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        ignite(0).createCache(ccfg); // Cache is created through node 0.
+
+        try {
+            IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName()); // Used via node NODES - 1.
+
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed); // Logged so the random sequence of a given run can be identified.
+
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
+                new ArrayBlockingQueue<>(10_000); // Bounded: add() in the listener throws if the queue is full.
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                    for (CacheEntryEvent<?, ?> evt : evts) {
+                        // System.out.println("Event: " + evt);
+
+                        evtsQueue.add(evt); // Hand the event over to the test thread polling the queue.
+                    }
+                }
+            });
+
+            QueryCursor<?> cur = cache.query(qry);
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); // Test-side model of cache contents.
+
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    if (i % 100 == 0)
+                        log.info("Iteration: " + i);
+
+                    randomUpdate(rnd, evtsQueue, expData, cache);
+                }
+            }
+            finally {
+                cur.close(); // Cursor is closed even if an iteration fails.
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName()); // Cache is destroyed even if the test body fails.
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param evtsQueue Events queue.
+     * @param expData Expected cache data.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+        ConcurrentMap<Object, Object> expData,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key); // Value the test expects the cache to hold now; null if absent.
+
+        int op = rnd.nextInt(11); // Selects one of the 11 operations below (0..10).
+
+        // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+        switch (op) {
+            case 0: { // put: an event is always expected.
+                cache.put(key, newVal);
+
+                waitEvent(evtsQueue, key, newVal, oldVal);
+
+                expData.put(key, newVal);
+
+                break;
+            }
+
+            case 1: { // getAndPut: an event is always expected.
+                cache.getAndPut(key, newVal);
+
+                waitEvent(evtsQueue, key, newVal, oldVal);
+
+                expData.put(key, newVal);
+
+                break;
+            }
+
+            case 2: { // remove: waitEvent() checks for no event when the key was absent.
+                cache.remove(key);
+
+                waitEvent(evtsQueue, key, null, oldVal);
+
+                expData.remove(key);
+
+                break;
+            }
+
+            case 3: { // getAndRemove: waitEvent() checks for no event when the key was absent.
+                cache.getAndRemove(key);
+
+                waitEvent(evtsQueue, key, null, oldVal);
+
+                expData.remove(key);
+
+                break;
+            }
+
+            case 4: { // invoke with a processor that sets newVal.
+                cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                waitEvent(evtsQueue, key, newVal, oldVal);
+
+                expData.put(key, newVal);
+
+                break;
+            }
+
+            case 5: { // invoke with a processor that removes the entry (null value).
+                cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                waitEvent(evtsQueue, key, null, oldVal);
+
+                expData.remove(key);
+
+                break;
+            }
+
+            case 6: { // putIfAbsent: an event is expected only if the key was absent.
+                cache.putIfAbsent(key, newVal);
+
+                if (oldVal == null) {
+                    waitEvent(evtsQueue, key, newVal, null);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 7: { // getAndPutIfAbsent: an event is expected only if the key was absent.
+                cache.getAndPutIfAbsent(key, newVal);
+
+                if (oldVal == null) {
+                    waitEvent(evtsQueue, key, newVal, null);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 8: { // replace: an event is expected only if the key was present.
+                cache.replace(key, newVal);
+
+                if (oldVal != null) {
+                    waitEvent(evtsQueue, key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 9: { // getAndReplace: an event is expected only if the key was present.
+                cache.getAndReplace(key, newVal);
+
+                if (oldVal != null) {
+                    waitEvent(evtsQueue, key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+                }
+                else
+                    checkNoEvent(evtsQueue);
+
+                break;
+            }
+
+            case 10: { // Conditional replace: an event is expected only if the random expected value matches.
+                if (oldVal != null) {
+                    Object replaceVal = value(rnd);
+
+                    boolean success = replaceVal.equals(oldVal); // True when the replace is expected to apply.
+
+                    if (success) {
+                        cache.replace(key, replaceVal, newVal);
+
+                        waitEvent(evtsQueue, key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else {
+                        cache.replace(key, replaceVal, newVal);
+
+                        checkNoEvent(evtsQueue);
+                    }
+                }
+                else {
+                    cache.replace(key, value(rnd), newVal); // Key is expected to be absent, so no event.
+
+                    checkNoEvent(evtsQueue);
+                }
+
+                break;
+            }
+
+            default:
+                fail(); // Not reachable for op in 0..10.
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        return new QueryTestValue(rnd.nextInt(VALS)); // Wraps a random integer from [0, VALS).
+    }
+
+    /**
+     * @param evtsQueue Event queue.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @throws Exception If failed.
+     */
+    private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+        Object key, Object val, Object oldVal) throws Exception {
+        if (val == null && oldVal == null) { // Neither new nor old value: no event must arrive.
+            checkNoEvent(evtsQueue);
+
+            return;
+        }
+
+        CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); // Null if nothing arrives within 5 seconds.
+
+        assertNotNull("Failed to wait for event [key=" + key +
+            ", val=" + val +
+            ", oldVal=" + oldVal + ']', evt);
+        assertEquals(key, evt.getKey());
+        assertEquals(val, evt.getValue());
+        assertEquals(oldVal, evt.getOldValue());
+    }
+
+    /**
+     * @param evtsQueue Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception {
+        CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); // Waits up to 50 ms for a stray event.
+
+        assertNull(evt);
+    }
+
+    /**
+     * Builds a cache configuration with {@code FULL_SYNC} write synchronization and {@code PRIMARY} write order.
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures dummy cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        boolean store) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED) // The backups argument is applied to PARTITIONED caches only.
+            ccfg.setBackups(backups);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        return ccfg;
+    }
+
+    /**
+     * Factory of a dummy cache store: {@code load} returns {@code null}, {@code write} and {@code delete} do nothing.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null; // The store never has a value for any key.
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     * Serializable test key wrapping an {@code Integer}; equality and hash code are based on the wrapped value.
+     */
+    static class QueryTestKey implements Serializable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+    }
+
+    /**
+     * Serializable test value holding an {@code Integer} and its string form; both take part in equality.
+     */
+    static class QueryTestValue implements Serializable {
+        /** Integer value. */
+        private final Integer val1;
+
+        /** String form of {@link #val1}, produced by {@code String.valueOf}. */
+        private final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+    /**
+     * Entry processor that sets the configured value, or removes the entry when that value is {@code null}.
+     */
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+        /** Value to set; {@code null} means the entry is removed. */
+        private Object val;
+
+        /** Whether {@code process} returns the entry's previous value. */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set.
+         * @param retOld Return old value flag.
+         */
+        public EntrySetValueProcessor(Object val, boolean retOld) {
+            this.val = val;
+            this.retOld = retOld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+            Object old = retOld ? e.getValue() : null; // Previous value is read only when it will be returned.
+
+            if (val != null)
+                e.setValue(val);
+            else
+                e.remove();
+
+            return old;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5abb98d..dbe282e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
@@ -73,9 +74,9 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -117,6 +118,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             cacheCfg.setReadThrough(true);
             cacheCfg.setWriteThrough(true);
             cacheCfg.setLoadPreviousValue(true);
+            cacheCfg.setMemoryMode(memoryMode());
 
             cfg.setCacheConfiguration(cacheCfg);
         }
@@ -135,6 +137,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     }
 
     /**
+     * @return Cache memory mode set on the test cache configuration.
+     */
+    protected CacheMemoryMode memoryMode() {
+        return ONHEAP_TIERED; // Default value; subclasses may override.
+    }
+
+    /**
      * @return Peer class loading enabled flag.
      */
     protected boolean peerClassLoadingEnabled() {
@@ -393,8 +402,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
         });
 
-        try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
-            QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+        try (QueryCursor<Cache.Entry<Integer, Integer>> qryCur2 = cache1.query(qry2);
+             QueryCursor<Cache.Entry<Integer, Integer>> qryCur1 = cache.query(qry1)) {
             for (int i = 0; i < gridCount(); i++) {
                 IgniteCache<Object, Object> cache0 = grid(i).cache(null);
 
@@ -448,7 +457,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                     }
                 });
 
-                QueryCursor<Cache.Entry<Integer, Integer>> query = cache.query(qry);
+                QueryCursor<Cache.Entry<Integer, Integer>> qryCur = cache.query(qry);
 
                 for (int key = 0; key < keyCnt; key++)
                     cache.put(key, key);
@@ -461,7 +470,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                     }, 2000L);
                 }
                 finally {
-                    query.close();
+                    qryCur.close();
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
new file mode 100644
index 0000000..d6948e2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Runs the tests of {@link GridCacheContinuousQueryAtomicSelfTest} with {@code OFFHEAP_TIERED} memory mode.
+ */
+public class GridCacheContinuousQueryAtomicOffheapTieredTest extends GridCacheContinuousQueryAtomicSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
new file mode 100644
index 0000000..4002435
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+
+/**
+ * Runs the tests of {@link GridCacheContinuousQueryAtomicSelfTest} with {@code OFFHEAP_VALUES} memory mode.
+ */
+public class GridCacheContinuousQueryAtomicOffheapValuesTest extends GridCacheContinuousQueryAtomicSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_VALUES;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
new file mode 100644
index 0000000..bcba7b6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+
+/**
+ * Runs the tests of {@link GridCacheContinuousQueryTxSelfTest} with {@code OFFHEAP_TIERED} memory mode.
+ */
+public class GridCacheContinuousQueryTxOffheapTieredTest extends GridCacheContinuousQueryTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return OFFHEAP_TIERED;
+    }
+}


[22/22] ignite git commit: ignite-2407

Posted by sb...@apache.org.
ignite-2407


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c291043
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c291043
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c291043

Branch: refs/heads/ignite-2407
Commit: 6c29104396b29c2752855e3cd3c5f5b35e6a1304
Parents: 112e76e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 12 16:08:51 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 12 16:08:51 2016 +0300

----------------------------------------------------------------------
 .../distributed/IgniteCachePrimarySyncTest.java | 45 +++++++++++++++++---
 1 file changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6c291043/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
index cef73fd..183d4bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java
@@ -21,16 +21,26 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  *
@@ -94,21 +104,36 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPutGet() throws Exception {
-        checkPutGet(ignite(SRVS).cache("cache1"));
+        Ignite ignite = ignite(SRVS);
 
-        checkPutGet(ignite(SRVS).cache("cache2"));
+        checkPutGet(ignite.cache("cache1"), null, null, null);
+
+        checkPutGet(ignite.cache("cache2"), null, null, null);
+
+        checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, REPEATABLE_READ);
+
+        checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, SERIALIZABLE);
+
+        checkPutGet(ignite.cache("cache2"), ignite.transactions(), PESSIMISTIC, READ_COMMITTED);
     }
 
     /**
      * @param cache Cache.
+     * @param txs Transactions instance if explicit transaction should be used.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
      */
-    private void checkPutGet(IgniteCache<Object, Object> cache) {
+    private void checkPutGet(IgniteCache<Object, Object> cache,
+        @Nullable IgniteTransactions txs,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation) {
         log.info("Check cache: " + cache.getName());
 
         final int KEYS = 50;
 
         for (int iter = 0; iter < 100; iter++) {
-            log.info("Iteration: " + iter);
+            if (iter % 10 == 0)
+                log.info("Iteration: " + iter);
 
             for (int i = 0; i < KEYS; i++)
                 cache.remove(i);
@@ -118,12 +143,20 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest {
             for (int i = 0; i < KEYS; i++)
                 putBatch.put(i, iter);
 
-            cache.putAll(putBatch);
+            if (txs != null) {
+                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                    cache.putAll(putBatch);
+
+                    tx.commit();
+                }
+            }
+            else
+                cache.putAll(putBatch);
 
             Map<Object, Object> vals = cache.getAll(putBatch.keySet());
 
             for (int i = 0; i < KEYS; i++)
-                assertNotNull(vals.get(i));
+                assertEquals(iter, vals.get(i));
         }
     }
 }


[19/22] ignite git commit: IGNITE-2468

Posted by sb...@apache.org.
 IGNITE-2468


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/725d6cb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/725d6cb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/725d6cb5

Branch: refs/heads/ignite-2407
Commit: 725d6cb557684ac8f31dfde8f5fcb4ddb95a18dd
Parents: 763bf57
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Feb 12 14:08:25 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Feb 12 14:08:25 2016 +0300

----------------------------------------------------------------------
 .../internal/GridMessageListenHandler.java      |  16 ++
 .../continuous/GridContinuousProcessor.java     |  50 +++--
 ...eClientReconnectContinuousProcessorTest.java |  32 +++-
 ...IgniteCacheContinuousQueryReconnectTest.java | 192 +++++++++++++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |   2 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 6 files changed, 279 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 13aeb54..bf81944 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -83,6 +83,22 @@ public class GridMessageListenHandler implements GridContinuousHandler {
         this.pred = pred;
     }
 
+    /**
+     * Copy constructor: takes class name, deployment info, predicate and topic fields from {@code orig}.
+     * @param orig Handler to be copied.
+     */
+    public GridMessageListenHandler(GridMessageListenHandler orig) {
+        assert orig != null;
+
+        this.clsName = orig.clsName;
+        this.depInfo = orig.depInfo;
+        this.pred = orig.pred;
+        this.predBytes = orig.predBytes;
+        this.topic = orig.topic;
+        this.topicBytes = orig.topicBytes;
+        this.depEnabled = false; // Not copied from orig: always false in the copy.
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isEvents() {
         return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0218897..496f820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridMessageListenHandler;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -428,11 +429,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         ctx.resource().injectGeneric(item.prjPred);
 
                     // Register handler only if local node passes projection predicate.
-                    if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
+                    if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
+                        !locInfos.containsKey(item.routineId)) {
                         if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
                             item.autoUnsubscribe, false))
                             item.hnd.onListenerRegistered(item.routineId, ctx);
                     }
+
+                    if (!item.autoUnsubscribe)
+                        // Register routine locally.
+                        locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
+                            item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to register continuous handler.", e);
@@ -854,6 +861,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
         }
 
+        GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
+            new GridMessageListenHandler((GridMessageListenHandler)hnd) :
+            hnd;
+
         if (node.isClient()) {
             Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
 
@@ -866,7 +877,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
 
             clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
-                hnd,
+                hnd0,
                 data.bufferSize(),
                 data.interval(),
                 data.autoUnsubscribe()));
@@ -881,10 +892,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 if (prjPred != null)
                     ctx.resource().injectGeneric(prjPred);
 
-                if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
-                    registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
+                if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
+                    !locInfos.containsKey(routineId)) {
+                    registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
                         data.autoUnsubscribe(), false);
                 }
+
+                if (!data.autoUnsubscribe())
+                    // Register routine locally.
+                    locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
+                        prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -894,11 +911,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Load partition counters.
-        if (hnd.isQuery()) {
+        if (hnd0.isQuery()) {
             GridCacheProcessor proc = ctx.cache();
 
             if (proc != null) {
-                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+                GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal()) {
                     Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
@@ -912,7 +929,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             req.addError(ctx.localNodeId(), err);
 
         if (registered)
-            hnd.onListenerRegistered(routineId, ctx);
+            hnd0.onListenerRegistered(routineId, ctx);
     }
 
     /**
@@ -1095,22 +1112,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("TooBroadScope")
     private void unregisterRemote(UUID routineId) {
-        RemoteRoutineInfo info;
+        RemoteRoutineInfo remote;
+        LocalRoutineInfo loc;
 
         stopLock.lock();
 
         try {
-            info = rmtInfos.remove(routineId);
+            remote = rmtInfos.remove(routineId);
 
-            if (info == null)
+            loc = locInfos.remove(routineId);
+
+            if (remote == null)
                 stopped.add(routineId);
         }
         finally {
             stopLock.unlock();
         }
 
-        if (info != null)
-            unregisterHandler(routineId, info.hnd, false);
+        if (remote != null)
+            unregisterHandler(routineId, remote.hnd, false);
+        else {
+            assert loc != null;
+
+            // Removes the routine on the node that started it when stopRoutine is called from another node.
+            unregisterHandler(routineId, loc.hnd, false);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index dc94c96..4c44adc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -113,7 +113,21 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     /**
      * @throws Exception If failed.
      */
-    public void testMessageListenerReconnect() throws Exception {
+    public void testMessageListenerReconnectAndStopFromServer() throws Exception {
+        testMessageListenerReconnect(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMessageListenerReconnectAndStopFromClient() throws Exception {
+        testMessageListenerReconnect(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
         Ignite client = grid(serverCount());
 
         assertTrue(client.cluster().localNode().isClient());
@@ -166,7 +180,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
 
         log.info("Stop listen, should not get remote messages anymore.");
 
-        client.message().stopRemoteListen(opId);
+        (stopFromClient ? client : srv).message().stopRemoteListen(opId);
 
         srv.message().send(topic, "msg3");
 
@@ -175,6 +189,20 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
 
         assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
         assertFalse(latch.await(3000, MILLISECONDS));
+
+        log.info("New nodes should not register stopped listeners.");
+
+        startGrid(serverCount() + 1);
+
+        srv.message().send(topic, "msg4");
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(1);
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertFalse(latch.await(3000, MILLISECONDS));
+
+        stopGrid(serverCount() + 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
new file mode 100644
index 0000000..b1d8a49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks how many remote filters of a continuous query fire as nodes join, leave and reconnect.
+ */
+public class IgniteCacheContinuousQueryReconnectTest extends GridCommonAbstractTest implements Serializable {
+    /** */
+    final private static AtomicInteger cnt = new AtomicInteger();
+
+    /** */
+    private volatile boolean isClient = false;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(atomicMode());
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        if (isClient)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @return Atomic mode.
+     */
+    protected CacheAtomicityMode atomicMode() {
+        return ATOMIC;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectServer() throws Exception {
+        testReconnect(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClient() throws Exception {
+        testReconnect(true);
+    }
+
+    /**
+     * Resets {@code cnt}, puts an entry and asserts that {@code cnt} equals {@code diff}.
+     */
+    private void putAndCheck(IgniteCache<Object, Object> cache, int diff) {
+        cnt.set(0);
+
+        cache.put(1, "1");
+
+        assertEquals(diff, cnt.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testReconnect(boolean clientQuery) throws Exception {
+        Ignite srv1 = startGrid(0);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                // No-op.
+            }
+        });
+
+        qry.setAutoUnsubscribe(false);
+
+        qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Object, Object>() {
+            @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+                cnt.incrementAndGet();
+
+                return true;
+            }
+        });
+
+        isClient = true;
+
+        Ignite client = startGrid(1);
+
+        isClient = false;
+
+        IgniteCache<Object, Object> cache1 = srv1.cache(null);
+        IgniteCache<Object, Object> clCache = client.cache(null);
+
+        putAndCheck(clCache, 0); // 0 remote listeners.
+
+        QueryCursor<Cache.Entry<Object, Object>> cur = (clientQuery ? clCache : cache1).query(qry);
+
+        putAndCheck(clCache, 1); // 1 remote listener.
+
+        final Ignite srv2 = startGrid(2);
+
+        putAndCheck(clCache, 2); // 2 remote listeners.
+
+        stopGrid(0);
+
+        while (true) {
+            try {
+                clCache.get(1);
+
+                break;
+            }
+            catch (IgniteClientDisconnectedException e) {
+                e.reconnectFuture().get(); // Wait for reconnect.
+
+            }
+            catch (CacheException e) {
+                if (e.getCause() instanceof IgniteClientDisconnectedException)
+                    ((IgniteClientDisconnectedException)e.getCause()).reconnectFuture().get(); // Wait for reconnect.
+            }
+        }
+
+        putAndCheck(clCache, 1); // 1 remote listener.
+
+        Ignite srv3 = startGrid(3);
+
+        putAndCheck(clCache, 2); // 2 remote listeners.
+
+        stopGrid(1); // Client node.
+
+        isClient = true;
+
+        client = startGrid(4);
+
+        isClient = false;
+
+        clCache = client.cache(null);
+
+        putAndCheck(clCache, 2); // 2 remote listeners.
+
+        Ignite srv4 = startGrid(5);
+
+        putAndCheck(clCache, 3); // 3 remote listeners.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 030c653..7debb41 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2050,7 +2050,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public boolean apply(UUID uuid, Object msg) {
-            X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+            X.println(">>> Received [node=" + ignite.name() + ", msg=" + msg + ']');
 
             msgLatch.countDown();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..cecb8ad 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryReconnectTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
@@ -200,6 +201,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
         suite.addTestSuite(GridCacheContinuousQueryPartitionAtomicOneNodeTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryReconnectTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.class);


[02/22] ignite git commit: IGNITE-864 Disabled version check for visorcmd

Posted by sb...@apache.org.
IGNITE-864 Disabled version check for visorcmd


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c67e2ea5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c67e2ea5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c67e2ea5

Branch: refs/heads/ignite-2407
Commit: c67e2ea5bfb7b73d76744d54047eb1b692e0907a
Parents: b7475f0
Author: Andrey <an...@gridgain.com>
Authored: Wed Feb 10 16:56:23 2016 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Wed Feb 10 16:56:23 2016 +0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/ignite/visor/visor.scala           | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c67e2ea5/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
index a4eed69..335eb9f 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.visor
 
+import org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER
 import org.apache.ignite._
 import org.apache.ignite.cluster.{ClusterGroup, ClusterGroupEmptyException, ClusterMetrics, ClusterNode}
 import org.apache.ignite.events.EventType._
@@ -303,6 +304,9 @@ object visor extends VisorTag {
         }
     })
 
+    // Disable the update notifier so that Visor starts without the version check printout.
+    System.setProperty(IGNITE_UPDATE_NOTIFIER, "false")
+
     addHelp(
         name = "mlist",
         shortInfo = "Prints Visor console memory variables.",


[15/22] ignite git commit: added missing file header to org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest

Posted by sb...@apache.org.
added missing file header to org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest

Signed-off-by: Anton Vinogradov <av...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0491a5f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0491a5f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0491a5f8

Branch: refs/heads/ignite-2407
Commit: 0491a5f814365723b53ff1820aad76f646307f13
Parents: a9937a6
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Feb 10 18:54:36 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Feb 10 19:31:41 2016 +0300

----------------------------------------------------------------------
 .../KerberosHadoopFileSystemFactorySelfTest.java   | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0491a5f8/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
index 8fb1612..ea7fa99 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.hadoop.fs;
 
 import java.io.ByteArrayInputStream;


[06/22] ignite git commit: ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.

Posted by sb...@apache.org.
ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c05fc02
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c05fc02
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c05fc02

Branch: refs/heads/ignite-2407
Commit: 4c05fc0254f446ef040f5d22a066a0d4916a589e
Parents: 0b47d5c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 10 14:07:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 10 14:07:40 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheLazyEntry.java        |   3 +
 .../processors/cache/GridCacheContext.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     | 118 +++-
 .../binary/CacheObjectBinaryProcessorImpl.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  79 ++-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  85 ++-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  38 +-
 .../cache/query/GridCacheQueryManager.java      |  30 +-
 .../continuous/CacheContinuousQueryHandler.java |   3 +-
 .../CacheContinuousQueryListener.java           |   2 +-
 .../continuous/CacheContinuousQueryManager.java | 120 +++-
 .../continuous/GridContinuousProcessor.java     |  16 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 454 ++++++++----
 ...cheEntryListenerAtomicOffheapTieredTest.java |  32 +
 ...cheEntryListenerAtomicOffheapValuesTest.java |  32 +
 ...teCacheEntryListenerTxOffheapTieredTest.java |  32 +
 ...teCacheEntryListenerTxOffheapValuesTest.java |  32 +
 .../cache/IgniteCacheEntryListenerTxTest.java   |   1 +
 ...ContinuousQueryFailoverAbstractSelfTest.java |  10 +
 ...tomicPrimaryWriteOrderOffheapTieredTest.java |  33 +
 ...tinuousQueryFailoverTxOffheapTieredTest.java |  32 +
 ...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
 ...ridCacheContinuousQueryAbstractSelfTest.java |  19 +-
 ...eContinuousQueryAtomicOffheapTieredTest.java |  32 +
 ...eContinuousQueryAtomicOffheapValuesTest.java |  32 +
 ...CacheContinuousQueryTxOffheapTieredTest.java |  32 +
 ...CacheContinuousQueryTxOffheapValuesTest.java |  32 +
 .../junits/common/GridCommonAbstractTest.java   |   2 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   8 +
 .../IgniteCacheQuerySelfTestSuite.java          |  14 +
 30 files changed, 1743 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index 05a6fef..30933e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -50,6 +50,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
      * @param cctx Cache context.
      * @param keyObj Key cache object.
      * @param valObj Cache object value.
+     * @param keepBinary Keep binary flag.
      */
     public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, CacheObject valObj, boolean keepBinary) {
         this.cctx = cctx;
@@ -61,6 +62,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
     /**
      * @param keyObj Key cache object.
      * @param val Value.
+     * @param keepBinary Keep binary flag.
      * @param cctx Cache context.
      */
     public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, V val, boolean keepBinary) {
@@ -75,6 +77,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
      * @param keyObj Key cache object.
      * @param key Key value.
      * @param valObj Cache object
+     * @param keepBinary Keep binary flag.
      * @param val Cache value.
      */
     public CacheLazyEntry(GridCacheContext<K, V> ctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index e875df0..5729959 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1729,10 +1729,10 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Heap-based object.
      */
     @Nullable public <T> T unwrapTemporary(@Nullable Object obj) {
-        if (!offheapTiered())
+        if (!useOffheapEntry())
             return (T)obj;
 
-        return (T) cacheObjects().unwrapTemporary(this, obj);
+        return (T)cacheObjects().unwrapTemporary(this, obj);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ae40295..9336e0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -1122,7 +1124,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             assert newVer != null : "Failed to get write version for tx: " + tx;
 
-            old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val;
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+            old = (retval || intercept || lsnrCol != null) ?
+                rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val;
 
             if (intercept) {
                 val0 = CU.value(val, cctx, false);
@@ -1206,10 +1214,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     keepBinary);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() ||
-                (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
-                cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
-                    partition(), tx.local(), false, updateCntr0, topVer);
+            if (lsnrCol != null) {
+                cctx.continuousQueries().onEntryUpdated(
+                    lsnrCol,
+                    key,
+                    val,
+                    old,
+                    internal,
+                    partition(),
+                    tx.local(),
+                    false,
+                    updateCntr0,
+                    topVer);
+            }
 
             cctx.dataStructures().onEntryUpdated(key, false, keepBinary);
         }
@@ -1304,7 +1321,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
 
-            old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+            old = (retval || intercept || lsnrCol != null) ?
+                rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 
             if (intercept) {
                 entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
@@ -1388,10 +1411,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     keepBinary);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() ||
-                (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
-                cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal()
-                    || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer);
+            if (lsnrCol != null) {
+                cctx.continuousQueries().onEntryUpdated(
+                    lsnrCol,
+                    key,
+                    null,
+                    old,
+                    internal,
+                    partition(),
+                    tx.local(),
+                    false,
+                    updateCntr0,
+                    topVer);
+            }
 
             cctx.dataStructures().onEntryUpdated(key, true, keepBinary);
 
@@ -1440,6 +1472,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             return new GridCacheUpdateTxResult(false, null);
     }
 
+    /**
+     * @param tx Transaction, may be {@code null}.
+     * @return {@code True} if the continuous query manager should be notified.
+     */
+    private boolean notifyContinuousQueries(@Nullable IgniteInternalTx tx) {
+        if (cctx.isLocal() || cctx.isReplicated())
+            return true;
+        return !isNear() && (tx == null || !tx.onePhaseCommit() || tx.local());
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
@@ -1470,7 +1512,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         EntryProcessorResult<Object> invokeRes = null;
 
         synchronized (this) {
-            boolean needVal = retval || intercept || op == GridCacheOperation.TRANSFORM || !F.isEmpty(filter);
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                cctx.continuousQueries().updateListeners(internal, false);
+
+            boolean needVal = retval ||
+                intercept ||
+                op == GridCacheOperation.TRANSFORM ||
+                !F.isEmpty(filter) ||
+                lsnrCol != null;
 
             checkObsolete();
 
@@ -1479,7 +1530,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 unswap(retval);
 
             // Possibly get old value form store.
-            old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            old = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 
             boolean readFromStore = false;
 
@@ -1731,11 +1782,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            if (!isNear()) {
+            if (lsnrCol != null) {
                 long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
 
-                cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
-                    partition(), true, false, updateCntr, AffinityTopologyVersion.NONE);
+                cctx.continuousQueries().onEntryUpdated(
+                    lsnrCol,
+                    key,
+                    val,
+                    old,
+                    internal,
+                    partition(),
+                    true,
+                    false,
+                    updateCntr,
+                    AffinityTopologyVersion.NONE);
             }
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
@@ -1997,8 +2057,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             if (updateCntr != null)
                                 updateCntr0 = updateCntr;
 
-                            cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal()
-                                || !context().userCache(), partition(), primary, false, updateCntr0, topVer);
+                            cctx.continuousQueries().onEntryUpdated(
+                                key,
+                                evtVal,
+                                prevVal,
+                                isInternal() || !context().userCache(),
+                                partition(),
+                                primary,
+                                false,
+                                updateCntr0,
+                                topVer);
                         }
 
                         return new GridCacheUpdateAtomicResult(false,
@@ -2019,7 +2087,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             // Prepare old value and value bytes.
-            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 
             // Possibly read value from store.
             boolean readFromStore = false;
@@ -2937,7 +3005,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /**
      * @return {@code True} if values should be stored off-heap.
      */
-    protected boolean isOffHeapValuesOnly() {
+    protected final boolean isOffHeapValuesOnly() {
         return cctx.config().getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES;
     }
 
@@ -3236,8 +3304,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 drReplicate(drType, val, ver);
 
                 if (!skipQryNtf) {
-                    cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal()
-                        || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer);
+                    cctx.continuousQueries().onEntryUpdated(
+                        key,
+                        val,
+                        null,
+                        this.isInternal() || !this.context().userCache(),
+                        this.partition(),
+                        true,
+                        preload,
+                        updateCntr,
+                        topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false, true);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 0fef6f8..f091fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -822,10 +822,10 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
         Object val = unmarshal(valPtr, !tmp);
 
-        if (val instanceof BinaryObjectOffheapImpl)
-            return (BinaryObjectOffheapImpl)val;
+        if (val instanceof CacheObject)
+            return (CacheObject)val;
 
-        return new CacheObjectImpl(val, null);
+        return toCacheObject(ctx.cacheObjectContext(), val, false);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6c7bac5..fec61df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -1992,6 +1993,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
+        boolean initLsnrs = false;
+        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+        boolean internal = false;
+
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             KeyCacheObject k = keys.get(i);
@@ -2006,6 +2011,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (entry == null)
                     continue;
 
+                if (!initLsnrs) {
+                    internal = entry.isInternal() || !context().userCache();
+
+                    lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+                    initLsnrs = true;
+                }
+
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2034,7 +2047,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    sndPrevVal || req.returnValue(),
+                    lsnrs != null || sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
                     true,
@@ -2061,6 +2074,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (dhtFut != null) {
+                    dhtFut.listeners(lsnrs);
+
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
 
@@ -2097,10 +2112,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
-                else if (!entry.isNear() && updRes.success()) {
-                    ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(),
-                        entry.isInternal() || !context().userCache(), entry.partition(), primary, false,
-                        updRes.updateCounter(), topVer);
+                else if (lsnrs != null && updRes.success()) {
+                    ctx.continuousQueries().onEntryUpdated(
+                        lsnrs,
+                        entry.key(),
+                        updRes.newValue(),
+                        updRes.oldValue(),
+                        internal,
+                        entry.partition(),
+                        primary,
+                        false,
+                        updRes.updateCounter(),
+                        topVer);
                 }
 
                 if (hasNear) {
@@ -2275,6 +2298,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
+            boolean initLsnrs = false;
+            Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2308,6 +2334,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                     }
 
+                    if (!initLsnrs) {
+                        lsnrs = ctx.continuousQueries().updateListeners(
+                            entry.isInternal() || !context().userCache(),
+                            false);
+
+                        initLsnrs = true;
+                    }
+
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
                         node.id(),
@@ -2317,7 +2351,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/sndPrevVal,
+                        /*retval*/sndPrevVal || lsnrs != null,
                         req.keepBinary(),
                         expiry,
                         /*event*/true,
@@ -2366,6 +2400,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
+                        dhtFut.listeners(lsnrs);
+
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
@@ -2763,6 +2799,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
+        boolean initLsnrs = false;
+        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+        boolean internal = false;
+
         for (int i = 0; i < req.size(); i++) {
             KeyCacheObject key = req.key(i);
 
@@ -2785,6 +2825,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         long ttl = req.ttl(i);
                         long expireTime = req.conflictExpireTime(i);
 
+                        if (!initLsnrs) {
+                            internal = entry.isInternal() || !context().userCache();
+
+                            lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+                            initLsnrs = true;
+                        }
+
                         GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                             ver,
                             nodeId,
@@ -2794,7 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*read-through*/false,
-                            /*retval*/false,
+                            /*retval*/lsnrs != null,
                             req.keepBinary(),
                             /*expiry policy*/null,
                             /*event*/true,
@@ -2817,10 +2865,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (updRes.success() && !entry.isNear())
-                            ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(),
-                                updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(),
-                                false, false, updRes.updateCounter(), req.topologyVersion());
+                        if (lsnrs != null && updRes.success()) {
+                            ctx.continuousQueries().onEntryUpdated(
+                                lsnrs,
+                                entry.key(),
+                                updRes.newValue(),
+                                updRes.oldValue(),
+                                internal,
+                                entry.partition(),
+                                false,
+                                false,
+                                updRes.updateCounter(),
+                                req.topologyVersion());
+                        }
 
                         entry.onUnlock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 06c8441..58d704d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -102,6 +103,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Response count. */
     private volatile int resCnt;
 
+    /** Continuous query listeners, may be {@code null}. */
+    private Map<UUID, CacheContinuousQueryListener> lsnrs;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -136,6 +140,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         waitForExchange = !topLocked;
     }
 
+    /**
+     * @param lsnrs Continuous query listeners to store in this future, may be {@code null}.
+     */
+    void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
+        this.lsnrs = lsnrs;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futVer.asGridUuid();
@@ -215,6 +226,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} sends previous value to backups.
+     * @param prevVal Previous value.
      * @param updateCntr Partition update counter.
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
@@ -270,13 +283,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     addPrevVal,
                     entry.partition(),
                     prevVal,
-                    updateCntr);
+                    updateCntr,
+                    lsnrs != null);
             }
-            else if (dhtNodes.size() == 1) {
+            else if (lsnrs != null && dhtNodes.size() == 1) {
                 try {
-                    cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal,
-                        entry.key().internal() || !cctx.userCache(), entry.partition(), true, false,
-                        updateCntr, updateReq.topologyVersion());
+                    cctx.continuousQueries().onEntryUpdated(
+                        lsnrs,
+                        entry.key(),
+                        val,
+                        prevVal,
+                        entry.key().internal() || !cctx.userCache(),
+                        entry.partition(),
+                        true,
+                        false,
+                        updateCntr,
+                        updateReq.topologyVersion());
                 }
                 catch (IgniteCheckedException e) {
                     U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
@@ -352,7 +374,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             cctx.mvcc().removeAtomicFuture(version());
 
             if (err != null) {
-                if (!mappings.isEmpty()) {
+                if (!mappings.isEmpty() && lsnrs != null) {
                     Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
 
                     exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
@@ -362,7 +384,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                             if (!hndKeys.contains(key)) {
                                 updateRes.addFailedKey(key, err);
 
-                                cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
+                                cctx.continuousQueries().skipUpdateEvent(
+                                    lsnrs,
+                                    key,
+                                    req.partitionId(i),
+                                    req.updateCounter(i),
                                     updateReq.topologyVersion());
 
                                 hndKeys.add(key);
@@ -378,27 +404,38 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         updateRes.addFailedKey(key, err);
             }
             else {
-                Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+                if (lsnrs != null) {
+                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
 
-                exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                    for (int i = 0; i < req.size(); i++) {
-                        KeyCacheObject key = req.key(i);
+                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+                        for (int i = 0; i < req.size(); i++) {
+                            KeyCacheObject key = req.key(i);
 
-                        if (!hndKeys.contains(key)) {
-                            try {
-                                cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
-                                    key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
-                                    req.updateCounter(i), updateReq.topologyVersion());
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
-                                    + req.value(i) + ", err=" + e + "]");
-                            }
+                            if (!hndKeys.contains(key)) {
+                                try {
+                                    cctx.continuousQueries().onEntryUpdated(
+                                        lsnrs,
+                                        key,
+                                        req.value(i),
+                                        req.localPreviousValue(i),
+                                        key.internal() || !cctx.userCache(),
+                                        req.partitionId(i),
+                                        true,
+                                        false,
+                                        req.updateCounter(i),
+                                        updateReq.topologyVersion());
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.warn(log, "Failed to send continuous query message. [key=" + key +
+                                        ", newVal=" + req.value(i) +
+                                        ", err=" + e + "]");
+                                }
 
-                            hndKeys.add(key);
+                                hndKeys.add(key);
 
-                            if (hndKeys.size() == keys.size())
-                                break exit;
+                                if (hndKeys.size() == keys.size())
+                                    break exit;
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7cc276f..e417cdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -49,6 +50,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Lite dht cache backup update request.
  */
+@IgniteCodeGeneratingFail // Need add 'cleanup' call in 'writeTo' method.
 public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -215,7 +217,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
         keys = new ArrayList<>();
         partIds = new ArrayList<>();
-        locPrevVals = new ArrayList<>();
 
         if (forceTransformBackups) {
             entryProcessors = new ArrayList<>();
@@ -240,7 +241,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
      * @param addPrevVal If {@code true} adds previous value.
+     * @param partId Partition.
      * @param prevVal Previous value.
+     * @param updateCntr Update counter.
+     * @param storeLocPrevVal If {@code true} stores previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -251,12 +255,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean addPrevVal,
         int partId,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateIdx) {
+        @Nullable Long updateCntr,
+        boolean storeLocPrevVal) {
         keys.add(key);
 
         partIds.add(partId);
 
-        locPrevVals.add(prevVal);
+        if (storeLocPrevVal) {
+            if (locPrevVals == null)
+                locPrevVals = new ArrayList<>();
+
+            locPrevVals.add(prevVal);
+        }
 
         if (forceTransformBackups) {
             assert entryProcessor != null;
@@ -273,11 +283,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             prevVals.add(prevVal);
         }
 
-        if (updateIdx != null) {
+        if (updateCntr != null) {
             if (updateCntrs == null)
                 updateCntrs = new GridLongList();
 
-            updateCntrs.add(updateIdx);
+            updateCntrs.add(updateCntr);
         }
 
         // In case there is no conflict, do not create the list.
@@ -521,6 +531,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @return Value.
      */
     @Nullable public CacheObject localPreviousValue(int idx) {
+        assert locPrevVals != null;
+
         return locPrevVals.get(idx);
     }
 
@@ -849,6 +861,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
         }
 
+        cleanup();
+
         return true;
     }
 
@@ -1048,6 +1062,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
     }
 
+    /**
+     * Releases values that are not needed after the message was written.
+     * Invoked at the end of message writing ({@code writeTo}).
+     */
+    private void cleanup() {
+        nearVals = null;
+        prevVals = null;
+
+        // Local previous values are stored only for continuous query notification;
+        // when they are absent the values are not needed for it either.
+        if (locPrevVals == null)
+            vals = null;
+    }
+
     /** {@inheritDoc} */
     @Override public byte directType() {
         return 38;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 8f0cab7..0d8f795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1107,7 +1107,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 next = null;
 
                 while (it.hasNext()) {
-                    final LazySwapEntry e = new LazySwapEntry(it.next(), keepBinary);
+                    final LazySwapEntry e = new LazySwapEntry(it.next());
 
                     if (filter != null) {
                         K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary);
@@ -2524,15 +2524,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         /** */
         private final Map.Entry<byte[], byte[]> e;
 
-        /** */
-        private boolean keepBinary;
-
         /**
          * @param e Entry with
          */
-        LazySwapEntry(Map.Entry<byte[], byte[]> e, boolean keepBinary) {
+        LazySwapEntry(Map.Entry<byte[], byte[]> e) {
             this.e = e;
-            this.keepBinary = keepBinary;
         }
 
         /** {@inheritDoc} */
@@ -2545,9 +2541,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         @Override protected V unmarshalValue() throws IgniteCheckedException {
             IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
 
-            CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
-
-            return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(obj, keepBinary);
+            return (V)cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
         }
 
         /** {@inheritDoc} */
@@ -2597,13 +2591,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         @Override protected V unmarshalValue() throws IgniteCheckedException {
             long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
 
-            CacheObject obj = cctx.fromOffheap(ptr, false);
-
-            V val = CU.value(obj, cctx, false);
-
-            assert val != null;
-
-            return val;
+            return (V)cctx.fromOffheap(ptr, false);
         }
 
         /** {@inheritDoc} */
@@ -2661,7 +2649,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             if (!filter.apply(key, val))
                 return null;
 
-            return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
+            if (key instanceof CacheObject)
+                ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
+
+            val = (V)cctx.unwrapTemporary(e.value());
+
+            if (val instanceof CacheObject)
+                ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext());
+
+            return new IgniteBiTuple<>(e.key(), val);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7e66ad3..cf9b439 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -882,8 +882,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @return Continuous query entry.
          */
         private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
-            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
-
+            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
                 e.markFiltered();
 
                 return e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 86abbef..dce04de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 /**
  * Continuous query listener.
  */
-interface CacheContinuousQueryListener<K, V> {
+public interface CacheContinuousQueryListener<K, V> {
     /**
      * Query execution callback.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0e4cb40..cc59989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static javax.cache.event.EventType.CREATED;
@@ -155,37 +156,102 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param lsnrs Listeners to notify.
+     * @param key Entry key.
      * @param partId Partition id.
      * @param updCntr Updated counter.
      * @param topVer Topology version.
      */
-    public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
-        if (lsnrCnt.get() > 0) {
-            for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
-                CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
-                    cctx.cacheId(),
-                    UPDATED,
-                    key,
-                    null,
-                    null,
-                    lsnr.keepBinary(),
-                    partId,
-                    updCntr,
-                    topVer);
+    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+        KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+        assert lsnrs != null;
 
-                CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
-                    cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+        for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                cctx.cacheId(),
+                UPDATED,
+                key,
+                null,
+                null,
+                lsnr.keepBinary(),
+                partId,
+                updCntr,
+                topVer);
 
-                lsnr.skipUpdateEvent(evt, topVer);
-            }
+            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+            lsnr.skipUpdateEvent(evt, topVer);
         }
     }
 
     /**
+     * @param internal Internal entry flag (internal key or not user cache).
+     * @param preload Whether update happened during preloading.
+     * @return Registered listeners.
+     */
+    @Nullable public Map<UUID, CacheContinuousQueryListener> updateListeners(
+        boolean internal,
+        boolean preload) {
+        if (preload && !internal)
+            return null;
+
+        ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
+
+        if (internal)
+            lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
+        else
+            lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
+
+        return F.isEmpty(lsnrCol) ? null : lsnrCol;
+    }
+
+    /**
+     * @param key Key.
+     * @param newVal New value.
+     * @param oldVal Old value.
+     * @param internal Internal entry (internal key or not user cache).
+     * @param partId Partition.
+     * @param primary {@code True} if called on primary node.
+     * @param preload Whether update happened during preloading.
+     * @param updateCntr Update counter.
+     * @param topVer Topology version.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void onEntryUpdated(
+        KeyCacheObject key,
+        CacheObject newVal,
+        CacheObject oldVal,
+        boolean internal,
+        int partId,
+        boolean primary,
+        boolean preload,
+        long updateCntr,
+        AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
+
+        if (lsnrCol != null) {
+            onEntryUpdated(
+                lsnrCol,
+                key,
+                newVal,
+                oldVal,
+                internal,
+                partId,
+                primary,
+                preload,
+                updateCntr,
+                topVer);
+        }
+    }
+
+    /**
+     * @param lsnrCol Listeners to notify.
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
      * @param internal Internal entry (internal key or not user cache),
+     * @param partId Partition.
      * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
@@ -193,6 +259,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(
+        Map<UUID, CacheContinuousQueryListener> lsnrCol,
         KeyCacheObject key,
         CacheObject newVal,
         CacheObject oldVal,
@@ -205,25 +272,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         throws IgniteCheckedException
     {
         assert key != null;
-
-        if (preload && !internal)
-            return;
-
-        ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
-
-        if (internal)
-            lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
-        else
-            lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
-
-        if (F.isEmpty(lsnrCol))
-            return;
+        assert lsnrCol != null;
 
         boolean hasNewVal = newVal != null;
         boolean hasOldVal = oldVal != null;
 
-        if (!hasNewVal && !hasOldVal)
+        if (!hasNewVal && !hasOldVal) {
+            skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer);
+
             return;
+        }
 
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7c7e3e3..0218897 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -893,11 +894,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Load partition counters.
-        if (hnd.isQuery() && ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
-            Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName())
-                .context().topology().updateCounters();
+        if (hnd.isQuery()) {
+            GridCacheProcessor proc = ctx.cache();
 
-            req.addUpdateCounters(cntrs);
+            if (proc != null) {
+                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+
+                if (cache != null && !cache.isLocal()) {
+                    Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
+
+                    req.addUpdateCounters(cntrs);
+                }
+            }
         }
 
         if (err != null)


[10/22] ignite git commit: Added test.

Posted by sb...@apache.org.
Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5539cbad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5539cbad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5539cbad

Branch: refs/heads/ignite-2407
Commit: 5539cbadb81195c35889b71486b1e449b637d8c0
Parents: 16927ab
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 10 15:32:03 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 10 15:32:03 2016 +0300

----------------------------------------------------------------------
 ...niteCacheEntryListenerExpiredEventsTest.java | 202 +++++++++++++++++++
 1 file changed, 202 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5539cbad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java
new file mode 100644
index 0000000..d9fdfac
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerExpiredEventsTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that exactly one expired event reaches a registered {@link CacheEntryExpiredListener}.
+ */
+public class IgniteCacheEntryListenerExpiredEventsTest extends GridCommonAbstractTest {
+    /** IP finder installed into the discovery SPI of the test node. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of expired events seen by {@link ExpiredListener}; re-created by each check. */
+    private static AtomicInteger evtCntr;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpiredEventAtomic() throws Exception {
+        checkExpiredEvents(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpiredEventAtomicOffheap() throws Exception {
+        checkExpiredEvents(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpiredEventTx() throws Exception {
+        checkExpiredEvents(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpiredEventTxOffheap() throws Exception {
+        checkExpiredEvents(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void checkExpiredEvents(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg);
+
+        try {
+            evtCntr = new AtomicInteger();
+
+            CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                new ExpiredListenerFactory(),
+                null,
+                true,
+                false
+            );
+
+            cache.registerCacheEntryListener(lsnrCfg);
+
+            IgniteCache<Object, Object> expiryCache =
+                cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500)));
+
+            expiryCache.put(1, 1);
+
+            for (int i = 0; i < 10; i++)
+                cache.get(i);
+
+            boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return evtCntr.get() > 0;
+                }
+            }, 5000);
+
+            assertTrue(wait);
+
+            U.sleep(100); // Pause so that any extra events are counted before the exact-count check below.
+
+            assertEquals(1, evtCntr.get());
+        }
+        finally {
+            ignite(0).destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Builds the cache configuration under test; a PARTITIONED cache gets one backup.
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    /**
+     * Factory that creates a new {@link ExpiredListener} on every call.
+     */
+    private static class ExpiredListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryListener<Object, Object> create() {
+            return new ExpiredListener();
+        }
+    }
+
+    /**
+     * Listener that increments {@code evtCntr} once for each expired event it receives.
+     */
+    private static class ExpiredListener implements CacheEntryExpiredListener<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts)
+                evtCntr.incrementAndGet();
+        }
+    }
+}