You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/23 15:26:42 UTC
[08/24] ignite git commit: ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored
ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780bf23d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780bf23d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780bf23d
Branch: refs/heads/ignite-comm-opts2
Commit: 780bf23d5c89452dd062be4fab9e2e56d50bb9e2
Parents: 9b72d18
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 19 18:19:33 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 19 18:19:33 2016 +0300
----------------------------------------------------------------------
.../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +++++++--
.../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++++++++
.../file/GridFileSwapSpaceSpiSelfTest.java | 89 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
4 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 8809f08..9be5b93 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -639,7 +639,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
if (space == null && create) {
validateName(name);
- Space old = spaces.putIfAbsent(masked, space = new Space(masked));
+ Space old = spaces.putIfAbsent(masked, space = new Space(masked, log));
if (old != null)
space = old;
@@ -833,13 +833,21 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
/** */
private final int maxSize;
+ /** Logger used to warn about values larger than the queue's max size. */
+ private final IgniteLogger log;
+
+ /** Whether the oversized-value warning has already been logged for this queue. */
+ private boolean queueSizeWarn;
+
/**
* @param minTakeSize Min size.
* @param maxSize Max size.
+ * @param log Logger.
*/
- private SwapValuesQueue(int minTakeSize, int maxSize) {
+ private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger log) {
this.minTakeSize = minTakeSize;
this.maxSize = maxSize;
+ this.log = log;
}
/**
@@ -852,8 +860,24 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
lock.lock();
try {
- while (size + val.len > maxSize)
- mayAdd.await();
+ boolean largeVal = val.len > maxSize;
+
+ if (largeVal) {
+ if (!queueSizeWarn) {
+ U.warn(log, "Trying to save in swap entry which have size more than write queue size. " +
+ "You may wish to increase 'maxWriteQueueSize' in FileSwapSpaceSpi configuration " +
+ "[queueMaxSize=" + maxSize + ", valSize=" + val.len + ']');
+
+ queueSizeWarn = true;
+ }
+
+ while (size >= minTakeSize)
+ mayAdd.await();
+ }
+ else {
+ while (size + val.len > maxSize)
+ mayAdd.await();
+ }
size += val.len;
@@ -1419,7 +1443,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
private SwapFile right;
/** */
- private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, maxWriteQueSize);
+ private final SwapValuesQueue que;
/** Partitions. */
private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, SwapValue>> parts =
@@ -1442,11 +1466,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
/**
* @param name Space name.
+ * @param log Logger.
*/
- private Space(String name) {
+ private Space(String name, IgniteLogger log) {
assert name != null;
this.name = name;
+ this.que = new SwapValuesQueue(writeBufSize, maxWriteQueSize, log);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
new file mode 100644
index 0000000..8d189fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+
+/**
+ * Runs {@link CacheSwapUnswapGetTest} with the file swap SPI's max write queue size set to 2.
+ */
+public class CacheSwapUnswapGetTestSmallQueueSize extends CacheSwapUnswapGetTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((FileSwapSpaceSpi)cfg.getSwapSpaceSpi()).setMaxWriteQueueSize(2);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
index 64652b1..ab21165 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
@@ -25,11 +25,14 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -37,8 +40,10 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.swapspace.GridSwapSpaceSpiAbstractSelfTest;
import org.apache.ignite.spi.swapspace.SwapKey;
import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.junit.Assert;
/**
* Test for {@link FileSwapSpaceSpi}.
@@ -364,4 +369,88 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe
assertEquals(hash0, hash1);
}
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testSaveValueLargeThenQueueSize() throws IgniteCheckedException {
+ final String spaceName = "mySpace";
+ final SwapKey key = new SwapKey("key");
+
+ final byte[] val = new byte[FileSwapSpaceSpi.DFLT_QUE_SIZE * 2];
+ Arrays.fill(val, (byte)1);
+
+ IgniteInternalFuture<byte[]> fut = GridTestUtils.runAsync(new Callable<byte[]>() {
+ @Override public byte[] call() throws Exception {
+ return saveAndGet(spaceName, key, val);
+ }
+ });
+
+ byte[] bytes = fut.get(10_000);
+
+ Assert.assertArrayEquals(val, bytes);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSaveValueLargeThenQueueSizeMultiThreaded() throws Exception {
+ final String spaceName = "mySpace";
+
+ final int threads = 5;
+
+ long DURATION = 30_000;
+
+ final int maxSize = FileSwapSpaceSpi.DFLT_QUE_SIZE * 2;
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ try {
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!done.get()) {
+ SwapKey key = new SwapKey(rnd.nextInt(1000));
+
+ spi.store(spaceName, key, new byte[rnd.nextInt(0, maxSize)], context());
+ }
+
+ return null;
+ }
+ }, threads, " async-put");
+
+ Thread.sleep(DURATION);
+
+ done.set(true);
+
+ fut.get();
+ }
+ finally {
+ done.set(true);
+ }
+ }
+
+ /**
+ * @param spaceName Space name.
+ * @param key Key.
+ * @param val Value.
+ * @throws Exception If failed.
+ * @return Read bytes.
+ */
+ private byte[] saveAndGet(final String spaceName, final SwapKey key, byte[] val) throws Exception {
+ spi.store(spaceName, key, val, context());
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return spi.read(spaceName, key, context()) != null;
+ }
+ }, 10_000);
+
+ byte[] res = spi.read(spaceName, key, context());
+
+ assertNotNull(res);
+
+ return res;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 60d59d7..c494e73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTestSmallQueueSize;
import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
@@ -304,6 +305,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class);
suite.addTestSuite(CacheSwapUnswapGetTest.class);
+ suite.addTestSuite(CacheSwapUnswapGetTestSmallQueueSize.class);
suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class);
suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);