You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Avihai Berkovitz (JIRA)" <ji...@apache.org> on 2015/12/13 10:33:46 UTC

[jira] [Created] (IGNITE-2145) Async cache operations on primary node might block

Avihai Berkovitz created IGNITE-2145:
----------------------------------------

             Summary: Async cache operations on primary node might block
                 Key: IGNITE-2145
                 URL: https://issues.apache.org/jira/browse/IGNITE-2145
             Project: Ignite
          Issue Type: Bug
    Affects Versions: ignite-1.4
         Environment: Windows 8.1 64 bit
java version "1.8.0_51"
Java(TM) SE Runtime Environment (build 1.8.0_51-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)
Ignite 1.4.0
            Reporter: Avihai Berkovitz


When executing async operations on caches the initial function call is supposed to return immediately, without regard to cache state, network traffic, etc. If we reference a cache key on a different node it works. But if the primary node for a cache key is the same node that initiate the operation everything is executed synchronously.
This is probably an optimization, but can have side effects. For example, if a long running CacheProcessor is invoked on this key, every operation will block.

Here is some code to illustrate the problem. Notice that there is a 4 seconds delay between "Before async op" and "After async op". If you reverse the isPrimary() condition it won't happen.

{code}
IgniteConfiguration igniteConfiguration = new IgniteConfiguration()
		.setFailoverSpi(new AlwaysFailoverSpi())
		.setGridLogger(new Slf4jLogger())
		.setPeerClassLoadingEnabled(false)
		.setDeploymentMode(DeploymentMode.CONTINUOUS);
Ignite ignite1 = Ignition.start(igniteConfiguration);
Ignite ignite2 = Ignition.start(igniteConfiguration.setGridName("2"));
assert ignite1.cluster().nodes().size() == 2;
assert ignite2.cluster().nodes().size() == 2;

CacheConfiguration<String, Integer> cacheConfiguration = new CacheConfiguration<String, Integer>()
		.setName("test")
		.setCacheMode(CacheMode.PARTITIONED)
		.setAtomicityMode(CacheAtomicityMode.ATOMIC)
		.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
IgniteCache<String, Integer> cache1 = ignite1.getOrCreateCache(cacheConfiguration);
IgniteCache<String, Integer> cache2 = ignite2.getOrCreateCache(cacheConfiguration);

// Find key mapped to the first node
String key = IntStream.range(0, 100)
		.mapToObj(value -> "HI" + value)
		.filter(s -> ignite1.affinity(cacheConfiguration.getName()).isPrimary(ignite1.cluster().localNode(), s))
		.findFirst().get();

// Run a blocked processor from node 2
Thread thread = new Thread() {
	@Override
	public void run() {
		System.out.println("Invoking blocked processor");

		cache2.invoke(key, (entry, arguments) -> {
			System.out.println("IN INVOKE");
			try {
				Thread.sleep(5 * 1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return null;
		});

		System.out.println("Blocked invoke returned");
	}
};

thread.start();
Thread.sleep(1000);

IgniteCache<String, Integer> async1 = cache1.withAsync();

System.out.println("Before async op");
//async1.put(key, 1);
//async1.get(key);
async1.containsKey(key);
System.out.println("After async op");
assert async1.future().isDone();

thread.join();
ignite2.close();
ignite1.close();
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)