You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/29 10:04:55 UTC
[26/27] ignite git commit: Merge branch 'master' into ignite-2004
Merge branch 'master' into ignite-2004
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c5e96d1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c5e96d1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c5e96d1
Branch: refs/heads/ignite-2004
Commit: 1c5e96d14e530d5c90d9dc7809b488bfa8e01bd2
Parents: 116c6ce 732abda
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Mar 29 11:01:51 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Mar 29 11:02:05 2016 +0300
----------------------------------------------------------------------
.../store/CacheLoadOnlyStoreExample.java | 171 ++++++
examples/src/main/resources/person.csv | 20 +
.../ignite/examples/CacheExamplesSelfTest.java | 8 +
.../rest/RestBinaryProtocolSelfTest.java | 4 +-
.../rest/RestMemcacheProtocolSelfTest.java | 4 +-
.../java/org/apache/ignite/IgniteCache.java | 20 +-
.../cache/store/jdbc/CacheJdbcPojoStore.java | 2 +-
.../igfs/IgfsIpcEndpointConfiguration.java | 28 +
.../ignite/internal/binary/BinaryContext.java | 2 +
.../ignite/internal/binary/BinaryUtils.java | 16 -
.../binary/builder/BinaryObjectBuilderImpl.java | 107 ++--
.../managers/communication/GridIoManager.java | 54 +-
.../managers/communication/GridIoPolicy.java | 3 +
.../discovery/GridDiscoveryManager.java | 2 +-
.../cache/CacheClusterMetricsMXBeanImpl.java | 410 +++++++++++++++
.../cache/CacheLocalMetricsMXBeanImpl.java | 410 +++++++++++++++
.../cache/CacheMetricsMXBeanImpl.java | 410 ---------------
.../processors/cache/GridCacheAdapter.java | 45 +-
.../processors/cache/GridCacheMapEntry.java | 2 +-
.../GridCachePartitionExchangeManager.java | 2 +-
.../processors/cache/GridCacheProcessor.java | 15 +-
.../processors/cache/GridCacheProxyImpl.java | 45 +-
.../processors/cache/IgniteCacheProxy.java | 39 +-
.../processors/cache/IgniteInternalCache.java | 31 +-
.../dht/atomic/GridDhtAtomicCache.java | 17 +-
.../continuous/GridContinuousProcessor.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 8 +-
.../datastreamer/DataStreamerImpl.java | 37 +-
.../internal/processors/igfs/IgfsBlockKey.java | 30 +-
.../processors/igfs/IgfsDataManager.java | 29 +-
.../processors/igfs/IgfsDeleteWorker.java | 2 +-
.../processors/igfs/IgfsDirectoryInfo.java | 33 +-
.../internal/processors/igfs/IgfsEntryInfo.java | 8 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
.../processors/igfs/IgfsIpcHandler.java | 81 ++-
.../internal/processors/igfs/IgfsProcessor.java | 11 +-
.../internal/processors/igfs/IgfsServer.java | 2 +-
.../internal/processors/igfs/IgfsUtils.java | 127 +++++
.../meta/IgfsMetaDirectoryCreateProcessor.java | 24 +-
.../IgfsMetaDirectoryListingAddProcessor.java | 6 +-
...gfsMetaDirectoryListingReplaceProcessor.java | 4 +-
.../igfs/meta/IgfsMetaFileCreateProcessor.java | 16 +-
.../meta/IgfsMetaUpdatePropertiesProcessor.java | 5 +-
.../platform/cache/PlatformCache.java | 2 +-
.../datastreamer/PlatformDataStreamer.java | 7 +-
.../handlers/cache/GridCacheCommandHandler.java | 2 +-
.../util/offheap/unsafe/GridUnsafeMap.java | 4 +-
.../internal/visor/cache/VisorCacheMetrics.java | 2 +-
.../visor/cache/VisorCacheResetMetricsTask.java | 2 +-
.../BinaryObjectBuilderAdditionalSelfTest.java | 144 ++++-
...naryObjectBuilderDefaultMappersSelfTest.java | 2 +-
.../CacheMetricsForClusterGroupSelfTest.java | 2 +-
.../cache/CacheSwapUnswapGetTest.java | 4 +-
.../cache/GridCacheAbstractMetricsSelfTest.java | 124 ++---
.../GridCacheOffHeapValuesEvictionSelfTest.java | 18 +-
...cheTransactionalAbstractMetricsSelfTest.java | 4 +-
.../cache/IgniteCacheAbstractTest.java | 6 +
.../IgniteCacheEntryListenerAbstractTest.java | 16 +-
.../distributed/IgniteCacheManyClientsTest.java | 6 +
.../dht/GridCacheColocatedDebugTest.java | 2 +-
...ePartitionedNearDisabledMetricsSelfTest.java | 2 +-
...AtomicPartitionedTckMetricsSelfTestImpl.java | 92 ++--
.../near/GridCacheNearMetricsSelfTest.java | 152 +++---
...idCachePartitionedHitsAndMissesSelfTest.java | 2 +-
...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 380 ++++++-------
...dCacheAtomicLocalTckMetricsSelfTestImpl.java | 92 ++--
...usQueryFactoryFilterRandomOperationTest.java | 6 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 13 +-
...acheContinuousQueryRandomOperationsTest.java | 527 +++++++++++++++++--
...inuousQueryRandomOperationsTwoNodesTest.java | 28 +
.../processors/igfs/IgfsAbstractSelfTest.java | 42 +-
...lockMessageSystemPoolStarvationSelfTest.java | 299 +++++++++++
.../igfs/IgfsPrimaryMultiNodeSelfTest.java | 28 +
.../IgfsPrimaryOptimziedMarshallerSelfTest.java | 28 +
...maryRelaxedConsistencyMultiNodeSelfTest.java | 28 +
.../IgfsPrimaryRelaxedConsistencySelfTest.java | 28 +
.../igfs/IgfsPrimaryRelaxedSelfTest.java | 28 -
.../igfs/IgfsProcessorValidationSelfTest.java | 16 +
.../platform/PlatformAttributeNodeFilter.java | 31 ++
.../multijvm/IgniteCacheProcessProxy.java | 8 +
.../IgniteCacheDataStructuresSelfTestSuite.java | 8 +-
.../ignite/testsuites/IgniteIgfsTestSuite.java | 17 +-
.../ignite/igfs/Hadoop1DualAbstractTest.java | 3 +-
.../IgniteBinaryCacheQueryTestSuite2.java | 39 ++
.../IgniteBinaryCacheQueryTestSuite3.java | 39 ++
.../IgniteBinaryCacheQueryTestSuite4.java | 39 ++
.../IgniteCacheQuerySelfTestSuite.java | 160 ------
.../IgniteCacheQuerySelfTestSuite2.java | 109 ++++
.../IgniteCacheQuerySelfTestSuite3.java | 101 ++++
.../IgniteCacheQuerySelfTestSuite4.java | 49 ++
modules/platforms/cpp/common/src/java.cpp | 74 ++-
.../impl/binary/binary_type_updater_impl.cpp | 2 +
.../Apache.Ignite.Core.Tests.csproj | 15 +
.../Binary/BinaryBuilderSelfTest.cs | 60 ++-
.../Query/CacheQueriesCodeConfigurationTest.cs | 16 +-
.../Config/Apache.Ignite.exe.config.test2 | 58 ++
.../Config/Apache.Ignite.exe.config.test3 | 34 ++
.../Config/cache-local-node.xml | 65 +++
.../Config/ignite-dotnet-cfg.xml | 52 ++
.../Dataload/DataStreamerTestTopologyChange.cs | 104 ++++
.../Examples/ExamplesTest.cs | 3 +-
.../Apache.Ignite.Core.Tests/ExecutableTest.cs | 89 +++-
.../IgniteConfigurationSerializerTest.cs | 3 +-
.../Apache.Ignite.Core.Tests/JavaHomeTest.cs | 69 +++
.../Cache/Configuration/QueryEntity.cs | 9 +-
.../Apache.Ignite.Core/IgniteConfiguration.cs | 10 +-
.../IgniteConfigurationSection.xsd | 8 +
.../Impl/Datastream/DataStreamerImpl.cs | 9 +-
.../Apache.Ignite.Core/Impl/IgniteUtils.cs | 34 ++
modules/platforms/dotnet/Apache.Ignite.sln | 5 +-
.../dotnet/Apache.Ignite/Apache.Ignite.csproj | 2 +-
.../Config/AppSettingsConfigurator.cs | 97 +---
.../Apache.Ignite/Config/ArgsConfigurator.cs | 145 +----
.../dotnet/Apache.Ignite/Config/Configurator.cs | 168 ++++++
.../Apache.Ignite/Config/IConfigurator.cs | 34 --
.../dotnet/Apache.Ignite/IgniteRunner.cs | 24 +-
.../Apache.Ignite/Service/IgniteService.cs | 21 +-
.../Compute/ClosureExample.cs | 6 +-
.../Compute/TaskExample.cs | 6 +-
.../Datagrid/ContinuousQueryExample.cs | 11 +-
.../Datagrid/DataStreamerExample.cs | 8 +-
.../Datagrid/PutGetExample.cs | 8 +-
.../Datagrid/QueryExample.cs | 22 +-
.../Datagrid/StoreExample.cs | 18 +-
.../Datagrid/TransactionExample.cs | 16 +-
.../Events/EventsExample.cs | 6 +-
.../Messaging/MessagingExample.cs | 6 +-
.../Misc/LifecycleExample.cs | 4 +-
.../Services/ServicesExample.cs | 6 +-
.../Apache.Ignite.ExamplesDll.csproj | 1 +
.../Apache.Ignite.ExamplesDll/Binary/Account.cs | 1 -
.../Apache.Ignite.ExamplesDll/Binary/Address.cs | 5 +-
.../Binary/Employee.cs | 4 +
.../Binary/EmployeeKey.cs | 4 +-
.../Binary/Organization.cs | 11 +-
.../Binary/OrganizationType.cs | 3 -
.../Datagrid/EmployeeStoreFactory.cs | 38 ++
.../examples/Config/example-cache-query.xml | 118 -----
.../examples/Config/example-cache-store.xml | 59 ---
.../dotnet/examples/Config/example-cache.xml | 87 ---
.../dotnet/examples/Config/example-compute.xml | 70 ---
.../dotnet/examples/Config/examples-config.xml | 98 ++++
parent/pom.xml | 2 +
143 files changed, 4830 insertions(+), 2035 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c5e96d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 932ef83,c5df29b..2d58b15
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -2096,8 -2092,7 +2096,8 @@@ public abstract class GridCacheMapEntr
null,
null,
false,
- updateCntr0 == null ? 0 : updateCntr0,
- 0);
++ 0,
+ null);
}
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c5e96d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c5e96d1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
index 9befb0d,0000000..8fe088a
mode 100644,000000..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
@@@ -1,723 -1,0 +1,723 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheContinuousQueryRandomOperationsTest {
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private static final int KEYS = 50;
+
+ /** */
+ private static final int VALS = 10;
+
+ /** */
+ public static final int ITERATION_CNT = 40;
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInternalQuery() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
- final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
++ final IgniteCache<Object, Object> cache = grid(0).createCache(ccfg);
+
+ UUID uuid = null;
+
+ try {
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ final CountDownLatch latch = new CountDownLatch(5);
+
+ CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ for (Object evt : iterable) {
+ latch.countDown();
+
+ log.info("Received event: " + evt);
+ }
+ }
+ };
+
+ uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+ .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+ for (int i = 10; i < 20; i++)
+ cache.put(i, i);
+
+ assertTrue(latch.await(3, SECONDS));
+ }
+ finally {
+ if (uuid != null)
+ grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+ .cancelInternalQuery(uuid);
+
- cache.destroy();
++ grid(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /** {@inheritDoc} */
- @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
++ @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+ throws Exception {
+ ignite(0).createCache(ccfg);
+
+ try {
+ long seed = System.currentTimeMillis();
+
+ Random rnd = new Random(seed);
+
+ log.info("Random seed: " + seed);
+
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+ Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
+
+ if (deploy == CLIENT)
+ evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+ else if (deploy == SERVER)
+ evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+ rnd.nextBoolean()));
+ else {
+ boolean isSync = rnd.nextBoolean();
+
+ for (int i = 0; i < NODES - 1; i++)
+ evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
+ }
+
+ ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+ Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+ try {
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ if (i % 10 == 0)
+ log.info("Iteration: " + i);
+
+ for (int idx = 0; idx < NODES; idx++)
+ randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+ }
+ }
+ finally {
+ for (QueryCursor<?> cur : curs)
+ cur.close();
+
+ for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+ grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param nodeIdx Node index.
+ * @param curs Cursors.
+ * @param lsnrCfgs Listener configurations.
+ * @return Event queue
+ */
+ private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+ int nodeIdx,
+ Collection<QueryCursor<?>> curs,
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+ boolean sync) {
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+ new MutableCacheEntryListenerConfiguration<>(
+ FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+ @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ }),
+ createFilterFactory(),
+ true,
+ sync
+ );
+
+ grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+ lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+ }
+ else {
+ ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ });
+
+ qry.setRemoteFilterFactory(createFilterFactory());
+
+ QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+ curs.add(cur);
+ }
+
+ return evtsQueue;
+ }
+
+ /**
+ * @return Filter factory.
+ */
+ @NotNull protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> createFilterFactory() {
+ return new FilterFactory();
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @param evtsQueues Events queue.
+ * @param expData Expected cache data.
+ * @param partCntr Partition counter.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void randomUpdate(
+ Random rnd,
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ ConcurrentMap<Object, Object> expData,
+ Map<Integer, Long> partCntr,
+ IgniteCache<Object, Object> cache)
+ throws Exception {
+ Object key = new QueryTestKey(rnd.nextInt(KEYS));
+ Object newVal = value(rnd);
+ Object oldVal = expData.get(key);
+
+ int op = rnd.nextInt(11);
+
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ Transaction tx = null;
+
+ if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+ tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+ try {
+ // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+ switch (op) {
+ case 0: {
+ cache.put(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 1: {
+ cache.getAndPut(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 2: {
+ cache.remove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 3: {
+ cache.getAndRemove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 4: {
+ cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 5: {
+ cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 6: {
+ cache.putIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 7: {
+ cache.getAndPutIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 8: {
+ cache.replace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 9: {
+ cache.getAndReplace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 10: {
+ if (oldVal != null) {
+ Object replaceVal = value(rnd);
+
+ boolean success = replaceVal.equals(oldVal);
+
+ if (success) {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+ }
+ else {
+ cache.replace(key, value(rnd), newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+
+ break;
+ }
+
+ default:
+ fail("Op:" + op);
+ }
+ } finally {
+ if (tx != null)
+ tx.close();
+ }
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionIsolation}.
+ */
+ private TransactionIsolation txRandomIsolation(Random rnd) {
+ int val = rnd.nextInt(3);
+
+ if (val == 0)
+ return READ_COMMITTED;
+ else if (val == 1)
+ return REPEATABLE_READ;
+ else
+ return SERIALIZABLE;
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionConcurrency}.
+ */
+ private TransactionConcurrency txRandomConcurrency(Random rnd) {
+ return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key
+ * @param cntrs Partition counters.
+ */
+ private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+ Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+ int part = aff.partition(key);
+
+ Long partCntr = cntrs.get(part);
+
+ if (partCntr == null)
+ partCntr = 0L;
+
+ cntrs.put(part, ++partCntr);
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @return Cache value.
+ */
+ private static Object value(Random rnd) {
+ return new QueryTestValue(rnd.nextInt(VALS));
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @param partCntrs Partition counters.
+ * @param aff Affinity function.
+ * @param key Key.
+ * @param val Value.
+ * @param oldVal Old value.
+ * @throws Exception If failed.
+ */
+ private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ Map<Integer, Long> partCntrs,
+ Affinity<Object> aff,
+ Object key,
+ Object val,
+ Object oldVal)
+ throws Exception {
+ if ((val == null && oldVal == null
+ || (val != null && !isAccepted((QueryTestValue)val)))) {
+ checkNoEvent(evtsQueues);
+
+ return;
+ }
+
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+ assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+ assertEquals(key, evt.getKey());
+ assertEquals(val, evt.getValue());
+ assertEquals(oldVal, evt.getOldValue());
+
+ long cntr = partCntrs.get(aff.partition(key));
+ CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+ assertNotNull(cntr);
+ assertNotNull(qryEntryEvt);
+
+ assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+ }
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @throws Exception If failed.
+ */
+ private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+ assertNull(evt);
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class NonSerializableFilter
+ implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+ CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
+ /** */
+ public NonSerializableFilter() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
+ throws CacheEntryListenerException {
+ return isAccepted(event.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ fail("Entry filter should not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fail("Entry filter should not be marshaled.");
+ }
+
+ /**
+ * @return {@code True} if value is even.
+ */
+ public static boolean isAccepted(QueryTestValue val) {
+ return val == null || val.val1 % 2 == 0;
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+ /** */
+ public SerializableFilter() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
+ throws CacheEntryListenerException {
+ return isAccepted(event.getValue());
+ }
+
+ /**
+ * @return {@code True} if value is even.
+ */
+ public static boolean isAccepted(Integer val) {
+ return val == null || val % 2 == 0;
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class FilterFactory implements Factory<NonSerializableFilter> {
+ @Override public NonSerializableFilter create() {
+ return new NonSerializableFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ public abstract class LocalNonSerialiseListener implements
+ CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+ CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+ Externalizable {
+ /** */
+ public LocalNonSerialiseListener() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /**
+ * @param evts Events.
+ */
+ protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts);
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c5e96d1/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 0000000,73c856b..912c8f9
mode 000000,100644..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@@ -1,0 -1,97 +1,101 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.testsuites;
+
+ import junit.framework.TestSuite;
+ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest;
++import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest;
++import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest;
++import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryConcurrentTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionTxOneNodeTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedOnlySelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedP2PDisabledSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapTieredTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapValuesTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+ import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
+
+ /**
+ * Test suite for cache queries.
+ */
+ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
+ /**
+ * @return Test suite.
+ * @throws Exception If failed.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Ignite Cache Queries Test Suite 3");
+
+ // Continuous queries.
+ suite.addTestSuite(GridCacheContinuousQueryLocalSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryLocalAtomicSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryReplicatedSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryReplicatedAtomicSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryReplicatedP2PDisabledSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryPartitionedSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryPartitionedOnlySelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryPartitionedP2PDisabledSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryTxSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryTxOffheapTieredTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryTxOffheapValuesTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryAtomicOffheapTieredTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryAtomicOffheapValuesTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryReplicatedTxOneNodeTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryReplicatedAtomicOneNodeTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryPartitionAtomicOneNodeTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
+ suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+ suite.addTestSuite(CacheContinuousQueryRandomOperationsTwoNodesTest.class);
- suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
++ suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class);
++ suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class);
++ suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class);;
+ suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
+ suite.addTestSuite(CacheContinuousBatchAckTest.class);
+ suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
+
+ return suite;
+ }
+ }