You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/10/30 12:26:56 UTC
[02/23] ignite git commit: IGNITE-6654 Ignite client can hang in case
IgniteOOM on server. This closes #2908.
IGNITE-6654 Ignite client can hang in case IgniteOOM on server. This closes #2908.
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/918febaa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/918febaa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/918febaa
Branch: refs/heads/ignite-6748
Commit: 918febaa17efa9e109fc68d268afbc7109a800e9
Parents: 6ed872b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Oct 25 18:46:59 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Oct 25 18:46:59 2017 +0300
----------------------------------------------------------------------
.../pagemem/impl/PageMemoryNoStoreImpl.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 7 +-
.../datastreamer/DataStreamerImpl.java | 23 +-
.../cache/IgniteOutOfMemoryPropagationTest.java | 251 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite6.java | 5 +
5 files changed, 285 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
index 6ba68c2..e219d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
@@ -290,9 +290,11 @@ public class PageMemoryNoStoreImpl implements PageMemory {
if (relPtr == INVALID_REL_PTR)
throw new IgniteOutOfMemoryException("Not enough memory allocated " +
- "(consider increasing data region size or enabling evictions) " +
"[policyName=" + dataRegionCfg.getName() +
- ", size=" + U.readableSize(dataRegionCfg.getMaxSize(), true) + "]"
+ ", size=" + U.readableSize(dataRegionCfg.getMaxSize(), true) + "]" + U.nl() +
+ "Consider increasing memory policy size, enabling evictions, adding more nodes to the cluster, " +
+ "reducing number of backups or reducing model size."
+
);
assert (relPtr & ~PageIdUtils.PAGE_IDX_MASK) == 0 : U.hexLong(relPtr & ~PageIdUtils.PAGE_IDX_MASK);
http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5095f45..a7dd615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -3213,7 +3215,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
catch (GridDhtInvalidPartitionException ignored) {
// Ignore.
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException|RuntimeException e) {
+ if(e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class))
+ throw (RuntimeException)e;
+
IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e);
if (nearRes != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ed552a..d38132f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -188,6 +188,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** {@code True} if data loader has been cancelled. */
private volatile boolean cancelled;
+ /** Cancellation reason. */
+ private volatile Throwable cancellationReason = null;
+
/** Fail counter. */
private final LongAdder8 failCntr = new LongAdder8();
@@ -210,7 +213,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
failCntr.increment();
- cancelled = true;
+ synchronized (DataStreamerImpl.this) {
+ if(cancellationReason == null)
+ cancellationReason = err;
+
+ cancelled = true;
+ }
}
}
};
@@ -399,12 +407,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (disconnectErr != null)
throw disconnectErr;
- throw new IllegalStateException("Data streamer has been closed.");
+ closedException();
}
else if (cancelled) {
busyLock.leaveBusy();
- throw new IllegalStateException("Data streamer has been closed.");
+ closedException();
}
}
@@ -886,7 +894,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Override public void run() {
try {
if (cancelled)
- throw new IllegalStateException("DataStreamer closed.");
+ closedException();
load0(entriesForNode, resFut, activeKeys, remaps + 1);
}
@@ -990,6 +998,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
+ * Throws an {@link IllegalStateException} reporting that the data streamer has been closed.
+ * The current {@code cancellationReason} is attached as the cause; it may be {@code null}
+ * when no failure has been recorded.
+ *
+ * @throws IllegalStateException Always.
+ */
+ private void closedException() {
+ throw new IllegalStateException("Data streamer has been closed.", cancellationReason);
+ }
+
+ /**
* @param key Key to map.
* @param topVer Topology version.
* @param cctx Context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java
new file mode 100644
index 0000000..a13cbd4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ *
+ */
+public class IgniteOutOfMemoryPropagationTest extends GridCommonAbstractTest {
+
+ /** Number of server nodes started by {@code initGrid} for every checked configuration. */
+ public static final int NODES = 3;
+
+ /** Cache atomicity mode applied in {@code getConfiguration}; assigned by {@code initGrid}. */
+ private CacheAtomicityMode atomicityMode;
+
+ /** Cache mode applied in {@code getConfiguration}; assigned by {@code initGrid}. */
+ private CacheMode mode;
+
+ /** Number of backups applied in {@code getConfiguration}; assigned by {@code initGrid}. */
+ private int backupsCount;
+
+ /** Write synchronization mode applied in {@code getConfiguration}; assigned by {@code initGrid}. */
+ private CacheWriteSynchronizationMode writeSyncMode;
+
+ /** Client node started by {@code initGrid}; {@code forceOOM} uses it to load data. */
+ private IgniteEx client;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ // Verify that G.allGrids() reports no remaining grids after the stop.
+ assert G.allGrids().isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ // 20 * 60 * 1000 = 1 200 000; presumably milliseconds, i.e. 20 minutes - confirm against the base class contract.
+ return 20 * 60 * 1000;
+ }
+
+ /**
+ * Runs the OOM propagation check with data loaded through {@code putAll} (no data streamer).
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutOOMPropagation() throws Exception {
+ testOOMPropagation(false);
+ }
+
+ /**
+ * Runs the OOM propagation check with data loaded through a data streamer.
+ *
+ * @throws Exception If failed.
+ */
+ public void testStreamerOOMPropagation() throws Exception {
+ testOOMPropagation(true);
+ }
+
+ /**
+  * Iterates over the combinations of atomicity mode, cache mode and write synchronization mode
+  * (with zero backups) and runs the OOM propagation check for each of them. {@code REPLICATED}
+  * caches and {@code FULL_ASYNC} synchronization are skipped. For transactional caches loaded
+  * without the streamer, every transaction concurrency/isolation pair is checked as well.
+  *
+  * @param useStreamer {@code True} to load data through a data streamer, {@code false} to use {@code putAll}.
+  * @throws Exception If failed.
+  */
+ private void testOOMPropagation(boolean useStreamer) throws Exception {
+     for (CacheAtomicityMode atomicity : CacheAtomicityMode.values()) {
+         // Explicit transactions are exercised only for transactional caches loaded via putAll.
+         boolean explicitTx = atomicity == CacheAtomicityMode.TRANSACTIONAL && !useStreamer;
+
+         for (CacheMode cacheMode : CacheMode.values()) {
+             if (cacheMode == CacheMode.REPLICATED)
+                 continue;
+
+             for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+                 if (syncMode == CacheWriteSynchronizationMode.FULL_ASYNC)
+                     continue;
+
+                 for (int backups = 0; backups < 1; backups++) {
+                     if (!explicitTx) {
+                         checkOOMPropagation(useStreamer, atomicity, cacheMode, syncMode, backups);
+
+                         continue;
+                     }
+
+                     for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                         for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                             checkOOMPropagation(
+                                 false,
+                                 CacheAtomicityMode.TRANSACTIONAL,
+                                 cacheMode,
+                                 syncMode,
+                                 backups,
+                                 concurrency,
+                                 isolation);
+                         }
+                     }
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+ * Checks OOM propagation without an explicit transaction: delegates to the seven-argument overload,
+ * passing {@code null} for both the transaction concurrency and the transaction isolation.
+ *
+ * @param useStreamer {@code True} to load data through a data streamer.
+ * @param atomicityMode Cache atomicity mode.
+ * @param cacheMode Cache mode.
+ * @param writeSyncMode Cache write synchronization mode.
+ * @param backupsCount Number of backups.
+ * @throws Exception If failed.
+ */
+ private void checkOOMPropagation(boolean useStreamer, CacheAtomicityMode atomicityMode, CacheMode cacheMode,
+ CacheWriteSynchronizationMode writeSyncMode, int backupsCount) throws Exception {
+ checkOOMPropagation(useStreamer, atomicityMode, cacheMode, writeSyncMode, backupsCount, null, null);
+ }
+
+ /**
+  * Starts the grid for the given cache parameters, loads data until the load fails and verifies
+  * that the failure has {@link IgniteOutOfMemoryException} or {@link ClusterTopologyException}
+  * in its cause chain. All grids are stopped afterwards, whatever the outcome.
+  *
+  * @param useStreamer {@code True} to load data through a data streamer.
+  * @param atomicityMode Cache atomicity mode.
+  * @param cacheMode Cache mode.
+  * @param writeSyncMode Cache write synchronization mode.
+  * @param backupsCount Number of backups.
+  * @param concurrency Transaction concurrency, or {@code null} to load without an explicit transaction.
+  * @param isolation Transaction isolation, or {@code null} to load without an explicit transaction.
+  * @throws Exception If the grid could not be started.
+  */
+ private void checkOOMPropagation(boolean useStreamer, CacheAtomicityMode atomicityMode, CacheMode cacheMode,
+     CacheWriteSynchronizationMode writeSyncMode, int backupsCount,
+     TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
+     // Print the cache mode under test (the parameter), not the field left over from the previous run.
+     System.out.println("Checking conf: CacheAtomicityMode." + atomicityMode +
+         " CacheMode." + cacheMode + " CacheWriteSynchronizationMode." + writeSyncMode + " backupsCount = " + backupsCount
+         + " TransactionConcurrency." + concurrency + " TransactionIsolation." + isolation);
+
+     try {
+         // Inside the try block so that a partially started grid is stopped as well.
+         initGrid(atomicityMode, cacheMode, writeSyncMode, backupsCount);
+
+         Throwable err = null;
+
+         try {
+             forceOOM(useStreamer, concurrency, isolation);
+         }
+         catch (Throwable e) {
+             err = e;
+
+             e.printStackTrace(System.out);
+
+             assertTrue(X.hasCause(e, IgniteOutOfMemoryException.class, ClusterTopologyException.class));
+         }
+
+         // The load must not complete successfully.
+         assertNotNull(err);
+     }
+     finally {
+         // Always stop the grids, even when one of the assertions above fails.
+         stopAllGrids();
+     }
+ }
+
+ /**
+ * Stores the passed cache parameters in the test fields (read by {@code getConfiguration}),
+ * starts {@code NODES} server nodes and one client node, and then stops server node 0.
+ *
+ * @param atomicityMode Cache atomicity mode.
+ * @param mode Cache mode.
+ * @param writeSyncMode Cache write synchronization mode.
+ * @param backupsCount Number of backups.
+ * @throws Exception If failed.
+ */
+ private void initGrid(CacheAtomicityMode atomicityMode, CacheMode mode,
+ CacheWriteSynchronizationMode writeSyncMode, int backupsCount) throws Exception {
+
+ this.atomicityMode = atomicityMode;
+ this.mode = mode;
+ this.backupsCount = backupsCount;
+ this.writeSyncMode = writeSyncMode;
+
+ // Server nodes are started with the client mode flag switched off.
+ Ignition.setClientMode(false);
+
+ for (int i = 0; i < NODES; i++)
+ startGrid(i);
+
+ // The flag is switched on only for the client node started next.
+ Ignition.setClientMode(true);
+
+ client = startGrid(NODES + 1);
+
+ // The first node has to be started in the test JVM, but it cannot be the client node:
+ // the client would fail to connect and the test would fail with it.
+ // As a workaround, the first server node is started in the test JVM and then stopped here.
+ stopGrid(0);
+ }
+
+
+ /**
+  * Keeps writing batches of generated string entries to the default cache through the client node.
+  * The loop is bounded only by {@code Integer.MAX_VALUE}, so the method is expected to end with
+  * an exception thrown by the write.
+  *
+  * @param useStreamer {@code True} to write through a data streamer, {@code false} to use {@code putAll}.
+  * @param concurrency Transaction concurrency; if this or {@code isolation} is {@code null},
+  *      batches are written without an explicit transaction.
+  * @param isolation Transaction isolation; see {@code concurrency}.
+  * @throws Exception If a write or a commit fails.
+  */
+ public void forceOOM(boolean useStreamer, TransactionConcurrency concurrency,
+     TransactionIsolation isolation) throws Exception {
+     final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+     // Create the streamer only when it is going to be used.
+     IgniteDataStreamer<String, String> streamer = useStreamer ? client.dataStreamer(DEFAULT_CACHE_NAME) : null;
+
+     boolean explicitTx = concurrency != null && isolation != null;
+
+     Map<String, String> batch = new HashMap<>();
+
+     for (int i = 0; i < Integer.MAX_VALUE; i++) {
+         batch.put("k" + i, "v" + i);
+
+         if (batch.size() <= 1_000)
+             continue;
+
+         // try-with-resources skips a null resource, and closes the transaction even when the write fails.
+         try (Transaction tx = explicitTx ? client.transactions().txStart(concurrency, isolation) : null) {
+             if (useStreamer)
+                 streamer.addData(batch);
+             else
+                 cache.putAll(batch);
+
+             if (tx != null)
+                 tx.commit();
+         }
+
+         batch.clear();
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isMultiJvm() {
+ // Multi-JVM mode is on; isRemoteJvm() decides per instance name which nodes are remote.
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isRemoteJvm(String igniteInstanceName) {
+     // Nodes started while client mode is on are never treated as remote.
+     if (Ignition.isClientMode())
+         return false;
+
+     // Neither is an instance whose name ends with "0"; every other node is remote.
+     return !igniteInstanceName.endsWith("0");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+     IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+
+     // Default data region capped at 10 * 1024 * 1024 + 1.
+     DataRegionConfiguration regionCfg = new DataRegionConfiguration();
+
+     regionCfg.setMaxSize(10 * 1024 * 1024 + 1);
+
+     DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+     storageCfg.setDefaultDataRegionConfiguration(regionCfg);
+
+     igniteCfg.setDataStorageConfiguration(storageCfg);
+
+     // Cache settings are taken from the fields assigned for the configuration currently under test.
+     CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+     cacheCfg.setAtomicityMode(atomicityMode);
+     cacheCfg.setCacheMode(mode);
+     cacheCfg.setBackups(backupsCount);
+     cacheCfg.setWriteSynchronizationMode(writeSyncMode);
+
+     igniteCfg.setCacheConfiguration(cacheCfg);
+
+     return igniteCfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 7c71381..8a2d6a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteOutOfMemoryPropagationTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
@@ -54,6 +55,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class);
suite.addTestSuite(IgniteCacheThreadLocalTxTest.class);
+
+// TODO enable this test after IGNITE-6753, now it takes too long
+// suite.addTestSuite(IgniteOutOfMemoryPropagationTest.class);
+
return suite;
}
}