You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/03 10:13:18 UTC

ignite git commit: ignite-1843 Avoid discovery thread blocking in GridJobProcessor.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1843 [created] 3df34841f


ignite-1843 Avoid discovery thread blocking in GridJobProcessor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3df34841
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3df34841
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3df34841

Branch: refs/heads/ignite-1843
Commit: 3df34841f73dac95b5a0b907f8ca0172e7c5a79a
Parents: 6a22193
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 3 12:13:07 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 3 12:13:07 2015 +0300

----------------------------------------------------------------------
 .../processors/job/GridJobProcessor.java        | 96 +++++++++-----------
 1 file changed, 44 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3df34841/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 4d6d0bf..20bf58c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -202,7 +202,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
     };
 
     /** Current session. */
-    private final ThreadLocal<ComputeTaskSession> currentSess = new ThreadLocal<>();
+    private final ThreadLocal<ComputeTaskSession> currSess = new ThreadLocal<>();
 
     /**
      * @param ctx Kernal context.
@@ -448,18 +448,19 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 else if (!nodeId.equals(taskNodeId))
                     err = "Received job siblings response from unexpected node [taskNodeId=" + taskNodeId +
                         ", nodeId=" + nodeId + ']';
-                else
+                else {
                     // Sender and message type are fine.
                     res = (GridJobSiblingsResponse)msg;
 
-                if (res.jobSiblings() == null) {
-                    try {
-                        res.unmarshalSiblings(marsh);
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to unmarshal job siblings.", e);
+                    if (res.jobSiblings() == null) {
+                        try {
+                            res.unmarshalSiblings(marsh);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to unmarshal job siblings.", e);
 
-                        err = e.getMessage();
+                            err = e.getMessage();
+                        }
                     }
                 }
 
@@ -830,7 +831,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
                                 if (w == null)
                                     throw new NoSuchElementException();
 
-                                org.apache.ignite.spi.collision.CollisionJobContext ret = new CollisionJobContext(w, false);
+                                org.apache.ignite.spi.collision.CollisionJobContext ret =
+                                    new CollisionJobContext(w, false);
 
                                 w = null;
 
@@ -953,16 +955,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
         GridJobWorker job = null;
 
-        rwLock.readLock();
-
-        try {
-            if (stopping) {
-                if (log.isDebugEnabled())
-                    log.debug("Received job execution request while stopping this node (will ignore): " + req);
+        if (!rwLock.tryReadLock()) {
+            if (log.isDebugEnabled())
+                log.debug("Received job execution request while stopping this node (will ignore): " + req);
 
-                return;
-            }
+            return;
+        }
 
+        try {
             long endTime = req.getCreateTime() + req.getTimeout();
 
             // Account for overflow.
@@ -1172,7 +1172,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * @param ses Session.
      */
     public void currentTaskSession(ComputeTaskSession ses) {
-        currentSess.set(ses);
+        currSess.set(ses);
     }
 
     /**
@@ -1195,7 +1195,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         if (!ctx.security().enabled())
             return null;
 
-        ComputeTaskSession ses = currentSess.get();
+        ComputeTaskSession ses = currSess.get();
 
         if (ses == null)
             return null;
@@ -1404,16 +1404,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "RedundantCast"})
     private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req) {
-        rwLock.readLock();
-
-        try {
-            if (stopping) {
-                if (log.isDebugEnabled())
-                    log.debug("Received job session request while stopping grid (will ignore): " + req);
+        if (!rwLock.tryReadLock()) {
+            if (log.isDebugEnabled())
+                log.debug("Received job session request while stopping grid (will ignore): " + req);
 
-                return;
-            }
+            return;
+        }
 
+        try {
             GridTaskSessionImpl ses = ctx.session().getSession(req.getSessionId());
 
             if (ses == null) {
@@ -1557,16 +1555,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
             if (log.isDebugEnabled())
                 log.debug("Received external collision event.");
 
-            rwLock.readLock();
-
-            try {
-                if (stopping) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received external collision notification while stopping grid (will ignore).");
+            if (!rwLock.tryReadLock()) {
+                if (log.isDebugEnabled())
+                    log.debug("Received external collision notification while stopping grid (will ignore).");
 
-                    return;
-                }
+                return;
+            }
 
+            try {
                 handleCollisions();
             }
             finally {
@@ -1653,16 +1649,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         updateJobMetrics();
                 }
                 else {
-                    rwLock.readLock();
-
-                    try {
-                        if (stopping) {
-                            if (log.isDebugEnabled())
-                                log.debug("Skipping collision handling on job finish (node is stopping).");
+                    if (!rwLock.tryReadLock()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping collision handling on job finish (node is stopping).");
 
-                            return;
-                        }
+                        return;
+                    }
 
+                    try {
                         handleCollisions();
                     }
                     finally {
@@ -1851,16 +1845,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
             }
 
             if (handleCollisions) {
-                rwLock.readLock();
-
-                try {
-                    if (stopping) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipped collision handling on discovery event (node is stopping): " + evt);
+                if (!rwLock.tryReadLock()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipped collision handling on discovery event (node is stopping): " + evt);
 
-                        return;
-                    }
+                    return;
+                }
 
+                try {
                     if (!jobAlwaysActivate)
                         handleCollisions();
                     else if (metricsUpdateFreq > -1L)