You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/12 07:13:52 UTC
[4/8] ignite git commit: ignite-4932 When possible for cache 'get'
read directly from offheap without entry creation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index e1d4484..56041ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -384,7 +385,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
UUID subjId = ctx.subjectIdPerCall(null, opCtx);
- Map<K, V> vals = new HashMap<>(keys.size(), 1.0f);
+ Map<K, V> vals = U.newHashMap(keys.size());
if (keyCheck)
validateCacheKeys(keys);
@@ -392,97 +393,142 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
boolean success = true;
+ boolean readNoEntry = ctx.readNoEntry(expiry, false);
+ final boolean evt = !skipVals;
for (K key : keys) {
if (key == null)
throw new NullPointerException("Null key.");
- GridCacheEntryEx entry = null;
-
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
- while (true) {
- try {
- entry = entryEx(cacheKey);
+ boolean skipEntry = readNoEntry;
- if (entry != null) {
- CacheObject v;
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(cacheKey);
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary,
- null);
-
- if (res != null) {
- ctx.addResult(
- vals,
- cacheKey,
- res,
- skipVals,
- false,
- deserializeBinary,
- true,
- needVer);
- }
- else
- success = false;
- }
- else {
- v = entry.innerGet(
- null,
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ ctx.addResult(vals,
+ cacheKey,
+ row.value(),
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ row.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(true);
+
+ if (evt) {
+ ctx.events().readEvent(cacheKey,
null,
- /*read-through*/false,
- /**update-metrics*/true,
- /**event*/!skipVals,
+ row.value(),
subjId,
- null,
taskName,
- expiry,
!deserializeBinary);
+ }
+ }
+ else
+ skipEntry = false;
+ }
+ else
+ success = false;
+ }
- if (v != null) {
- ctx.addResult(vals,
- cacheKey,
- v,
- skipVals,
- false,
- deserializeBinary,
- true,
+ if (!skipEntry) {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = entryEx(cacheKey);
+
+ if (entry != null) {
+ CacheObject v;
+
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
null,
- 0,
- 0);
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ null);
+
+ if (res != null) {
+ ctx.addResult(
+ vals,
+ cacheKey,
+ res,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ needVer);
+ }
+ else
+ success = false;
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/true,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (v != null) {
+ ctx.addResult(vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ 0,
+ 0);
+ }
+ else
+ success = false;
}
- else
- success = false;
}
- }
- else {
- if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
- metrics0().onRead(false);
+ else {
+ if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(false);
- success = false;
+ success = false;
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
}
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
+ if (!success && storeEnabled)
+ break;
}
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
- }
-
- if (!success && storeEnabled)
- break;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 663040d..5961b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4099,6 +4099,8 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
+ assert dataPacket != null : msg;
+
if (dataPacket.hasJoiningNodeData())
spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
new file mode 100644
index 0000000..9250e0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that cache reads complete while an entry processor is still running for the same keys.
+ * <p>
+ * An {@code invoke}/{@code invokeAll} with {@link HangEntryProcessor} is started and parked on a latch.
+ * While it is parked, values are read through an affinity call and directly through the cache of
+ * node 0; only after the reads returned the expected data is the processor released.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Counted down by {@link HangEntryProcessor} as soon as it starts executing. */
+    private static volatile CountDownLatch processorStartLatch;
+
+    /** Latch {@link HangEntryProcessor} waits on; counted down by the test to release the processor. */
+    private static volatile CountDownLatch hangLatch;
+
+    /** Client mode flag applied to the configuration of every node started after it is set. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Node 0 is started before the flag is raised, node 1 is started in client mode.
+        startGrid(0);
+
+        client = true;
+
+        startGrid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicGet() throws Exception {
+        getTest(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxGet() throws Exception {
+        getTest(TRANSACTIONAL);
+    }
+
+    /**
+     * Runs {@link #doGet} for every combination of the flag values listed below.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void getTest(CacheAtomicityMode atomicityMode) throws Exception {
+        boolean[] getAll = {true, false};
+        boolean[] cfgExpiryPlc = {false};
+        boolean[] withExpiryPlc = {false};
+        boolean[] heapCache = {false};
+
+        for (boolean getAll0 : getAll) {
+            for (boolean expiryPlc0 : cfgExpiryPlc) {
+                for (boolean withExpiryPlc0 : withExpiryPlc) {
+                    for (boolean heapCache0 : heapCache)
+                        doGet(atomicityMode, heapCache0, getAll0, expiryPlc0, withExpiryPlc0);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the test cache, runs the "get from compute closure" and "local get" scenarios while an
+     * entry processor is parked, and destroys the cache afterwards.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @param heapCache Heap cache flag.
+     * @param getAll Test getAll flag.
+     * @param cfgExpiryPlc Configured expiry policy flag.
+     * @param withExpiryPlc Custom expiry policy flag.
+     * @throws Exception If failed.
+     */
+    private void doGet(CacheAtomicityMode atomicityMode,
+        boolean heapCache,
+        final boolean getAll,
+        final boolean cfgExpiryPlc,
+        final boolean withExpiryPlc) throws Exception {
+        // Log every parameter so a failing combination can be identified from the output.
+        log.info("Test get [atomicityMode=" + atomicityMode + ", heapCache=" + heapCache +
+            ", getAll=" + getAll + ", cfgExpiryPlc=" + cfgExpiryPlc + ", withExpiryPlc=" + withExpiryPlc + ']');
+
+        Ignite srv = ignite(0);
+
+        // Named differently from the 'client' field to avoid shadowing it.
+        Ignite clientNode = ignite(1);
+
+        final IgniteCache cache = clientNode.createCache(cacheConfiguration(atomicityMode, heapCache, cfgExpiryPlc));
+
+        final Map<Object, Object> data = new HashMap<>();
+
+        data.put(1, 1);
+        data.put(2, 2);
+
+        try {
+            // Get from compute closure.
+            {
+                cache.putAll(data);
+
+                IgniteInternalFuture fut = startHangingInvoke(cache, data, getAll);
+
+                try {
+                    boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS);
+
+                    assertTrue(wait);
+
+                    if (getAll) {
+                        assertEquals(data, clientNode.compute().affinityCall(cache.getName(), 1,
+                            new GetAllClosure(data.keySet(), cache.getName(), withExpiryPlc)));
+                    }
+                    else {
+                        assertEquals(1, clientNode.compute().affinityCall(cache.getName(), 1,
+                            new GetClosure(1, cache.getName(), withExpiryPlc)));
+                    }
+
+                    hangLatch.countDown();
+
+                    fut.get();
+                }
+                finally {
+                    // Release the processor even if an assertion above failed.
+                    hangLatch.countDown();
+                }
+            }
+
+            // Local get.
+            {
+                cache.putAll(data);
+
+                IgniteInternalFuture fut = startHangingInvoke(cache, data, getAll);
+
+                try {
+                    boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS);
+
+                    assertTrue(wait);
+
+                    IgniteCache srvCache = srv.cache(cache.getName());
+
+                    if (withExpiryPlc)
+                        srvCache = srvCache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+                    if (getAll) {
+                        assertEquals(data, srvCache.getAll(data.keySet()));
+                        assertEquals(data.size(), srvCache.getEntries(data.keySet()).size());
+                    }
+                    else {
+                        assertEquals(1, srvCache.get(1));
+                        assertEquals(1, srvCache.getEntry(1).getValue());
+                    }
+
+                    hangLatch.countDown();
+
+                    fut.get();
+                }
+                finally {
+                    // Release the processor even if an assertion above failed.
+                    hangLatch.countDown();
+                }
+            }
+        }
+        finally {
+            clientNode.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Re-creates both latches and asynchronously starts an entry processor invocation that parks on
+     * {@link #hangLatch}.
+     *
+     * @param cache Cache to invoke the processor on.
+     * @param data Test data; its key set is used when {@code getAll} is {@code true}.
+     * @param getAll If {@code true} uses {@code invokeAll} for all keys, otherwise {@code invoke} for key 1.
+     * @return Future of the asynchronous invocation.
+     * @throws Exception If failed to start the invocation.
+     */
+    private IgniteInternalFuture startHangingInvoke(final IgniteCache cache,
+        final Map<Object, Object> data,
+        final boolean getAll) throws Exception {
+        hangLatch = new CountDownLatch(1);
+        processorStartLatch = new CountDownLatch(1);
+
+        return GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                if (getAll)
+                    cache.invokeAll(data.keySet(), new HangEntryProcessor());
+                else
+                    cache.invoke(1, new HangEntryProcessor());
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @param heapCache Heap cache flag.
+     * @param expiryPlc Expiry policy flag.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode,
+        boolean heapCache,
+        boolean expiryPlc) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setOnheapCacheEnabled(heapCache);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setName("testCache");
+
+        if (expiryPlc)
+            ccfg.setExpiryPolicyFactory(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES));
+
+        return ccfg;
+    }
+
+    /**
+     * Entry processor that signals {@link #processorStartLatch}, waits up to 60 seconds on
+     * {@link #hangLatch} and then stores the current time as the entry value.
+     */
+    static class HangEntryProcessor implements CacheEntryProcessor {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry entry, Object... arguments) {
+            assert processorStartLatch != null;
+            assert hangLatch != null;
+
+            try {
+                processorStartLatch.countDown();
+
+                if (!hangLatch.await(60, TimeUnit.SECONDS))
+                    throw new RuntimeException("Failed to wait for latch");
+            }
+            catch (Exception e) {
+                // Keep the interruption visible to the thread owner before converting the exception.
+                if (e instanceof InterruptedException)
+                    Thread.currentThread().interrupt();
+
+                System.out.println("Unexpected error: " + e);
+
+                throw new EntryProcessorException(e);
+            }
+
+            entry.setValue(U.currentTimeMillis());
+
+            return null;
+        }
+    }
+
+    /**
+     * Closure reading a single key with both {@code get} and {@code getEntry} and checking that the
+     * two results agree.
+     */
+    public static class GetClosure implements IgniteCallable<Object> {
+        /** Injected Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Key to read. */
+        private final int key;
+
+        /** Cache name. */
+        private final String cacheName;
+
+        /** Custom expiry policy flag. */
+        private final boolean withExpiryPlc;
+
+        /**
+         * @param key Key.
+         * @param cacheName Cache name.
+         * @param withExpiryPlc Custom expiry policy flag.
+         */
+        GetClosure(int key, String cacheName, boolean withExpiryPlc) {
+            this.key = key;
+            this.cacheName = cacheName;
+            this.withExpiryPlc = withExpiryPlc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(cacheName);
+
+            if (withExpiryPlc)
+                cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+            Object val = cache.get(key);
+
+            CacheEntry e = cache.getEntry(key);
+
+            assertEquals(val, e.getValue());
+
+            return val;
+        }
+    }
+
+    /**
+     * Closure reading a set of keys with both {@code getAll} and {@code getEntries} and checking that
+     * the two results agree.
+     */
+    public static class GetAllClosure implements IgniteCallable<Object> {
+        /** Injected Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Keys to read. */
+        private final Set<Object> keys;
+
+        /** Cache name. */
+        private final String cacheName;
+
+        /** Custom expiry policy flag. */
+        private final boolean withExpiryPlc;
+
+        /**
+         * @param keys Keys.
+         * @param cacheName Cache name.
+         * @param withExpiryPlc Custom expiry policy flag.
+         */
+        GetAllClosure(Set<Object> keys, String cacheName, boolean withExpiryPlc) {
+            this.keys = keys;
+            this.cacheName = cacheName;
+            this.withExpiryPlc = withExpiryPlc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(cacheName);
+
+            if (withExpiryPlc)
+                cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+            Map vals = cache.getAll(keys);
+
+            Collection<CacheEntry> entries = cache.getEntries(keys);
+
+            assertEquals(vals.size(), entries.size());
+
+            for (CacheEntry entry : entries) {
+                Object val = vals.get(entry.getKey());
+
+                assertEquals(val, entry.getValue());
+            }
+
+            return vals;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 3ff1bff..2b79367 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -1009,7 +1009,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
if (cacheMode() != PARTITIONED)
return;
- factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1));
+ factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 2));
nearCache = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
index 5c12f84..7d4f90e 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
@@ -79,10 +79,8 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest {
while (true) {
Integer key = i++;
- Integer val = i++;
- map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key),
- key.hashCode(), ctx.toCacheObject(val)) {
+ map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key)) {
@Override public boolean tmLock(IgniteInternalTx tx,
long timeout,
@Nullable GridCacheVersion serOrder,
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 04a3753..943c5f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf
import org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
@@ -267,6 +268,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class));
+ suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
new file mode 100644
index 0000000..83fe665
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Benchmark created to verify that slow EntryProcessor does not affect 'get' performance.
+ * <p>
+ * On every iteration the calling thread makes sure an asynchronous {@code invokeAll} with
+ * {@link SlowEntryProcessor} is in flight and then reads a random key through an affinity call.
+ */
+public class IgniteGetFromComputeBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+    /** Name of the cache used by the benchmark. */
+    private static final String CACHE_NAME = "atomic";
+
+    /** Compute facade used to execute {@link GetClosure}. */
+    private IgniteCompute compute;
+
+    /** Cache obtained through {@code withAsync()}, used to start entry processor invocations. */
+    private IgniteCache asyncCache;
+
+    /** Future of the last {@code invokeAll} started by the current thread. */
+    private final ThreadLocal<IgniteFuture> invokeFut = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        // Equality is allowed: only a preload amount strictly greater than the range is rejected.
+        if (args.preloadAmount() > args.range())
+            throw new IllegalArgumentException("Preloading amount (\"-pa\", \"--preloadAmount\") " +
+                "must be less than or equal to the range (\"-r\", \"--range\").");
+
+        String cacheName = cache().getName();
+
+        println(cfg, "Loading data for cache: " + cacheName);
+
+        long start = System.nanoTime();
+
+        try (IgniteDataStreamer<Object, Object> dataLdr = ignite().dataStreamer(cacheName)) {
+            for (int i = 0; i < args.preloadAmount(); i++) {
+                dataLdr.addData(i, new SampleValue(i));
+
+                // Report progress and check for interruption once per 100000 entries.
+                if (i % 100000 == 0) {
+                    if (Thread.currentThread().isInterrupted())
+                        break;
+
+                    println(cfg, "Loaded entries: " + i);
+                }
+            }
+        }
+
+        println(cfg, "Finished populating data [time=" + ((System.nanoTime() - start) / 1_000_000) + "ms, " +
+            "amount=" + args.preloadAmount() + ']');
+
+        compute = ignite().compute();
+
+        asyncCache = cache().withAsync();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        IgniteFuture fut = invokeFut.get();
+
+        // Start a new slow invokeAll only when this thread has none in flight.
+        if (fut == null || fut.isDone()) {
+            // Sorted set gives the keys a stable order.
+            Set<Integer> keys = new TreeSet<>();
+
+            for (int i = 0; i < 3; i++)
+                keys.add(nextRandom(args.range()));
+
+            asyncCache.invokeAll(keys, new SlowEntryProcessor(0));
+
+            invokeFut.set(asyncCache.future());
+        }
+
+        int key = nextRandom(args.range());
+
+        compute.affinityCall(CACHE_NAME, key, new GetClosure(key));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return ignite().cache(CACHE_NAME);
+    }
+
+    /**
+     * Closure returning the value stored in the benchmark cache for a single key.
+     */
+    public static class GetClosure implements IgniteCallable<Object> {
+        /** Injected Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Key to read. */
+        private final int key;
+
+        /**
+         * @param key Key.
+         */
+        public GetClosure(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return ignite.cache(CACHE_NAME).get(key);
+        }
+    }
+
+    /**
+     * Entry processor that sleeps for 10 ms before storing the configured value into the entry.
+     */
+    public static class SlowEntryProcessor implements CacheEntryProcessor<Integer, Object, Object> {
+        /** Value to store into the entry. */
+        private final Object val;
+
+        /**
+         * @param val Value.
+         */
+        public SlowEntryProcessor(Object val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Integer, Object> entry, Object... args) {
+            try {
+                Thread.sleep(10);
+            }
+            catch (InterruptedException ignored) {
+                // Keep the interruption visible to the thread owner instead of dropping it.
+                Thread.currentThread().interrupt();
+            }
+
+            entry.setValue(val);
+
+            return null;
+        }
+    }
+}