You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/22 11:40:34 UTC

[45/50] [abbrv] ignite git commit: IGNITE-4239: add GridInternal annotation for tasks instead of jobs. This closes #1250.

IGNITE-4239: add GridInternal annotation for tasks instead of jobs. This closes #1250.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/861fab9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/861fab9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/861fab9d

Branch: refs/heads/master
Commit: 861fab9d0598ca2f06c4a6f293bf2866af31967c
Parents: fc9ee6a
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Tue Nov 22 14:52:03 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Tue Nov 22 14:52:03 2016 +0500

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  11 +-
 .../compute/PublicThreadpoolStarvationTest.java | 129 +++++++++++++++++++
 .../testsuites/IgniteComputeGridTestSuite.java  |   2 +
 3 files changed, 135 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/861fab9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 55400ab..2e24e67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5443,7 +5443,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Global clear all.
      */
-    @GridInternal
     private static class GlobalClearAllJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
@@ -5482,7 +5481,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Global clear keys.
      */
-    @GridInternal
     private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
@@ -5527,7 +5525,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Global clear all for near cache.
      */
-    @GridInternal
     private static class GlobalClearAllNearJob extends GlobalClearAllJob {
         /** */
         private static final long serialVersionUID = 0L;
@@ -5558,7 +5555,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Global clear keys for near cache.
      */
-    @GridInternal
     private static class GlobalClearKeySetNearJob<K> extends GlobalClearKeySetJob<K> {
         /** */
         private static final long serialVersionUID = 0L;
@@ -5590,7 +5586,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Internal callable for partition size calculation.
      */
-    @GridInternal
     private static class PartitionSizeLongJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
@@ -5636,7 +5631,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Internal callable for global size calculation.
      */
-    @GridInternal
     private static class SizeJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
@@ -5677,7 +5671,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Internal callable for global size calculation.
      */
-    @GridInternal
     private static class SizeLongJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
@@ -6523,6 +6516,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Size task.
      */
+    @GridInternal
     private static class SizeTask extends ComputeTaskAdapter<Object, Integer> {
         /** */
         private static final long serialVersionUID = 0L;
@@ -6588,6 +6582,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Size task.
      */
+    @GridInternal
     private static class SizeLongTask extends ComputeTaskAdapter<Object, Long> {
         /** */
         private static final long serialVersionUID = 0L;
@@ -6653,6 +6648,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Partition Size Long task.
      */
+    @GridInternal
     private static class PartitionSizeLongTask extends ComputeTaskAdapter<Object, Long> {
         /** */
         private static final long serialVersionUID = 0L;
@@ -6737,6 +6733,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Clear task.
      */
+    @GridInternal
     private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> {
         /** */
         private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/861fab9d/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java
new file mode 100644
index 0000000..e587310
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Test to validate https://issues.apache.org/jira/browse/IGNITE-4239
+ * Jobs hang when a lot of jobs calculate cache size.
+ */
+public class PublicThreadpoolStarvationTest extends GridCacheAbstractSelfTest {
+    /** Cache size. */
+    private static final int CACHE_SIZE = 10;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "test";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPublicThreadPoolSize(1);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setMarshaller(new BinaryMarshaller());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Class<?>[] indexedTypes() {
+        return new Class<?>[] {
+            Integer.class, String.class,
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        info("Fill caches begin...");
+
+        fillCaches();
+
+        info("Caches are filled.");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        grid(0).destroyCache(CACHE_NAME);
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void fillCaches() throws Exception {
+        grid(0).createCache(CACHE_NAME);
+
+        try (
+            IgniteDataStreamer<Integer, String> streamer =
+                grid(0).dataStreamer(CACHE_NAME)) {
+
+            for (int i = 0; i < CACHE_SIZE; ++i)
+                streamer.addData(i, "Data " + i);
+        }
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheSizeOnPublicThreadpoolStarvation() throws Exception {
+        grid(0).compute().run(new IgniteRunnable() {
+            @Override public void run() {
+                try {
+                    Thread.sleep(500);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                grid(0).cache(CACHE_NAME).size();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/861fab9d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index a1a75f8..8a501fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.TaskNodeRestartTest;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
 import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
+import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployIsolatedModeSelfTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployPrivateModeSelfTest;
@@ -150,6 +151,7 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(GridTaskFailoverAffinityRunTest.class);
         suite.addTestSuite(TaskNodeRestartTest.class);
         suite.addTestSuite(IgniteRoundRobinErrorAfterClientReconnectTest.class);
+        suite.addTestSuite(PublicThreadpoolStarvationTest.class);
 
         return suite;
     }