You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/26 08:28:57 UTC

[2/7] ignite git commit: IGNITE-6654 Ignite client can hang in case IgniteOOM on server. This closes #2908.

IGNITE-6654 Ignite client can hang in case IgniteOOM on server. This closes #2908.

Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/918febaa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/918febaa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/918febaa

Branch: refs/heads/ignite-6667
Commit: 918febaa17efa9e109fc68d268afbc7109a800e9
Parents: 6ed872b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Oct 25 18:46:59 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Oct 25 18:46:59 2017 +0300

----------------------------------------------------------------------
 .../pagemem/impl/PageMemoryNoStoreImpl.java     |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   7 +-
 .../datastreamer/DataStreamerImpl.java          |  23 +-
 .../cache/IgniteOutOfMemoryPropagationTest.java | 251 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite6.java       |   5 +
 5 files changed, 285 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
index 6ba68c2..e219d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
@@ -290,9 +290,11 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
         if (relPtr == INVALID_REL_PTR)
             throw new IgniteOutOfMemoryException("Not enough memory allocated " +
-                "(consider increasing data region size or enabling evictions) " +
                 "[policyName=" + dataRegionCfg.getName() +
-                ", size=" + U.readableSize(dataRegionCfg.getMaxSize(), true) + "]"
+                ", size=" + U.readableSize(dataRegionCfg.getMaxSize(), true) + "]" + U.nl() +
+                "Consider increasing memory policy size, enabling evictions, adding more nodes to the cluster, " +
+                "reducing number of backups or reducing model size."
+
             );
 
         assert (relPtr & ~PageIdUtils.PAGE_IDX_MASK) == 0 : U.hexLong(relPtr & ~PageIdUtils.PAGE_IDX_MASK);

http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5095f45..a7dd615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -3213,7 +3215,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 catch (GridDhtInvalidPartitionException ignored) {
                     // Ignore.
                 }
-                catch (IgniteCheckedException e) {
+                catch (IgniteCheckedException|RuntimeException e) {
+                    if(e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class))
+                        throw (RuntimeException)e;
+
                     IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e);
 
                     if (nearRes != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ed552a..d38132f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -188,6 +188,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** {@code True} if data loader has been cancelled. */
     private volatile boolean cancelled;
 
+    /** Cancellation reason. */
+    private volatile Throwable cancellationReason = null;
+
     /** Fail counter. */
     private final LongAdder8 failCntr = new LongAdder8();
 
@@ -210,7 +213,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                 failCntr.increment();
 
-                cancelled = true;
+                synchronized (DataStreamerImpl.this) {
+                    if(cancellationReason == null)
+                        cancellationReason = err;
+
+                    cancelled = true;
+                }
             }
         }
     };
@@ -399,12 +407,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             if (disconnectErr != null)
                 throw disconnectErr;
 
-            throw new IllegalStateException("Data streamer has been closed.");
+            closedException();
         }
         else if (cancelled) {
             busyLock.leaveBusy();
 
-            throw new IllegalStateException("Data streamer has been closed.");
+            closedException();
         }
     }
 
