You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/28 08:34:08 UTC
[12/16] ignite git commit: IGNITE-2874: .NET: Fixed IDataStreamer
performance regression caused by incorrect topology size calculation. This
closes #572.
IGNITE-2874: .NET: Fixed IDataStreamer performance regression caused by incorrect topology size calculation. This closes #572.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb9e9b78
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb9e9b78
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb9e9b78
Branch: refs/heads/ignite-gg-10994
Commit: fb9e9b78520a6258422e9e2f498f5e9331ae5197
Parents: 166bce8
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Fri Mar 25 16:59:05 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 25 16:59:05 2016 +0300
----------------------------------------------------------------------
.../datastreamer/PlatformDataStreamer.java | 7 +-
.../platform/PlatformAttributeNodeFilter.java | 31 ++++++
.../Apache.Ignite.Core.Tests.csproj | 5 +
.../Config/cache-local-node.xml | 65 ++++++++++++
.../Dataload/DataStreamerTestTopologyChange.cs | 104 +++++++++++++++++++
.../Impl/Datastream/DataStreamerImpl.cs | 9 +-
6 files changed, 215 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 07ef4f2..78d5d86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -162,10 +162,11 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
- long topVer = discoMgr.topologyVersion();
- int topSize = discoMgr.cacheNodes(cacheName, new AffinityTopologyVersion(topVer)).size();
+ AffinityTopologyVersion topVer = discoMgr.topologyVersionEx();
- platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+ int topSize = discoMgr.cacheNodes(cacheName, topVer).size();
+
+ platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer.topologyVersion(), topSize);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java
new file mode 100644
index 0000000..bbd08a4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformAttributeNodeFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Node filter that allows nodes with an attribute.
+ */
+public class PlatformAttributeNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.attributes().containsKey("platformAttributeNode");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 7cc9296..38ddd4d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -114,6 +114,7 @@
<Compile Include="Compute\TaskAdapterTest.cs" />
<Compile Include="Compute\TaskResultTest.cs" />
<Compile Include="Dataload\DataStreamerTest.cs" />
+ <Compile Include="Dataload\DataStreamerTestTopologyChange.cs" />
<Compile Include="DataStructures\AtomicLongTest.cs" />
<Compile Include="DataStructures\AtomicReferenceTest.cs" />
<Compile Include="DataStructures\AtomicSequenceTest.cs" />
@@ -188,6 +189,10 @@
<Content Include="Config\cache-binarizables.xml">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
+ <Content Include="Config\cache-local-node.xml">
+ <SubType>Designer</SubType>
+ <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+ </Content>
<Content Include="Config\cache-query-continuous.xml">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml
new file mode 100644
index 0000000..d6e4dd2
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/cache-local-node.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+ <property name="connectorConfiguration"><null/></property>
+
+ <property name="userAttributes">
+ <map>
+ <entry key="platformAttributeNode" value="true"/>
+ </map>
+ </property>
+
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="cache"/>
+ <property name="nodeFilter">
+ <bean class="org.apache.ignite.platform.PlatformAttributeNodeFilter" />
+ </property>
+ </bean>
+ </list>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500..47501</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
new file mode 100644
index 0000000..c1f2c53
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTestTopologyChange.cs
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Dataload
+{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Streamer test with topology change.
+ /// </summary>
+ public class DataStreamerTestTopologyChange
+ {
+ /// <summary>
+ /// Tests the streamer on a node without cache.
+ /// </summary>
+ [Test]
+ public void TestNoCacheNode()
+ {
+ const string cacheName = "cache";
+
+ var cacheNodeCfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ SpringConfigUrl = @"Config\cache-local-node.xml",
+ GridName = "cacheGrid"
+ };
+
+ using (var gridNoCache = Ignition.Start(TestUtils.GetTestConfiguration()))
+ {
+ Assert.Throws<ArgumentException>(() => gridNoCache.GetCache<int, int>(cacheName));
+
+ var gridWithCache = Ignition.Start(cacheNodeCfg);
+
+ var streamer = gridNoCache.GetDataStreamer<int, int>(cacheName);
+
+ streamer.AddData(1, 2);
+ streamer.Flush();
+
+ Ignition.Stop(gridWithCache.Name, true);
+
+ Thread.Sleep(500); // Wait for node to stop
+
+ var task = streamer.AddData(2, 3);
+ streamer.Flush();
+
+ AssertThrowsCacheStopped(task);
+ }
+ }
+
+ /// <summary>
+ /// Streamer test with destroyed cache.
+ /// </summary>
+ [Test]
+ public void TestDestroyCache()
+ {
+ const string cacheName = "cache";
+
+ using (var grid = Ignition.Start(TestUtils.GetTestConfiguration()))
+ {
+ grid.CreateCache<int, int>(cacheName);
+
+ var streamer = grid.GetDataStreamer<int, int>(cacheName);
+
+ var task = streamer.AddData(1, 2);
+ streamer.Flush();
+ task.Wait();
+
+ grid.DestroyCache(cacheName);
+
+ task = streamer.AddData(2, 3);
+ streamer.Flush();
+
+ AssertThrowsCacheStopped(task);
+ }
+ }
+
+ /// <summary>
+ /// Asserts that cache stopped error is thrown.
+ /// </summary>
+ private static void AssertThrowsCacheStopped(Task task)
+ {
+ var ex = Assert.Throws<AggregateException>(task.Wait);
+ Assert.IsTrue(ex.InnerException.Message.Contains(
+ "Failed to find server node for cache " +
+ "(all affinity nodes have left the grid or cache was stopped):"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fb9e9b78/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 6066504..74261d3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
using System;
using System.Collections.Generic;
+ using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
@@ -86,7 +87,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
private long _topVer;
/** Topology size. */
- private int _topSize;
+ private int _topSize = 1;
/** Buffer send size. */
private volatile int _bufSndSize;
@@ -568,9 +569,9 @@ namespace Apache.Ignite.Core.Impl.Datastream
if (_topVer < topVer)
{
_topVer = topVer;
- _topSize = topSize;
+ _topSize = topSize > 0 ? topSize : 1; // Do not set to 0 to avoid 0 buffer size.
- _bufSndSize = topSize * UU.DataStreamerPerNodeBufferSizeGet(Target);
+ _bufSndSize = _topSize * UU.DataStreamerPerNodeBufferSizeGet(Target);
}
}
finally
@@ -590,6 +591,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
{
int bufSndSize0 = _bufSndSize;
+ Debug.Assert(bufSndSize0 > 0);
+
while (true)
{
var batch0 = _batch;