You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/09/09 14:37:57 UTC

[45/50] ignite git commit: IGNITE-1391: Fixed deadlock in discovery message processing caused by platform latch.

IGNITE-1391: Fixed deadlock in discovery message processing caused by platform latch.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a7631980
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a7631980
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a7631980

Branch: refs/heads/ignite-843
Commit: a7631980ef5c0c359e8d74ae77f5d085287da139
Parents: f7230da
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Wed Sep 9 13:31:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 9 13:31:08 2015 +0300

----------------------------------------------------------------------
 .../callback/PlatformCallbackGateway.java       |  5 +-
 .../callback/PlatformCallbackUtils.java         |  3 +-
 .../cpp/common/include/ignite/common/java.h     |  4 +-
 .../platform/src/main/cpp/common/src/java.cpp   |  8 +--
 .../main/dotnet/Apache.Ignite.Core/Ignition.cs  | 28 ++++++--
 .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs    |  8 ++-
 .../Impl/Unmanaged/UnmanagedCallbacks.cs        |  8 ++-
 .../Impl/Unmanaged/UnmanagedUtils.cs            |  2 +-
 .../platform/PlatformProcessorImpl.java         |  2 +-
 .../IgniteStartStopTest.cs                      | 70 ++++++++++++++++++++
 10 files changed, 114 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index a348888..5d5cdb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -737,13 +737,14 @@ public class PlatformCallbackGateway {
     /**
      * Kernal start callback.
      *
+     * @param proc Platform processor.
      * @param memPtr Memory pointer.
      */
-    public void onStart(long memPtr) {
+    public void onStart(Object proc, long memPtr) {
         enter();
 
         try {
-            PlatformCallbackUtils.onStart(envPtr, memPtr);
+            PlatformCallbackUtils.onStart(envPtr, proc, memPtr);
         }
         finally {
             leave();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index dd43e0d..64749ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -384,9 +384,10 @@ public class PlatformCallbackUtils {
      * Kernal start callback.
      *
      * @param envPtr Environment pointer.
+     * @param proc Platform processor.
      * @param memPtr Memory pointer.
      */
-    static native void onStart(long envPtr, long memPtr);
+    static native void onStart(long envPtr, Object proc, long memPtr);
 
     /*
      * Kernal stop callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/cpp/common/include/ignite/common/java.h
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/common/include/ignite/common/java.h b/modules/platform/src/main/cpp/common/include/ignite/common/java.h
index e2d23b2..01ecbe3 100644
--- a/modules/platform/src/main/cpp/common/include/ignite/common/java.h
+++ b/modules/platform/src/main/cpp/common/include/ignite/common/java.h
@@ -96,7 +96,7 @@ namespace ignite
 
             typedef long long(JNICALL *NodeInfoHandler)(void* target, long long memPtr);
 
-            typedef void(JNICALL *OnStartHandler)(void* target, long long memPtr);
+            typedef void(JNICALL *OnStartHandler)(void* target, void* proc, long long memPtr);
             typedef void(JNICALL *OnStopHandler)(void* target);
             typedef void(JNICALL *ErrorHandler)(void* target, int errCode, const char* errClsChars, int errClsCharsLen, const char* errMsgChars, int errMsgCharsLen, void* errData, int errDataLen);
 
@@ -640,7 +640,7 @@ namespace ignite
 
             JNIEXPORT jlong JNICALL JniNodeInfo(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr);
 
-            JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr);
+            JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jobject proc, jlong memPtr);
             JNIEXPORT void JNICALL JniOnStop(JNIEnv *env, jclass cls, jlong envPtr);
 
             JNIEXPORT jlong JNICALL JniExtensionCallbackInLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/cpp/common/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/common/src/java.cpp b/modules/platform/src/main/cpp/common/src/java.cpp
index bb4cc20..492e7b6 100644
--- a/modules/platform/src/main/cpp/common/src/java.cpp
+++ b/modules/platform/src/main/cpp/common/src/java.cpp
@@ -311,7 +311,7 @@ namespace ignite
 
             JniMethod M_PLATFORM_CALLBACK_UTILS_MEMORY_REALLOCATE = JniMethod("memoryReallocate", "(JJI)V", true);
 
-            JniMethod M_PLATFORM_CALLBACK_UTILS_ON_START = JniMethod("onStart", "(JJ)V", true);
+            JniMethod M_PLATFORM_CALLBACK_UTILS_ON_START = JniMethod("onStart", "(JLjava/lang/Object;J)V", true);
             JniMethod M_PLATFORM_CALLBACK_UTILS_ON_STOP = JniMethod("onStop", "(J)V", true);
 
             JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG = JniMethod("extensionCallbackInLongOutLong", "(JIJ)J", true);
@@ -322,7 +322,7 @@ namespace ignite
             JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true);
 
             const char* C_PLATFORM_IGNITION = "org/apache/ignite/internal/processors/platform/PlatformIgnition";
-            JniMethod M_PLATFORM_IGNITION_START = JniMethod("start", "(Ljava/lang/String;Ljava/lang/String;IJJ)Lorg/apache/ignite/internal/processors/platform/PlatformProcessor;", true);
+            JniMethod M_PLATFORM_IGNITION_START = JniMethod("start", "(Ljava/lang/String;Ljava/lang/String;IJJ)V", true);
             JniMethod M_PLATFORM_IGNITION_INSTANCE = JniMethod("instance", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformProcessor;", true);
             JniMethod M_PLATFORM_IGNITION_ENVIRONMENT_POINTER = JniMethod("environmentPointer", "(Ljava/lang/String;)J", true);
             JniMethod M_PLATFORM_IGNITION_STOP = JniMethod("stop", "(Ljava/lang/String;Z)Z", true);
@@ -2185,8 +2185,8 @@ namespace ignite
                 IGNITE_SAFE_FUNC(env, envPtr, NodeInfoHandler, nodeInfo, memPtr);
             }
 
-            JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr) {
-                IGNITE_SAFE_PROC(env, envPtr, OnStartHandler, onStart, memPtr);
+            JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jobject proc, jlong memPtr) {
+                IGNITE_SAFE_PROC(env, envPtr, OnStartHandler, onStart, env->NewGlobalRef(proc), memPtr);
             }
 
             JNIEXPORT void JNICALL JniOnStop(JNIEnv *env, jclass cls, jlong envPtr) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
index ef79008..c9de62a 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -172,19 +172,19 @@ namespace Apache.Ignite.Core
                 sbyte* gridName0 = IgniteUtils.StringToUtf8Unmanaged(gridName);
 
                 // 3. Create startup object which will guide us through the rest of the process.
-                _startup = new Startup(cfg) { Context = ctx };
+                _startup = new Startup(cfg, cbs) { Context = ctx };
 
                 IUnmanagedTarget interopProc = null;
 
                 try
                 {
                     // 4. Initiate Ignite start.
-                    interopProc = UU.IgnitionStart(cbs.Context, cfg.SpringConfigUrl ?? DefaultCfg, 
+                    UU.IgnitionStart(cbs.Context, cfg.SpringConfigUrl ?? DefaultCfg,
                         cfgEx != null ? cfgEx.GridName : null, ClientMode);
 
                     // 5. At this point start routine is finished. We expect STARTUP object to have all necessary data.
-                    Ignite node = new Ignite(cfg, _startup.Name, interopProc, _startup.Marshaller, 
-                        _startup.LifecycleBeans, cbs);
+                    var node = _startup.Ignite;
+                    interopProc = node.InteropProcessor;
 
                     // 6. On-start callback (notify lifecycle components).
                     node.OnStart();
@@ -351,8 +351,9 @@ namespace Apache.Ignite.Core
         /// <summary>
         /// Kernal start callback.
         /// </summary>
+        /// <param name="interopProc">Interop processor.</param>
         /// <param name="stream">Stream.</param>
-        internal static void OnStart(IPortableStream stream)
+        internal static void OnStart(IUnmanagedTarget interopProc, IPortableStream stream)
         {
             try
             {
@@ -368,6 +369,8 @@ namespace Apache.Ignite.Core
                 if (Nodes.ContainsKey(new NodeKey(name)))
                     throw new IgniteException("Ignite with the same name already started: " + name);
 
+                _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name, interopProc, _startup.Marshaller, 
+                    _startup.LifecycleBeans, _startup.Callbacks);
             }
             catch (Exception e)
             {
@@ -604,17 +607,23 @@ namespace Apache.Ignite.Core
             /// Constructor.
             /// </summary>
             /// <param name="cfg">Configuration.</param>
-            internal Startup(IgniteConfiguration cfg)
+            /// <param name="cbs"></param>
+            internal Startup(IgniteConfiguration cfg, UnmanagedCallbacks cbs)
             {
                 Configuration = cfg;
+                Callbacks = cbs;
             }
-
             /// <summary>
             /// Configuration.
             /// </summary>
             internal IgniteConfiguration Configuration { get; private set; }
 
             /// <summary>
+            /// Gets unmanaged callbacks.
+            /// </summary>
+            internal UnmanagedCallbacks Callbacks { get; private set; }
+
+            /// <summary>
             /// Lifecycle beans.
             /// </summary>
             internal IList<LifecycleBeanHolder> LifecycleBeans { get; set; }
@@ -638,6 +647,11 @@ namespace Apache.Ignite.Core
             /// Gets or sets the context.
             /// </summary>
             internal void* Context { get; set; }
+
+            /// <summary>
+            /// Gets or sets the ignite.
+            /// </summary>
+            internal Ignite Ignite { get; set; }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
index 7e33416..c5025b2 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
@@ -74,7 +74,7 @@ namespace Apache.Ignite.Core.Impl
         private IClusterNode _locNode;
 
         /** Transactions facade. */
-        private readonly TransactionsImpl _transactions;
+        private readonly Lazy<TransactionsImpl> _transactions;
 
         /** Callbacks */
         private readonly UnmanagedCallbacks _cbs;
@@ -119,7 +119,9 @@ namespace Apache.Ignite.Core.Impl
 
             cbs.Initialize(this);
 
-            _transactions = new TransactionsImpl(UU.ProcessorTransactions(proc), marsh, LocalNode.Id);
+            // Grid is not completely started here, can't initialize interop transactions right away.
+            _transactions = new Lazy<TransactionsImpl>(
+                    () => new TransactionsImpl(UU.ProcessorTransactions(proc), marsh, LocalNode.Id));
         }
 
         /// <summary>
@@ -423,7 +425,7 @@ namespace Apache.Ignite.Core.Impl
         /** <inheritdoc /> */
         public ITransactions Transactions
         {
-            get { return _transactions; }
+            get { return _transactions.Value; }
         }
 
         /** <inheritdoc /> */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 80b33df..9edf2ef 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -153,7 +153,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         private delegate void NodeInfoCallbackDelegate(void* target, long memPtr);
 
-        private delegate void OnStartCallbackDelegate(void* target, long memPtr);
+        private delegate void OnStartCallbackDelegate(void* target, void* proc, long memPtr);
         private delegate void OnStopCallbackDelegate(void* target);
         
         private delegate void ErrorCallbackDelegate(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, int errMsgCharsLen, void* errData, int errDataLen);
@@ -1001,11 +1001,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             }, true);
         }
 
-        private void OnStart(void* target, long memPtr)
+        private void OnStart(void* target, void* proc, long memPtr)
         {
             SafeCall(() =>
             {
-                Ignition.OnStart(IgniteManager.Memory.Get(memPtr).Stream());
+                var proc0 = new UnmanagedTarget(_ctx, proc);
+
+                Ignition.OnStart(proc0, IgniteManager.Memory.Get(memPtr).Stream());
             }, true);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index 9ec2668..4bea392 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -527,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         #region NATIVE METHODS: PROCESSOR
 
-        internal static IUnmanagedTarget IgnitionStart(UnmanagedContext ctx, string cfgPath, string gridName, 
+        internal static IUnmanagedTarget IgnitionStart(UnmanagedContext ctx, string cfgPath, string gridName,
             bool clientMode)
         {
             using (var mem = IgniteManager.Memory.Allocate().Stream())

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
index e5e7a57..40b1334 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
@@ -126,7 +126,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
 
             out.synchronize();
 
-            platformCtx.gateway().onStart(mem.pointer());
+            platformCtx.gateway().onStart(this, mem.pointer());
         }
 
         // At this moment all necessary native libraries must be loaded, so we can process with store creation.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
index ec7e157..2db1781 100644
--- a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
@@ -19,8 +19,12 @@ namespace Apache.Ignite.Core.Tests
 {
     using System;
     using System.Collections.Generic;
+    using System.IO;
     using System.Threading;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Messaging;
+    using Apache.Ignite.Core.Tests.Process;
     using NUnit.Framework;
 
     /// <summary>
@@ -44,6 +48,7 @@ namespace Apache.Ignite.Core.Tests
         [TearDown]
         public void TearDown()
         {
+            TestUtils.KillProcesses();
             Ignition.StopAll(true);
         }
 
@@ -348,5 +353,70 @@ namespace Apache.Ignite.Core.Tests
 
             Assert.AreEqual(1, cache.Get(1));
         }
+
+        /// <summary>
+        /// Tests the processor initialization and grid usage right after topology enter.
+        /// </summary>
+        [Test]
+        public void TestProcessorInit()
+        {
+            var cfg = new IgniteConfiguration
+            {
+                SpringConfigUrl = "config\\start-test-grid1.xml",
+                JvmOptions = TestUtils.TestJavaOptions(),
+                JvmClasspath = TestUtils.CreateTestClasspath()
+            };
+
+            // Start local node
+            var grid = Ignition.Start(cfg);
+
+            // Start remote node in a separate process
+            // ReSharper disable once UnusedVariable
+            var proc = new IgniteProcess(
+                "-jvmClasspath=" + TestUtils.CreateTestClasspath(),
+                "-springConfigUrl=" + Path.GetFullPath(cfg.SpringConfigUrl),
+                "-J-Xms512m", "-J-Xmx512m");
+
+            var cts = new CancellationTokenSource();
+            var token = cts.Token;
+
+            // Spam message subscriptions on a separate thread 
+            // to test race conditions during processor init on remote node
+            var listenTask = Task.Factory.StartNew(() =>
+            {
+                var filter = new MessageFilter();
+
+                while (!token.IsCancellationRequested)
+                {
+                    var listenId = grid.Message().RemoteListen(filter);
+
+                    grid.Message().StopRemoteListen(listenId);
+                }
+                // ReSharper disable once FunctionNeverReturns
+            });
+
+            // Wait for remote node to join
+            Assert.IsTrue(grid.WaitTopology(2, 30000));
+
+            // Wait some more for initialization
+            Thread.Sleep(1000);
+
+            // Cancel listen task and check that it finishes
+            cts.Cancel();
+            Assert.IsTrue(listenTask.Wait(5000));
+        }
+
+        /// <summary>
+        /// Noop message filter.
+        /// </summary>
+        [Serializable]
+        private class MessageFilter : IMessageFilter<int>
+        {
+            /** <inheritdoc /> */
+            public bool Invoke(Guid nodeId, int message)
+            {
+                return true;
+            }
+        }
     }
 }