You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/07 08:07:22 UTC
[10/18] ignite git commit: Fixed "IGNITE-5390 Bug in
IgniteCacheTxStoreSessionWriteBehindCoalescingTest"
Fixed "IGNITE-5390 Bug in IgniteCacheTxStoreSessionWriteBehindCoalescingTest"
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8a50e47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8a50e47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8a50e47
Branch: refs/heads/ignite-gg-12306-1
Commit: d8a50e47a51718c9ef202375b324695b78225813
Parents: 1cf402f
Author: Alexander Belyak <al...@xored.com>
Authored: Thu Jul 6 14:28:22 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Jul 6 14:32:27 2017 +0300
----------------------------------------------------------------------
.../processors/cache/ClusterCachesInfo.java | 4 +
.../processors/cache/GridCacheAttributes.java | 7 ++
.../store/GridCacheStoreManagerAdapter.java | 1 +
.../cache/IgniteCacheAbstractTest.java | 17 ++++
...acheStoreSessionWriteBehindAbstractTest.java | 62 +++++++++-----
...TxStoreSessionWriteBehindCoalescingTest.java | 88 ++++++++++++++++++++
...ClientWriteBehindStoreNonCoalescingTest.java | 30 ++++---
.../testsuites/IgniteCacheTestSuite4.java | 2 +
8 files changed, 180 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 5452bd2..5aca8c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -263,6 +263,10 @@ class ClusterCachesInfo {
"Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(),
false);
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindCoalescing",
+ "Write behind coalescing", locAttr.writeBehindCoalescing(), rmtAttr.writeBehindCoalescing(),
+ false);
+
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled",
"Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
index dca4286..32871ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
@@ -266,6 +266,13 @@ public class GridCacheAttributes implements Serializable {
}
/**
+ * @return Write behind coalescing flag.
+ */
+ public boolean writeBehindCoalescing() {
+ return ccfg.getWriteBehindCoalescing();
+ }
+
+ /**
* @return Interceptor class name.
*/
public String interceptorClassName() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 8ff2f5a..c02e2c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -185,6 +185,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount());
store.setFlushFrequency(cfg.getWriteBehindFlushFrequency());
store.setBatchSize(cfg.getWriteBehindBatchSize());
+ store.setWriteCoalescing(cfg.getWriteBehindCoalescing());
return store;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index c5cb715..34a811b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -144,6 +144,9 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
cfg.setReadThrough(true);
cfg.setWriteThrough(true);
cfg.setLoadPreviousValue(true);
+
+ cfg.setWriteBehindEnabled(writeBehindEnabled());
+ cfg.setWriteBehindCoalescing(writeBehindCoalescing());
}
if (cacheMode() == PARTITIONED)
@@ -162,6 +165,20 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
}
/**
+ * @return write behind enabled flag.
+ */
+ protected boolean writeBehindEnabled() {
+ return false;
+ }
+
+ /**
+ * @return write behind coalescing flag.
+ */
+ protected boolean writeBehindCoalescing() {
+ return true;
+ }
+
+ /**
* @return Cache loader factory.
*/
protected Factory<? extends CacheLoader> loaderFactory() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
index dcbb63f..7ad240d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java
@@ -49,6 +49,9 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
private static volatile CountDownLatch latch;
/** */
+ protected static volatile CountDownLatch entLatch;
+
+ /** */
private static volatile ExpectedData expData;
/** {@inheritDoc} */
@@ -66,36 +69,42 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
return null;
}
- /** {@inheritDoc} */
+ /**
+ * @param igniteInstanceName Ignite instance name.
+ * @return Cache configuration.
+ * @throws Exception In case of error.
+ */
@SuppressWarnings("unchecked")
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+ CacheConfiguration ccfg0 = super.cacheConfiguration(igniteInstanceName);
- assert cfg.getCacheConfiguration().length == 1;
-
- CacheConfiguration ccfg0 = cfg.getCacheConfiguration()[0];
ccfg0.setReadThrough(true);
ccfg0.setWriteThrough(true);
ccfg0.setWriteBehindBatchSize(10);
ccfg0.setWriteBehindFlushSize(10);
- ccfg0.setWriteBehindFlushFrequency(60_000);
+ ccfg0.setWriteBehindFlushFrequency(600);
ccfg0.setWriteBehindEnabled(true);
ccfg0.setCacheStoreFactory(singletonFactory(new TestStore()));
- CacheConfiguration ccfg1 = cacheConfiguration(igniteInstanceName);
+ return ccfg0;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- ccfg1.setReadThrough(true);
- ccfg1.setWriteThrough(true);
- ccfg1.setWriteBehindBatchSize(10);
- ccfg1.setWriteBehindFlushSize(10);
- ccfg1.setWriteBehindFlushFrequency(60_000);
- ccfg1.setWriteBehindEnabled(true);
+ assert cfg.getCacheConfiguration().length == 1;
- ccfg1.setName(CACHE_NAME1);
+ CacheConfiguration ccfg0 = cacheConfiguration(igniteInstanceName);
+
+ ccfg0.setName(DEFAULT_CACHE_NAME);
+
+ CacheConfiguration ccfg1 = cacheConfiguration(igniteInstanceName);
- ccfg1.setCacheStoreFactory(singletonFactory(new TestStore()));
+ ccfg1.setName(CACHE_NAME1);
cfg.setCacheConfiguration(ccfg0, ccfg1);
@@ -120,6 +129,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
try {
latch = new CountDownLatch(2);
+ entLatch = new CountDownLatch(11);
expData = new ExpectedData("writeAll", cacheName);
@@ -127,13 +137,17 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
cache.put(i, i);
assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
+
+ assertTrue(entLatch.await(10_000,TimeUnit.MILLISECONDS));
}
finally {
latch = null;
+ entLatch = null;
}
try {
latch = new CountDownLatch(2);
+ entLatch = new CountDownLatch(11);
expData = new ExpectedData("deleteAll", cacheName);
@@ -141,16 +155,20 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
cache.remove(i);
assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
+
+ assertTrue(entLatch.await(10_000,TimeUnit.MILLISECONDS));
}
finally {
latch = null;
+ entLatch = null;
}
}
/**
*
*/
- private class TestStore implements CacheStore<Object, Object> {
+ protected class TestStore implements CacheStore<Object, Object> {
+
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
@@ -191,10 +209,13 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException {
log.info("writeAll: " + entries);
-
+
assertTrue("Unexpected entries: " + entries, entries.size() == 10 || entries.size() == 1);
checkSession("writeAll");
+
+ for (int i = 0; i < entries.size(); i++)
+ entLatch.countDown();
}
/** {@inheritDoc} */
@@ -209,6 +230,9 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
assertTrue("Unexpected keys: " + keys, keys.size() == 10 || keys.size() == 1);
checkSession("deleteAll");
+
+ for (int i = 0; i < keys.size(); i++)
+ entLatch.countDown();
}
/**
@@ -221,7 +245,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign
/**
* @param mtd Called stored method.
*/
- private void checkSession(String mtd) {
+ protected void checkSession(String mtd) {
assertNotNull(ignite);
CacheStoreSession ses = session();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java
new file mode 100644
index 0000000..58cc380
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.integration;
+
+import java.util.Collection;
+import javax.cache.Cache;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Integration test for the write behind cache store with
+ * {@link CacheConfiguration#getWriteBehindCoalescing()} set to {@code false}.
+ */
+public class IgniteCacheTxStoreSessionWriteBehindCoalescingTest extends IgniteCacheStoreSessionWriteBehindAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /**
+ * @param igniteInstanceName Ignite instance name.
+ * @return Cache configuration.
+ * @throws Exception In case of error.
+ */
+ @SuppressWarnings("unchecked")
+ protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+ CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName);
+
+ ccfg.setWriteBehindCoalescing(false);
+
+ ccfg.setCacheStoreFactory(singletonFactory(new TestNonCoalescingStore()));
+
+ return ccfg;
+ }
+
+ /**
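+ * Test store for the non-coalescing mode: accepts write and delete batches of at most 10 entries and fails on single-key delete.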
+ */
+ private class TestNonCoalescingStore extends TestStore {
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException {
+ log.info("writeAll: " + entries);
+
+ assertTrue("Unexpected entries: " + entries, entries.size() <= 10);
+
+ checkSession("writeAll");
+
+ for (int i = 0; i < entries.size(); i++)
+ entLatch.countDown();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ fail();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+ log.info("deleteAll: " + keys);
+
+ assertTrue("Unexpected keys: " + keys, keys.size() <= 10);
+
+ checkSession("deleteAll");
+
+ for (int i = 0; i < keys.size(); i++)
+ entLatch.countDown();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
index 6a75dbd..4ffa973 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
@@ -36,6 +37,7 @@ import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
@@ -70,6 +72,14 @@ public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCac
return new TestIncrementStoreFactory();
}
+ /** {@inheritDoc} */
+ @Override protected boolean writeBehindEnabled() { return true;}
+
+ /** {@inheritDoc} */
+ @Override protected boolean writeBehindCoalescing() { return false;}
+
+ private static Random rnd = new Random();
+
/**
* @throws Exception If failed.
*/
@@ -81,35 +91,30 @@ public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCac
assertEquals(cache.getConfiguration(CacheConfiguration.class).getCacheStoreFactory().getClass(),
TestIncrementStoreFactory.class);
- Set<Integer> keys = new HashSet<>();
-
- for (int i = 0; i < 1000; i++) {
- keys.add(i);
-
+ for (int i = 0; i < CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE * 2; i++) {
cache.put(i, i);
}
Collection<IgniteFuture<?>> futs = new ArrayList<>();
- for (int i = 0; i < 100; i++)
- futs.add(updateKeys(cache, keys));
+ for (int i = 0; i < 1000; i++)
+ futs.add(updateKey(cache));
for (IgniteFuture<?> fut : futs)
fut.get();
}
/**
- * Update specified keys in async mode.
+ * Update random key in async mode.
*
* @param cache Cache to use.
- * @param keys Keys to update.
* @return IgniteFuture.
*/
- private IgniteFuture<?> updateKeys(IgniteCache<Integer, Integer> cache, Set<Integer> keys) {
+ private IgniteFuture<?> updateKey(IgniteCache<Integer, Integer> cache) {
IgniteCache asyncCache = cache.withAsync();
// Using EntryProcessor.invokeAll to increment every value in place.
- asyncCache.invokeAll(keys, new EntryProcessor<Integer, Integer, Object>() {
+ asyncCache.invoke(rnd.nextInt(100), new EntryProcessor<Integer, Integer, Object>() {
@Override public Object process(MutableEntry<Integer, Integer> entry, Object... arguments)
throws EntryProcessorException {
entry.setValue(entry.getValue() + 1);
@@ -150,7 +155,8 @@ public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCac
@Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) {
Object oldVal = storeMap.put(entry.getKey(), entry.getValue());
- if (oldVal instanceof Integer && entry.getValue() instanceof Integer) {
+
+ if (oldVal != null) {
Integer oldInt = (Integer) oldVal;
Integer newInt = (Integer)entry.getValue();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 1b35acb..45f575e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -134,6 +134,7 @@ import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxNoLo
import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxNoReadThroughTest;
import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxNoWriteThroughTest;
import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxStoreSessionTest;
+import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxStoreSessionWriteBehindCoalescingTest;
import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxStoreSessionWriteBehindTest;
import org.apache.ignite.internal.processors.cache.version.CacheVersionedEntryLocalAtomicSwapDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.version.CacheVersionedEntryLocalTransactionalSelfTest;
@@ -172,6 +173,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheTxStoreSessionTest.class);
suite.addTestSuite(IgniteCacheAtomicStoreSessionWriteBehindTest.class);
suite.addTestSuite(IgniteCacheTxStoreSessionWriteBehindTest.class);
+ suite.addTestSuite(IgniteCacheTxStoreSessionWriteBehindCoalescingTest.class);
suite.addTestSuite(IgniteCacheAtomicNoReadThroughTest.class);
suite.addTestSuite(IgniteCacheAtomicNearEnabledNoReadThroughTest.class);