@@ -886,7 +894,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                                             @Override public void run() {
                                                 try {
                                                     if (cancelled)
-                                                        throw new IllegalStateException("DataStreamer closed.");
+                                                        closedException();
 
                                                     load0(entriesForNode, resFut, activeKeys, remaps + 1);
                                                 }
@@ -990,6 +998,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * Throws {@link IllegalStateException} for a closed streamer, passing {@code cancellationReason} as its cause.
+     */
+    private void closedException() {
+        throw new IllegalStateException("Data streamer has been closed.", cancellationReason);
+    }
+
+    /**
      * @param key Key to map.
      * @param topVer Topology version.
      * @param cctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java
new file mode 100644
index 0000000..a13cbd4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOutOfMemoryPropagationTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ *
+ */
+public class IgniteOutOfMemoryPropagationTest extends GridCommonAbstractTest {
+
+    /** */
+    public static final int NODES = 3;
+
+    /** */
+    private CacheAtomicityMode atomicityMode;
+
+    /** */
+    private CacheMode mode;
+
+    /** */
+    private int backupsCount;
+
+    /** */
+    private CacheWriteSynchronizationMode writeSyncMode;
+
+    /** */
+    private IgniteEx client;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        assert G.allGrids().isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 20 * 60 * 1000;
+    }
+
+    /** Runs the OOM propagation check with the streamer disabled, i.e. data is loaded via {@code putAll}. */
+    public void testPutOOMPropagation() throws Exception {
+        testOOMPropagation(false);
+    }
+
+    /** Runs the OOM propagation check with data loaded through the data streamer. */
+    public void testStreamerOOMPropagation() throws Exception {
+        testOOMPropagation(true);
+    }
+
+    /** Runs the check for each atomicity/cache/write-sync mode combination, skipping FULL_ASYNC and REPLICATED. */
+    private void testOOMPropagation(boolean useStreamer) throws Exception {
+        for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
+            for (CacheMode cacheMode : CacheMode.values()) {
+                for (CacheWriteSynchronizationMode writeSyncMode : CacheWriteSynchronizationMode.values()) {
+                    for (int backupsCount = 0; backupsCount < 1; backupsCount++) {
+                        if (writeSyncMode == CacheWriteSynchronizationMode.FULL_ASYNC
+                            || cacheMode == CacheMode.REPLICATED)
+                            continue;
+
+                        if (atomicityMode == CacheAtomicityMode.TRANSACTIONAL && !useStreamer) {
+                            for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                                for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                                    checkOOMPropagation(
+                                        false,
+                                        CacheAtomicityMode.TRANSACTIONAL,
+                                        cacheMode,
+                                        writeSyncMode,
+                                        backupsCount,
+                                        concurrency,
+                                        isolation);
+                                }
+                            }
+                        }
+                        else
+                            checkOOMPropagation(useStreamer, atomicityMode, cacheMode, writeSyncMode, backupsCount);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Delegates to the seven-argument overload with {@code null} transaction concurrency and isolation. */
+    private void checkOOMPropagation(boolean useStreamer, CacheAtomicityMode atomicityMode, CacheMode cacheMode,
+        CacheWriteSynchronizationMode writeSyncMode, int backupsCount) throws Exception {
+        checkOOMPropagation(useStreamer, atomicityMode, cacheMode, writeSyncMode, backupsCount, null, null);
+    }
+
+    /** Starts the grid, forces OOM and checks the failure has an OOM or topology exception among its causes. */
+    private void checkOOMPropagation(boolean useStreamer, CacheAtomicityMode atomicityMode, CacheMode cacheMode,
+        CacheWriteSynchronizationMode writeSyncMode, int backupsCount,
+        TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
+        Throwable t = null;
+
+        System.out.println("Checking conf: CacheAtomicityMode." + atomicityMode +
+            " CacheMode." + cacheMode + " CacheWriteSynchronizationMode." + writeSyncMode + " backupsCount = " + backupsCount
+            + " TransactionConcurrency." + concurrency + " TransactionIsolation." + isolation);
+
+        initGrid(atomicityMode, cacheMode, writeSyncMode, backupsCount);
+        try {
+            forceOOM(useStreamer, concurrency, isolation);
+        }
+        catch (Throwable t0) {
+            t = t0;
+
+            t.printStackTrace(System.out);
+
+            assertTrue(X.hasCause(t, IgniteOutOfMemoryException.class, ClusterTopologyException.class));
+        }
+        finally {
+            // Stop the grids first so that a failed assertion below cannot leave nodes running.
+            stopAllGrids();
+            assertNotNull(t);
+        }
+    }
+
+    /**
+     * Starts {@code NODES} server nodes and one client with the passed cache parameters, then stops server node 0.
+     *
+     * @param atomicityMode atomicity mode
+     * @param mode cache mode
+     * @param writeSyncMode cache write synchronization mode
+     * @param backupsCount backups count
+     * @throws Exception If failed.
+     */
+    private void initGrid(CacheAtomicityMode atomicityMode, CacheMode mode,
+        CacheWriteSynchronizationMode writeSyncMode, int backupsCount) throws Exception {
+
+        this.atomicityMode = atomicityMode;
+        this.mode = mode;
+        this.backupsCount = backupsCount;
+        this.writeSyncMode = writeSyncMode;
+
+        Ignition.setClientMode(false);
+
+        for (int i = 0; i < NODES; i++)
+            startGrid(i);
+
+        Ignition.setClientMode(true);
+
+        client = startGrid(NODES + 1);
+
+        // it is required to start first node in test jvm, but we can not start client node,
+        // because client will fail to connect and test will fail too.
+        // as workaround start first server node in test jvm and then stop it.
+        stopGrid(0);
+    }
+
+
+    /** Keeps loading generated entries in batches via the streamer or {@code putAll}, in a tx if both tx params are set. */
+    public void forceOOM(boolean useStreamer, TransactionConcurrency concurrency,
+        TransactionIsolation isolation) throws Exception {
+        final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        IgniteDataStreamer<String, String> streamer = client.dataStreamer(DEFAULT_CACHE_NAME);
+
+        Map<String, String> map = new HashMap<>();
+
+        Transaction tx = null;
+
+        for (int i = 0; i < Integer.MAX_VALUE; i++) {
+            map.put("k" + i, "v" + i);
+
+            if (map.size() > 1_000) {
+                if (concurrency != null && isolation != null)
+                    tx = client.transactions().txStart(concurrency, isolation);
+
+                if (useStreamer)
+                    streamer.addData(map);
+                else
+                    cache.putAll(map);
+
+                map.clear();
+
+                if (tx != null) {
+                    tx.commit();
+                    tx.close();
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isMultiJvm() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isRemoteJvm(String igniteInstanceName) {
+        return !(Ignition.isClientMode() || igniteInstanceName.endsWith("0"));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration();
+
+        memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+            .setMaxSize(10 * 1024 * 1024 + 1));
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        CacheConfiguration<Object, Object> baseCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        baseCfg.setAtomicityMode(this.atomicityMode);
+        baseCfg.setCacheMode(this.mode);
+        baseCfg.setBackups(this.backupsCount);
+        baseCfg.setWriteSynchronizationMode(this.writeSyncMode);
+
+        cfg.setCacheConfiguration(baseCfg);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/918febaa/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 7c71381..8a2d6a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteOutOfMemoryPropagationTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
@@ -54,6 +55,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
         suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class);
         suite.addTestSuite(IgniteCacheThreadLocalTxTest.class);
 
+
+//        TODO enable this test after IGNITE-6753, now it takes too long
+//        suite.addTestSuite(IgniteOutOfMemoryPropagationTest.class);
+
         return suite;
     }
 }