You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/08 15:41:23 UTC
[41/50] [abbrv] ignite git commit: IGNITE-3789 Test for
EntryProcessor/partition evts
IGNITE-3789 Test for EntryProcessor/partition evts
Tests that validate:
1. EntryProcessors execute against partitions that are fully owned/loaded.
2. Expected events are raised when partitions are moved due to a change in cluster topology.
See https://issues.apache.org/jira/browse/IGNITE-3456.
This closes #997.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ca7e066
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ca7e066
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ca7e066
Branch: refs/heads/ignite-961
Commit: 6ca7e0666ef0ed1b70c775a06cee5a58d55a77a7
Parents: 8f69787
Author: Patrick Peralta <pa...@workday.com>
Authored: Thu Sep 1 11:49:08 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 1 11:49:08 2016 +0300
----------------------------------------------------------------------
.../GridCacheRebalancingOrderingTest.java | 916 +++++++++++++++++++
1 file changed, 916 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ca7e066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingOrderingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingOrderingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingOrderingTest.java
new file mode 100644
index 0000000..62fc5e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingOrderingTest.java
@@ -0,0 +1,916 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Test to validate cache and partition events raised from entry processors executing against
+ * partitions are are loading/unloading.
+ * <p>
+ * The test consists of two parts:
+ * <p>
+ * 1. The server side that maintains a map of partition id to the set of keys that belong to
+ * that partition for all partitions (primary + backup) owned by the server. This map
+ * is updated by a local listener registered for the following events:
+ * <ul>
+ * <li>EVT_CACHE_OBJECT_PUT</li>
+ * <li>EVT_CACHE_OBJECT_REMOVED</li>
+ * <li>EVT_CACHE_REBALANCE_OBJECT_LOADED</li>
+ * <li>EVT_CACHE_REBALANCE_OBJECT_UNLOADED</li>
+ * <li>EVT_CACHE_REBALANCE_PART_LOADED</li>
+ * <li>EVT_CACHE_REBALANCE_PART_UNLOADED</li>
+ * <li>EVT_CACHE_REBALANCE_PART_DATA_LOST</li>
+ * </ul>
+ * 2. The client side that generates a random number of keys for each partition and populates
+ * the cache. When the cache is loaded, each partition has at least one key assigned to it. The
+ * client then issues an {@code invokeAll} on the cache with a key set consisting of one key
+ * belonging to each partition.
+ * <p>
+ * The test makes the following assertions:
+ * <ol>
+ * <li>EntryProcessors should execute against partitions that are owned/fully loaded.
+ * If a processor executes against a partition that is partially loaded, the message
+ * "Key validation requires a retry for partitions" is logged on the client, and
+ * "Retrying validation for primary partition N due to newly arrived partition..." and
+ * "Retrying validation for primary partition N due to forming partition..." is logged
+ * on the server side.</li>
+ * <li>Events for entries being added/removed and partitions being loaded/unloaded
+ * should always be delivered to the server nodes that own the partition. If this does
+ * not happen, the client will log "For primary partition N expected [...], but
+ * found [...]; missing local keys: []" and "Key validation failed for partitions: [...]".
+ * The server will log "Retrying validation for primary|backup partition N due to
+ * forming partition" and "For primary|backup partition N expected [...], but found [...];"
+ * </li>
+ * </ol>
+ */
+public class GridCacheRebalancingOrderingTest extends GridCommonAbstractTest {
+ /** {@link Random} for test key generation. */
+ private final static Random RANDOM = new Random();
+
+ /** Test cache name. */
+ private static final String TEST_CACHE_NAME = "TestCache";
+
+ /** Flag to configure transactional versus non-transactional cache. */
+ public static final boolean TRANSACTIONAL = false;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (isFirstGrid(gridName)) {
+ cfg.setClientMode(true);
+
+ assert cfg.getDiscoverySpi() instanceof TcpDiscoverySpi : cfg.getDiscoverySpi();
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ }
+ else
+ cfg.setServiceConfiguration(getServiceConfiguration());
+
+ cfg.setCacheConfiguration(getCacheConfiguration());
+
+ return cfg;
+ }
+
+ /**
+ * @return service configuration for services used in test cluster
+ * @see #getConfiguration()
+ */
+ private ServiceConfiguration getServiceConfiguration() {
+ ServiceConfiguration cfg = new ServiceConfiguration();
+
+ cfg.setName(PartitionObserver.class.getName());
+ cfg.setService(new PartitionObserverService());
+ cfg.setMaxPerNodeCount(1);
+ cfg.setTotalCount(0); // 1 service per node.
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache configuration used by test.
+ * @see #getConfiguration().
+ */
+ protected CacheConfiguration<IntegerKey, Integer> getCacheConfiguration() {
+ CacheConfiguration<IntegerKey, Integer> cfg = new CacheConfiguration<>();
+
+ cfg.setAtomicityMode(TRANSACTIONAL ? CacheAtomicityMode.TRANSACTIONAL : CacheAtomicityMode.ATOMIC);
+ cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setName(TEST_CACHE_NAME);
+ cfg.setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 271));
+ cfg.setAtomicWriteOrderMode(PRIMARY);
+ cfg.setBackups(1);
+ cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isMultiJvm() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 1000 * 60 * 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * Convert the given key from binary form, if necessary.
+ *
+ * @param key the key to convert if necessary
+ * @return the key
+ */
+ private static IntegerKey ensureKey(Object key) {
+ Object converted = key instanceof BinaryObject ? ((BinaryObject) key).deserialize() : key;
+ return converted instanceof IntegerKey ? (IntegerKey) converted : null;
+ }
+
+ /**
+ * Determine which of the specified keys (if any) are missing locally from the given cache.
+ *
+ * @param cache The cache to check.
+ * @param exp The expected set of keys.
+ * @return The set of missing keys.
+ */
+ private static Set<IntegerKey> getMissingKeys(IgniteCache<IntegerKey, Integer> cache, Set<IntegerKey> exp) {
+ Set<IntegerKey> missing = new HashSet<>();
+
+ for (IntegerKey key : exp) {
+ if (cache.localPeek(key, CachePeekMode.ALL) == null)
+ missing.add(key);
+ }
+
+ return missing;
+ }
+
+ /**
+ * For an Ignite cache, generate a random {@link IntegerKey} per partition. The number
+ * of partitions is determined by the cache's {@link Affinity}.
+ *
+ * @param ignite Ignite instance.
+ * @param cache Cache to generate keys for.
+ * @return Map of partition number to randomly generated key.
+ */
+ private Map<Integer, IntegerKey> generateKeysForPartitions(Ignite ignite, IgniteCache<IntegerKey, Integer> cache) {
+ Affinity<IntegerKey> affinity = ignite.affinity(cache.getName());
+
+ int parts = affinity.partitions();
+
+ Map<Integer, IntegerKey> keyMap = new HashMap<>(parts);
+
+ for (int i = 0; i < parts; i++) {
+ boolean found = false;
+
+ do {
+ IntegerKey key = new IntegerKey(RANDOM.nextInt(10000));
+
+ if (affinity.partition(key) == i) {
+ keyMap.put(i, key);
+ found = true;
+ }
+ } while (!found);
+ }
+
+ // Sanity check.
+ if (keyMap.size() != affinity.partitions())
+ throw new IllegalStateException("Inconsistent partition count");
+
+ for (int i = 0; i < parts; i++) {
+ IntegerKey key = keyMap.get(i);
+
+ if (affinity.partition(key) != i)
+ throw new IllegalStateException("Inconsistent partition");
+ }
+
+ return keyMap;
+ }
+
+ /**
+ * Starts background thread that launches servers. This method will block
+ * until at least one server is running.
+ *
+ * @return {@link ServerStarter} runnable that starts servers
+ * @throws Exception If failed.
+ */
+ private ServerStarter startServers() throws Exception {
+ ServerStarter srvStarter = new ServerStarter();
+
+ Thread t = new Thread(srvStarter);
+ t.setDaemon(true);
+ t.setName("Server Starter");
+ t.start();
+
+ srvStarter.waitForServerStart();
+
+ return srvStarter;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvents() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ ServerStarter srvStarter = startServers();
+
+ IgniteCache<IntegerKey, Integer> cache = ignite.cache(TEST_CACHE_NAME);
+
+ // Generate a key per partition.
+ Map<Integer, IntegerKey> keyMap = generateKeysForPartitions(ignite, cache);
+
+ // Populate a random number of keys per partition.
+ Map<Integer, Set<IntegerKey>> partMap = new HashMap<>(keyMap.size());
+
+ for (Map.Entry<Integer, IntegerKey> entry : keyMap.entrySet()) {
+ Integer part = entry.getKey();
+ int affinity = entry.getValue().getKey();
+ int cnt = RANDOM.nextInt(10) + 1;
+
+ Set<IntegerKey> keys = new HashSet<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ IntegerKey key = new IntegerKey(RANDOM.nextInt(10000), affinity);
+ keys.add(key);
+ cache.put(key, RANDOM.nextInt());
+ }
+
+ partMap.put(part, keys);
+ }
+
+ // Display the partition map.
+ X.println("Partition Map:");
+
+ for (Map.Entry<Integer, Set<IntegerKey>> entry : partMap.entrySet())
+ X.println(entry.getKey() + ": " + entry.getValue());
+
+ // Validate keys across all partitions.
+ Affinity<IntegerKey> affinity = ignite.affinity(cache.getName());
+
+ Map<IntegerKey, KeySetValidator> validatorMap = new HashMap<>(partMap.size());
+
+ for (Map.Entry<Integer, Set<IntegerKey>> partEntry : partMap.entrySet()) {
+ Integer part = partEntry.getKey();
+
+ validatorMap.put(keyMap.get(part), new KeySetValidator(partEntry.getValue()));
+ }
+
+ int i = 0;
+
+ while (!srvStarter.isDone()) {
+ Map<IntegerKey, EntryProcessorResult<KeySetValidator.Result>> results = cache.invokeAll(validatorMap);
+
+ Set<Integer> failures = new HashSet<>();
+ Set<Integer> retries = new HashSet<>();
+
+ for (Map.Entry<IntegerKey, EntryProcessorResult<KeySetValidator.Result>> result : results.entrySet()) {
+ try {
+ if (result.getValue().get() == KeySetValidator.Result.RETRY)
+ retries.add(affinity.partition(result.getKey()));
+ }
+ catch (Exception e) {
+ X.println("!!! " + e.getMessage());
+ e.printStackTrace();
+ failures.add(affinity.partition(result.getKey()));
+ }
+ }
+
+ if (!failures.isEmpty()) {
+ X.println("*** Key validation failed for partitions: " + failures);
+ fail("https://issues.apache.org/jira/browse/IGNITE-3456");
+ }
+ else if (!retries.isEmpty()) {
+ X.println("*** Key validation requires a retry for partitions: " + retries);
+ retries.clear();
+ }
+ else
+ X.println("*** Key validation was successful: " + i);
+
+ i++;
+
+ Thread.sleep(500);
+ }
+ }
+
+ /**
+ * EntryProcessor that validates that the partition associated with the targeted key has a specified set of keys.
+ */
+ public static class KeySetValidator implements EntryProcessor<IntegerKey, Integer, KeySetValidator.Result> {
+ /** */
+ private final Set<IntegerKey> keys;
+
+ /**
+ * Create a new KeySetValidator.
+ *
+ * @param keys the expected keys belonging to the partition that owns the targeted key
+ */
+ KeySetValidator(Set<IntegerKey> keys) {
+ if (keys == null)
+ throw new IllegalArgumentException();
+
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Result process(MutableEntry<IntegerKey, Integer> entry, Object... objects) {
+ try {
+ Ignite ignite = entry.unwrap(Ignite.class);
+
+ PartitionObserver observer = ignite.services().service(PartitionObserver.class.getName());
+
+ assertNotNull(observer);
+
+ IgniteCache<IntegerKey, Integer> cache = ignite.cache(TEST_CACHE_NAME);
+
+ Affinity<IntegerKey> affinity = ignite.affinity(TEST_CACHE_NAME);
+
+ Set<IntegerKey> exp = this.keys;
+
+ Set<IntegerKey> missing = getMissingKeys(cache, exp);
+
+ IntegerKey key = entry.getKey();
+
+ int part = affinity.partition(key);
+
+ String ownership = affinity.isPrimary(ignite.cluster().localNode(), key) ? "primary" : "backup";
+
+ // Wait for the local listener to sync past events.
+ if (!observer.getIgniteLocalSyncListener().isSynced()) {
+ ignite.log().info("Retrying validation for " + ownership + " partition " + part
+ + " due to initial sync");
+
+ return Result.RETRY;
+ }
+
+ // Determine if the partition is being loaded and wait for it to load completely.
+ if (observer.getLoadingMap().containsKey(part)) {
+ ignite.log().info("Retrying validation due to forming partition [ownership=" + ownership +
+ ", partition=" + part +
+ ", expKeys=" + exp +
+ ", loadedKeys=" + observer.getLoadingMap().get(part) +
+ ", missingLocalKeys=" + missing + ']');
+
+ return Result.RETRY;
+ }
+
+ if (!observer.getPartitionMap().containsKey(part)) {
+ ignite.log().info("Retrying validation due to newly arrived partition [ownership=" + ownership +
+ ", partition=" + part +
+ ", missingLocalKeys=" + missing + ']');
+
+ return Result.RETRY;
+ }
+
+ // Validate the key count.
+ Set<IntegerKey> curr = observer.ensureKeySet(part);
+
+ if (curr.equals(exp) && missing.isEmpty())
+ return Result.OK;
+
+ String msg = String.format("For %s partition %s:\n\texpected %s,\n\t" +
+ "but found %s;\n\tmissing local keys: %s",
+ ownership, part, new TreeSet<>(exp), new TreeSet<>(curr), new TreeSet<>(missing));
+
+ ignite.log().info(">>> " + msg);
+
+ throw new EntryProcessorException(msg);
+ }
+ catch (NullPointerException e) {
+ e.printStackTrace();
+
+ throw e;
+ }
+ }
+
+ /**
+ *
+ */
+ enum Result {
+ /** */
+ OK,
+ /** */
+ RETRY
+ }
+ }
+
+ /**
+ * Integer value that can optionally be associated with another integer.
+ */
+ public static class IntegerKey implements Comparable<IntegerKey> {
+ /**
+ * The integer key value.
+ */
+ private final int val;
+
+ /**
+ * The optional associated integer.
+ */
+ @AffinityKeyMapped
+ private final Integer affinity;
+
+ /**
+ * Create a new IntegerKey for the given integer value.
+ *
+ * @param val the integer key value
+ */
+ IntegerKey(int val) {
+ this.val = val;
+ this.affinity = val;
+ }
+
+ /**
+ * Create a new IntegerKey for the given integer value that is associated with the specified integer.
+ *
+ * @param val the integer key value
+ * @param affinity the associated integer
+ */
+ IntegerKey(int val, int affinity) {
+ this.val = val;
+ this.affinity = affinity;
+ }
+
+ /**
+ * Return the integer key value.
+ *
+ * @return the integer key value
+ */
+ public int getKey() {
+ return this.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return this.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+ if (o == null)
+ return false;
+
+ if (IntegerKey.class.equals(o.getClass())) {
+ IntegerKey that = (IntegerKey) o;
+ return this.val == that.val;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ int val = this.val;
+ Integer affinity = this.affinity;
+
+ if (val == affinity)
+ return String.valueOf(val);
+
+ return "IntKey [val=" + val + ", aff=" + affinity + ']';
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(final IntegerKey that) {
+ int i = this.affinity.compareTo(that.affinity);
+
+ if (i == 0)
+ i = Integer.compare(this.getKey(), that.getKey());
+
+ return i;
+ }
+ }
+
+ /**
+ * Local listener wrapper that brings a delegate listener up to date with the latest events.
+ */
+ private static class IgniteLocalSyncListener implements IgnitePredicate<Event> {
+ /** */
+ private final IgnitePredicate<Event> delegate;
+
+ /** */
+ private final int[] causes;
+
+ /** */
+ private volatile boolean isSynced;
+
+ /** */
+ private volatile long syncedId = Long.MIN_VALUE;
+
+ /**
+ * @param delegate Event listener.
+ * @param causes Event types to listen.
+ */
+ IgniteLocalSyncListener(IgnitePredicate<Event> delegate, int... causes) {
+ this.delegate = delegate;
+ this.causes = causes;
+ }
+
+ /**
+ * @return Local ignite.
+ */
+ protected Ignite ignite() {
+ return Ignition.localIgnite();
+ }
+
+ /**
+ *
+ */
+ public void register() {
+ ignite().events().localListen(this.delegate, this.causes);
+
+ sync();
+ }
+
+ /**
+ *
+ */
+ public void sync() {
+ if (!this.isSynced) {
+ synchronized (this) {
+ if (!this.isSynced) {
+ Collection<Event> evts = ignite().events().localQuery(new IgnitePredicate<Event>() {
+ @Override public boolean apply(final Event evt) {
+ return true;
+ }
+ },
+ this.causes);
+
+ for (Event event : evts) {
+ // Events returned from localQuery() are ordered by increasing local ID. Update the sync ID
+ // within a finally block to avoid applying duplicate events if the delegate listener
+ // throws an exception while processing the event.
+ try {
+ applyInternal(event);
+ }
+ finally {
+ this.syncedId = event.localOrder();
+ }
+ }
+
+ this.isSynced = true;
+
+ notifyAll();
+ }
+ }
+ }
+ }
+
+ /**
+ * @return Synced flag.
+ */
+ boolean isSynced() {
+ return isSynced;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Event evt) {
+ sync();
+
+ return applyInternal(evt);
+ }
+
+ /**
+ * @param evt Event.
+ * @return See {@link IgniteEvents#localListen}.
+ */
+ boolean applyInternal(Event evt) {
+ // Avoid applying previously recorded events.
+ if (evt.localOrder() > this.syncedId) {
+ try {
+ return this.delegate.apply(evt);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Service interface for server side partition observation.
+ */
+ interface PartitionObserver {
+ /**
+ * @return map of partitions to the keys belonging to that partition
+ */
+ ConcurrentMap<Integer, Set<IntegerKey>> getPartitionMap();
+
+ /**
+ * @return Map of partitions that are in the process of loading and the current keys that belong to that partition.
+ * Currently it seems that an EntryProcessor is not guaranteed to have a "stable" view of a partition and
+ * can see entries as they are being loaded into the partition, so we must batch these events up in the map
+ * and update the {@link #getPartitionMap() partition map} atomically once the partition has been fully loaded.
+ */
+ ConcurrentMap<Integer, Set<IntegerKey>> getLoadingMap();
+
+ /**
+ * Ensure that the {@link #getPartitionMap() partition map} has a set of keys associated with the given
+ * partition, creating one if it doesn't already exist.
+ * @param part the partition
+ * @return the set for the given partition
+ */
+ Set<IntegerKey> ensureKeySet(int part);
+
+ /**
+ * @return listener wrapper that brings a delegate listener up to date with the latest events
+ */
+ IgniteLocalSyncListener getIgniteLocalSyncListener();
+ }
+
+ /**
+ *
+ */
+ private static class PartitionObserverService implements Service, PartitionObserver, Serializable {
+ /** */
+ private final ConcurrentMap<Integer, Set<IntegerKey>> partMap = new ConcurrentHashMap<>();
+
+ /** */
+ private final ConcurrentMap<Integer, Set<IntegerKey>> loadingMap = new ConcurrentHashMap<>();
+
+ /** */
+ private final IgnitePredicate<Event> pred = (IgnitePredicate<Event>) new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ // Handle:
+ // EVT_CACHE_OBJECT_PUT
+ // EVT_CACHE_REBALANCE_OBJECT_LOADED
+ // EVT_CACHE_OBJECT_REMOVED
+ // EVT_CACHE_REBALANCE_OBJECT_UNLOADED
+ if (evt instanceof CacheEvent) {
+ CacheEvent cacheEvt = (CacheEvent) evt;
+ int part = cacheEvt.partition();
+
+ // Oonly handle events for the test cache.
+ if (TEST_CACHE_NAME.equals(cacheEvt.cacheName())) {
+ switch (evt.type()) {
+ case EventType.EVT_CACHE_OBJECT_PUT: {
+ ensureKeySet(part).add(ensureKey(cacheEvt.key()));
+ break;
+ }
+
+ case EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED: {
+ // Batch up objects that are being loaded.
+ ensureKeySet(part, loadingMap).add(ensureKey(cacheEvt.key()));
+ break;
+ }
+
+ case EventType.EVT_CACHE_OBJECT_REMOVED:
+ case EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED: {
+ ensureKeySet(part).remove(ensureKey(cacheEvt.key()));
+ break;
+ }
+ }
+ }
+ }
+ // Handle:
+ // EVT_CACHE_REBALANCE_PART_LOADED
+ // EVT_CACHE_REBALANCE_PART_UNLOADED
+ // EVT_CACHE_REBALANCE_PART_DATA_LOST
+ else if (evt instanceof CacheRebalancingEvent) {
+ CacheRebalancingEvent rebalancingEvt = (CacheRebalancingEvent) evt;
+
+ int part = rebalancingEvt.partition();
+
+ // Only handle events for the test cache.
+ if (TEST_CACHE_NAME.equals(rebalancingEvt.cacheName())) {
+ switch (evt.type()) {
+ case EventType.EVT_CACHE_REBALANCE_PART_UNLOADED: {
+ Set<IntegerKey> keys = partMap.get(part);
+
+ if (keys != null && !keys.isEmpty())
+ X.println("!!! Attempting to unload non-empty partition: " + part + "; keys=" + keys);
+
+ partMap.remove(part);
+
+ X.println("*** Unloaded partition: " + part);
+
+ break;
+ }
+
+ case EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST: {
+ partMap.remove(part);
+
+ X.println("*** Lost partition: " + part);
+
+ break;
+ }
+
+ case EventType.EVT_CACHE_REBALANCE_PART_LOADED: {
+ // Atomically update the key count for the new partition.
+ Set<IntegerKey> keys = loadingMap.get(part);
+ partMap.put(part, keys);
+ loadingMap.remove(part);
+
+ X.println("*** Loaded partition: " + part + "; keys=" + keys);
+
+ break;
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+ };
+
+ /** */
+ private final IgniteLocalSyncListener lsnr = new IgniteLocalSyncListener(pred,
+ EventType.EVT_CACHE_OBJECT_PUT,
+ EventType.EVT_CACHE_OBJECT_REMOVED,
+ EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED,
+ EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+ EventType.EVT_CACHE_REBALANCE_PART_LOADED,
+ EventType.EVT_CACHE_REBALANCE_PART_UNLOADED,
+ EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
+
+ /** {@inheritDoc} */
+ @Override public ConcurrentMap<Integer, Set<IntegerKey>> getPartitionMap() {
+ return partMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ConcurrentMap<Integer, Set<IntegerKey>> getLoadingMap() {
+ return loadingMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLocalSyncListener getIgniteLocalSyncListener() {
+ return lsnr;
+ }
+
+ /**
+ * Ensure that the static partition map has a set of keys associated with the given partition,
+ * creating one if it doesn't already exist.
+ *
+ * @param part the partition
+ * @return the set for the given partition
+ */
+ public Set<IntegerKey> ensureKeySet(final int part) {
+ return ensureKeySet(part, partMap);
+ }
+
+ /**
+ * Ensure that the given partition map has a set of keys associated with the given partition, creating one if it
+ * doesn't already exist.
+ *
+ * @param part the partition
+ * @param map the partition map
+ *
+ * @return the set for the given partition
+ */
+ Set<IntegerKey> ensureKeySet(final int part, final ConcurrentMap<Integer, Set<IntegerKey>> map) {
+ Set<IntegerKey> keys = map.get(part);
+
+ if (keys == null) {
+ map.putIfAbsent(part, new CopyOnWriteArraySet<IntegerKey>());
+
+ keys = map.get(part);
+ }
+
+ return keys;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public void cancel(final ServiceContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(final ServiceContext ctx) throws Exception {
+ this.lsnr.register();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(final ServiceContext ctx) throws Exception {
+ // No-op.
+ }
+ }
+
+ /**
+ * Runnable that starts {@value #SERVER_COUNT} servers. This runnable starts
+ * servers every {@value #START_DELAY} milliseconds. The staggered start is intended
+ * to allow partitions to move every time a new server is started.
+ */
+ private class ServerStarter implements Runnable {
+ /** */
+ static final int SERVER_COUNT = 10;
+
+ /** */
+ static final int START_DELAY = 2000;
+
+ /** */
+ private volatile boolean done;
+
+ /** */
+ private final CountDownLatch started = new CountDownLatch(1);
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ for (int i = 1; i <= SERVER_COUNT; i++) {
+ startGrid(i);
+
+ Thread.sleep(START_DELAY);
+
+ awaitPartitionMapExchange();
+
+ started.countDown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+
+ X.println("Shutting down server starter thread");
+ }
+ finally {
+ done = true;
+ }
+ }
+
+ /**
+ * Blocks the executing thread until at least one server has started.
+ *
+ * @throws InterruptedException If interrupted.
+ */
+ void waitForServerStart() throws InterruptedException {
+ started.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+ /** @return true if {@value #SERVER_COUNT} servers have started. */
+ public boolean isDone() {
+ return done;
+ }
+ }
+}