You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/09 12:30:28 UTC
ignite git commit: IGNITE-1391: Fixed deadlock in discovery message
processing caused by platform latch.
Repository: ignite
Updated Branches:
refs/heads/master f7230daeb -> a7631980e
IGNITE-1391: Fixed deadlock in discovery message processing caused by platform latch.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a7631980
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a7631980
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a7631980
Branch: refs/heads/master
Commit: a7631980ef5c0c359e8d74ae77f5d085287da139
Parents: f7230da
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Wed Sep 9 13:31:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 9 13:31:08 2015 +0300
----------------------------------------------------------------------
.../callback/PlatformCallbackGateway.java | 5 +-
.../callback/PlatformCallbackUtils.java | 3 +-
.../cpp/common/include/ignite/common/java.h | 4 +-
.../platform/src/main/cpp/common/src/java.cpp | 8 +--
.../main/dotnet/Apache.Ignite.Core/Ignition.cs | 28 ++++++--
.../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 8 ++-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 8 ++-
.../Impl/Unmanaged/UnmanagedUtils.cs | 2 +-
.../platform/PlatformProcessorImpl.java | 2 +-
.../IgniteStartStopTest.cs | 70 ++++++++++++++++++++
10 files changed, 114 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index a348888..5d5cdb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -737,13 +737,14 @@ public class PlatformCallbackGateway {
/**
* Kernal start callback.
*
+ * @param proc Platform processor.
* @param memPtr Memory pointer.
*/
- public void onStart(long memPtr) {
+ public void onStart(Object proc, long memPtr) {
enter();
try {
- PlatformCallbackUtils.onStart(envPtr, memPtr);
+ PlatformCallbackUtils.onStart(envPtr, proc, memPtr);
}
finally {
leave();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index dd43e0d..64749ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -384,9 +384,10 @@ public class PlatformCallbackUtils {
* Kernal start callback.
*
* @param envPtr Environment pointer.
+ * @param proc Platform processor.
* @param memPtr Memory pointer.
*/
- static native void onStart(long envPtr, long memPtr);
+ static native void onStart(long envPtr, Object proc, long memPtr);
/*
* Kernal stop callback.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/cpp/common/include/ignite/common/java.h
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/common/include/ignite/common/java.h b/modules/platform/src/main/cpp/common/include/ignite/common/java.h
index e2d23b2..01ecbe3 100644
--- a/modules/platform/src/main/cpp/common/include/ignite/common/java.h
+++ b/modules/platform/src/main/cpp/common/include/ignite/common/java.h
@@ -96,7 +96,7 @@ namespace ignite
typedef long long(JNICALL *NodeInfoHandler)(void* target, long long memPtr);
- typedef void(JNICALL *OnStartHandler)(void* target, long long memPtr);
+ typedef void(JNICALL *OnStartHandler)(void* target, void* proc, long long memPtr);
typedef void(JNICALL *OnStopHandler)(void* target);
typedef void(JNICALL *ErrorHandler)(void* target, int errCode, const char* errClsChars, int errClsCharsLen, const char* errMsgChars, int errMsgCharsLen, void* errData, int errDataLen);
@@ -640,7 +640,7 @@ namespace ignite
JNIEXPORT jlong JNICALL JniNodeInfo(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr);
- JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr);
+ JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jobject proc, jlong memPtr);
JNIEXPORT void JNICALL JniOnStop(JNIEnv *env, jclass cls, jlong envPtr);
JNIEXPORT jlong JNICALL JniExtensionCallbackInLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/cpp/common/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/common/src/java.cpp b/modules/platform/src/main/cpp/common/src/java.cpp
index bb4cc20..492e7b6 100644
--- a/modules/platform/src/main/cpp/common/src/java.cpp
+++ b/modules/platform/src/main/cpp/common/src/java.cpp
@@ -311,7 +311,7 @@ namespace ignite
JniMethod M_PLATFORM_CALLBACK_UTILS_MEMORY_REALLOCATE = JniMethod("memoryReallocate", "(JJI)V", true);
- JniMethod M_PLATFORM_CALLBACK_UTILS_ON_START = JniMethod("onStart", "(JJ)V", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_ON_START = JniMethod("onStart", "(JLjava/lang/Object;J)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_ON_STOP = JniMethod("onStop", "(J)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG = JniMethod("extensionCallbackInLongOutLong", "(JIJ)J", true);
@@ -322,7 +322,7 @@ namespace ignite
JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true);
const char* C_PLATFORM_IGNITION = "org/apache/ignite/internal/processors/platform/PlatformIgnition";
- JniMethod M_PLATFORM_IGNITION_START = JniMethod("start", "(Ljava/lang/String;Ljava/lang/String;IJJ)Lorg/apache/ignite/internal/processors/platform/PlatformProcessor;", true);
+ JniMethod M_PLATFORM_IGNITION_START = JniMethod("start", "(Ljava/lang/String;Ljava/lang/String;IJJ)V", true);
JniMethod M_PLATFORM_IGNITION_INSTANCE = JniMethod("instance", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformProcessor;", true);
JniMethod M_PLATFORM_IGNITION_ENVIRONMENT_POINTER = JniMethod("environmentPointer", "(Ljava/lang/String;)J", true);
JniMethod M_PLATFORM_IGNITION_STOP = JniMethod("stop", "(Ljava/lang/String;Z)Z", true);
@@ -2185,8 +2185,8 @@ namespace ignite
IGNITE_SAFE_FUNC(env, envPtr, NodeInfoHandler, nodeInfo, memPtr);
}
- JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr) {
- IGNITE_SAFE_PROC(env, envPtr, OnStartHandler, onStart, memPtr);
+ JNIEXPORT void JNICALL JniOnStart(JNIEnv *env, jclass cls, jlong envPtr, jobject proc, jlong memPtr) {
+ IGNITE_SAFE_PROC(env, envPtr, OnStartHandler, onStart, env->NewGlobalRef(proc), memPtr);
}
JNIEXPORT void JNICALL JniOnStop(JNIEnv *env, jclass cls, jlong envPtr) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
index ef79008..c9de62a 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -172,19 +172,19 @@ namespace Apache.Ignite.Core
sbyte* gridName0 = IgniteUtils.StringToUtf8Unmanaged(gridName);
// 3. Create startup object which will guide us through the rest of the process.
- _startup = new Startup(cfg) { Context = ctx };
+ _startup = new Startup(cfg, cbs) { Context = ctx };
IUnmanagedTarget interopProc = null;
try
{
// 4. Initiate Ignite start.
- interopProc = UU.IgnitionStart(cbs.Context, cfg.SpringConfigUrl ?? DefaultCfg,
+ UU.IgnitionStart(cbs.Context, cfg.SpringConfigUrl ?? DefaultCfg,
cfgEx != null ? cfgEx.GridName : null, ClientMode);
// 5. At this point start routine is finished. We expect STARTUP object to have all necessary data.
- Ignite node = new Ignite(cfg, _startup.Name, interopProc, _startup.Marshaller,
- _startup.LifecycleBeans, cbs);
+ var node = _startup.Ignite;
+ interopProc = node.InteropProcessor;
// 6. On-start callback (notify lifecycle components).
node.OnStart();
@@ -351,8 +351,9 @@ namespace Apache.Ignite.Core
/// <summary>
/// Kernal start callback.
/// </summary>
+ /// <param name="interopProc">Interop processor.</param>
/// <param name="stream">Stream.</param>
- internal static void OnStart(IPortableStream stream)
+ internal static void OnStart(IUnmanagedTarget interopProc, IPortableStream stream)
{
try
{
@@ -368,6 +369,8 @@ namespace Apache.Ignite.Core
if (Nodes.ContainsKey(new NodeKey(name)))
throw new IgniteException("Ignite with the same name already started: " + name);
+ _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name, interopProc, _startup.Marshaller,
+ _startup.LifecycleBeans, _startup.Callbacks);
}
catch (Exception e)
{
@@ -604,17 +607,23 @@ namespace Apache.Ignite.Core
/// Constructor.
/// </summary>
/// <param name="cfg">Configuration.</param>
- internal Startup(IgniteConfiguration cfg)
+ /// <param name="cbs"></param>
+ internal Startup(IgniteConfiguration cfg, UnmanagedCallbacks cbs)
{
Configuration = cfg;
+ Callbacks = cbs;
}
-
/// <summary>
/// Configuration.
/// </summary>
internal IgniteConfiguration Configuration { get; private set; }
/// <summary>
+ /// Gets unmanaged callbacks.
+ /// </summary>
+ internal UnmanagedCallbacks Callbacks { get; private set; }
+
+ /// <summary>
/// Lifecycle beans.
/// </summary>
internal IList<LifecycleBeanHolder> LifecycleBeans { get; set; }
@@ -638,6 +647,11 @@ namespace Apache.Ignite.Core
/// Gets or sets the context.
/// </summary>
internal void* Context { get; set; }
+
+ /// <summary>
+ /// Gets or sets the ignite.
+ /// </summary>
+ internal Ignite Ignite { get; set; }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
index 7e33416..c5025b2 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
@@ -74,7 +74,7 @@ namespace Apache.Ignite.Core.Impl
private IClusterNode _locNode;
/** Transactions facade. */
- private readonly TransactionsImpl _transactions;
+ private readonly Lazy<TransactionsImpl> _transactions;
/** Callbacks */
private readonly UnmanagedCallbacks _cbs;
@@ -119,7 +119,9 @@ namespace Apache.Ignite.Core.Impl
cbs.Initialize(this);
- _transactions = new TransactionsImpl(UU.ProcessorTransactions(proc), marsh, LocalNode.Id);
+ // Grid is not completely started here, can't initialize interop transactions right away.
+ _transactions = new Lazy<TransactionsImpl>(
+ () => new TransactionsImpl(UU.ProcessorTransactions(proc), marsh, LocalNode.Id));
}
/// <summary>
@@ -423,7 +425,7 @@ namespace Apache.Ignite.Core.Impl
/** <inheritdoc /> */
public ITransactions Transactions
{
- get { return _transactions; }
+ get { return _transactions.Value; }
}
/** <inheritdoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 80b33df..9edf2ef 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -153,7 +153,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
private delegate void NodeInfoCallbackDelegate(void* target, long memPtr);
- private delegate void OnStartCallbackDelegate(void* target, long memPtr);
+ private delegate void OnStartCallbackDelegate(void* target, void* proc, long memPtr);
private delegate void OnStopCallbackDelegate(void* target);
private delegate void ErrorCallbackDelegate(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, int errMsgCharsLen, void* errData, int errDataLen);
@@ -1001,11 +1001,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}, true);
}
- private void OnStart(void* target, long memPtr)
+ private void OnStart(void* target, void* proc, long memPtr)
{
SafeCall(() =>
{
- Ignition.OnStart(IgniteManager.Memory.Get(memPtr).Stream());
+ var proc0 = new UnmanagedTarget(_ctx, proc);
+
+ Ignition.OnStart(proc0, IgniteManager.Memory.Get(memPtr).Stream());
}, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index 9ec2668..4bea392 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -527,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region NATIVE METHODS: PROCESSOR
- internal static IUnmanagedTarget IgnitionStart(UnmanagedContext ctx, string cfgPath, string gridName,
+ internal static IUnmanagedTarget IgnitionStart(UnmanagedContext ctx, string cfgPath, string gridName,
bool clientMode)
{
using (var mem = IgniteManager.Memory.Allocate().Stream())
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
index e5e7a57..40b1334 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
@@ -126,7 +126,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
out.synchronize();
- platformCtx.gateway().onStart(mem.pointer());
+ platformCtx.gateway().onStart(this, mem.pointer());
}
// At this moment all necessary native libraries must be loaded, so we can process with store creation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7631980/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
index ec7e157..2db1781 100644
--- a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
@@ -19,8 +19,12 @@ namespace Apache.Ignite.Core.Tests
{
using System;
using System.Collections.Generic;
+ using System.IO;
using System.Threading;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Messaging;
+ using Apache.Ignite.Core.Tests.Process;
using NUnit.Framework;
/// <summary>
@@ -44,6 +48,7 @@ namespace Apache.Ignite.Core.Tests
[TearDown]
public void TearDown()
{
+ TestUtils.KillProcesses();
Ignition.StopAll(true);
}
@@ -348,5 +353,70 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(1, cache.Get(1));
}
+
+ /// <summary>
+ /// Tests the processor initialization and grid usage right after topology enter.
+ /// </summary>
+ [Test]
+ public void TestProcessorInit()
+ {
+ var cfg = new IgniteConfiguration
+ {
+ SpringConfigUrl = "config\\start-test-grid1.xml",
+ JvmOptions = TestUtils.TestJavaOptions(),
+ JvmClasspath = TestUtils.CreateTestClasspath()
+ };
+
+ // Start local node
+ var grid = Ignition.Start(cfg);
+
+ // Start remote node in a separate process
+ // ReSharper disable once UnusedVariable
+ var proc = new IgniteProcess(
+ "-jvmClasspath=" + TestUtils.CreateTestClasspath(),
+ "-springConfigUrl=" + Path.GetFullPath(cfg.SpringConfigUrl),
+ "-J-Xms512m", "-J-Xmx512m");
+
+ var cts = new CancellationTokenSource();
+ var token = cts.Token;
+
+ // Spam message subscriptions on a separate thread
+ // to test race conditions during processor init on remote node
+ var listenTask = Task.Factory.StartNew(() =>
+ {
+ var filter = new MessageFilter();
+
+ while (!token.IsCancellationRequested)
+ {
+ var listenId = grid.Message().RemoteListen(filter);
+
+ grid.Message().StopRemoteListen(listenId);
+ }
+ // ReSharper disable once FunctionNeverReturns
+ });
+
+ // Wait for remote node to join
+ Assert.IsTrue(grid.WaitTopology(2, 30000));
+
+ // Wait some more for initialization
+ Thread.Sleep(1000);
+
+ // Cancel listen task and check that it finishes
+ cts.Cancel();
+ Assert.IsTrue(listenTask.Wait(5000));
+ }
+
+ /// <summary>
+ /// Noop message filter.
+ /// </summary>
+ [Serializable]
+ private class MessageFilter : IMessageFilter<int>
+ {
+ /** <inheritdoc /> */
+ public bool Invoke(Guid nodeId, int message)
+ {
+ return true;
+ }
+ }
}
}