You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/28 08:34:08 UTC

[12/16] ignite git commit: IGNITE-2874: .NET: Fixed IDataStreamer performance regression caused by incorrect topology size calculation. This closes #572.

IGNITE-2874: .NET: Fixed IDataStreamer performance regression caused by incorrect topology size calculation. This closes #572.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb9e9b78
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb9e9b78
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb9e9b78

Branch: refs/heads/ignite-gg-10994
Commit: fb9e9b78520a6258422e9e2f498f5e9331ae5197
Parents: 166bce8
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Fri Mar 25 16:59:05 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 25 16:59:05 2016 +0300

----------------------------------------------------------------------
 .../datastreamer/PlatformDataStreamer.java      |   7 +-
 .../platform/PlatformAttributeNodeFilter.java   |  31 ++++++
 .../Apache.Ignite.Core.Tests.csproj             |   5 +
 .../Config/cache-local-node.xml                 |  65 ++++++++++++
 .../Dataload/DataStreamerTestTopologyChange.cs  | 104 +++++++++++++++++++
 .../Impl/Datastream/DataStreamerImpl.cs         |   9 +-
 6 files changed, 215 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 07ef4f2..78d5d86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -162,10 +162,11 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
 
         GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
 
-        long topVer = discoMgr.topologyVersion();
-        int topSize = discoMgr.cacheNodes(cacheName, new AffinityTopologyVersion(topVer)).size();
+        AffinityTopologyVersion topVer = discoMgr.topologyVersionEx();
 
-        platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+        int topSize = discoMgr.cacheNodes(cacheName, topVer).size();
+
+        platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer.topologyVersion(), topSize);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java
new file mode 100644
index 0000000..bbd08a4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Node filter that accepts only nodes that have the "platformAttributeNode" attribute.
+ */
+public class PlatformAttributeNodeFilter implements IgnitePredicate<ClusterNode> {
+    /** {@inheritDoc} */
+    @Override public boolean apply(ClusterNode node) {
+        return node.attributes().containsKey("platformAttributeNode");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 7cc9296..38ddd4d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -114,6 +114,7 @@
     <Compile Include="Compute\TaskAdapterTest.cs" />
     <Compile Include="Compute\TaskResultTest.cs" />
     <Compile Include="Dataload\DataStreamerTest.cs" />
+    <Compile Include="Dataload\DataStreamerTestTopologyChange.cs" />
     <Compile Include="DataStructures\AtomicLongTest.cs" />
     <Compile Include="DataStructures\AtomicReferenceTest.cs" />
     <Compile Include="DataStructures\AtomicSequenceTest.cs" />
@@ -188,6 +189,10 @@
     <Content Include="Config\cache-binarizables.xml">
       <CopyToOutputDirectory>Always</CopyToOutputDirectory>
     </Content>
+    <Content Include="Config\cache-local-node.xml">
+      <SubType>Designer</SubType>
+      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+    </Content>
     <Content Include="Config\cache-query-continuous.xml">
       <CopyToOutputDirectory>Always</CopyToOutputDirectory>
     </Content>

http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml
new file mode 100644
index 0000000..d6e4dd2
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="localHost" value="127.0.0.1"/>
+        <property name="connectorConfiguration"><null/></property>
+        
+        <property name="userAttributes">
+            <map>
+                <entry key="platformAttributeNode" value="true"/>
+            </map>
+        </property>
+        
+
+        <property name="cacheConfiguration">
+            <list>
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="cache"/>
+                    <property name="nodeFilter">
+                        <bean class="org.apache.ignite.platform.PlatformAttributeNodeFilter" />
+                    </property>
+                </bean>
+            </list>
+        </property>
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500..47501</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
new file mode 100644
index 0000000..c1f2c53
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Dataload
+{
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Streamer test with topology change.
+    /// </summary>
+    public class DataStreamerTestTopologyChange
+    {
+        /// <summary>
+        /// Tests the streamer on a node without cache.
+        /// </summary>
+        [Test]
+        public void TestNoCacheNode()
+        {
+            const string cacheName = "cache";
+
+            var cacheNodeCfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            {
+                SpringConfigUrl = @"Config\cache-local-node.xml",
+                GridName = "cacheGrid"
+            };
+
+            using (var gridNoCache = Ignition.Start(TestUtils.GetTestConfiguration()))
+            {
+                Assert.Throws<ArgumentException>(() => gridNoCache.GetCache<int, int>(cacheName));
+
+                var gridWithCache = Ignition.Start(cacheNodeCfg);
+
+                var streamer = gridNoCache.GetDataStreamer<int, int>(cacheName);
+
+                streamer.AddData(1, 2);
+                streamer.Flush();
+
+                Ignition.Stop(gridWithCache.Name, true);
+
+                Thread.Sleep(500);  // Wait for node to stop
+
+                var task = streamer.AddData(2, 3);
+                streamer.Flush();
+
+                AssertThrowsCacheStopped(task);
+            }
+        }
+
+        /// <summary>
+        /// Streamer test with destroyed cache.
+        /// </summary>
+        [Test]
+        public void TestDestroyCache()
+        {
+            const string cacheName = "cache";
+
+            using (var grid = Ignition.Start(TestUtils.GetTestConfiguration()))
+            {
+                grid.CreateCache<int, int>(cacheName);
+
+                var streamer = grid.GetDataStreamer<int, int>(cacheName);
+
+                var task = streamer.AddData(1, 2);
+                streamer.Flush();
+                task.Wait();
+
+                grid.DestroyCache(cacheName);
+
+                task = streamer.AddData(2, 3);
+                streamer.Flush();
+
+                AssertThrowsCacheStopped(task);
+            }
+        }
+
+        /// <summary>
+        /// Asserts that waiting on the task fails with the "cache stopped" error.
+        /// </summary>
+        private static void AssertThrowsCacheStopped(Task task)
+        {
+            var ex = Assert.Throws<AggregateException>(task.Wait);
+            Assert.IsTrue(ex.InnerException.Message.Contains(
+                "Failed to find server node for cache " +
+                "(all affinity nodes have left the grid or cache was stopped):"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 6066504..74261d3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
 {
     using System;
     using System.Collections.Generic;
+    using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.Threading;
     using System.Threading.Tasks;
@@ -86,7 +87,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
         private long _topVer;
 
         /** Topology size. */
-        private int _topSize;
+        private int _topSize = 1;
         
         /** Buffer send size. */
         private volatile int _bufSndSize;
@@ -568,9 +569,9 @@ namespace Apache.Ignite.Core.Impl.Datastream
                 if (_topVer < topVer)
                 {
                     _topVer = topVer;
-                    _topSize = topSize;
+                    _topSize = topSize > 0 ? topSize : 1;  // Do not set to 0 to avoid 0 buffer size.
 
-                    _bufSndSize = topSize * UU.DataStreamerPerNodeBufferSizeGet(Target);
+                    _bufSndSize = _topSize * UU.DataStreamerPerNodeBufferSizeGet(Target);
                 }
             }
             finally
@@ -590,6 +591,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
         {
             int bufSndSize0 = _bufSndSize;
 
+            Debug.Assert(bufSndSize0 > 0);
+
             while (true)
             {
                 var batch0 = _batch;