You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/03 10:13:18 UTC
ignite git commit: ignite-1843 Avoid discovery thread blocking in GridJobProcessor.
Repository: ignite
Updated Branches:
refs/heads/ignite-1843 [created] 3df34841f
ignite-1843 Avoid discovery thread blocking in GridJobProcessor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3df34841
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3df34841
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3df34841
Branch: refs/heads/ignite-1843
Commit: 3df34841f73dac95b5a0b907f8ca0172e7c5a79a
Parents: 6a22193
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 3 12:13:07 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 3 12:13:07 2015 +0300
----------------------------------------------------------------------
.../processors/job/GridJobProcessor.java | 96 +++++++++-----------
1 file changed, 44 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3df34841/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 4d6d0bf..20bf58c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -202,7 +202,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
};
/** Current session. */
- private final ThreadLocal<ComputeTaskSession> currentSess = new ThreadLocal<>();
+ private final ThreadLocal<ComputeTaskSession> currSess = new ThreadLocal<>();
/**
* @param ctx Kernal context.
@@ -448,18 +448,19 @@ public class GridJobProcessor extends GridProcessorAdapter {
else if (!nodeId.equals(taskNodeId))
err = "Received job siblings response from unexpected node [taskNodeId=" + taskNodeId +
", nodeId=" + nodeId + ']';
- else
+ else {
// Sender and message type are fine.
res = (GridJobSiblingsResponse)msg;
- if (res.jobSiblings() == null) {
- try {
- res.unmarshalSiblings(marsh);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal job siblings.", e);
+ if (res.jobSiblings() == null) {
+ try {
+ res.unmarshalSiblings(marsh);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal job siblings.", e);
- err = e.getMessage();
+ err = e.getMessage();
+ }
}
}
@@ -830,7 +831,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (w == null)
throw new NoSuchElementException();
- org.apache.ignite.spi.collision.CollisionJobContext ret = new CollisionJobContext(w, false);
+ org.apache.ignite.spi.collision.CollisionJobContext ret =
+ new CollisionJobContext(w, false);
w = null;
@@ -953,16 +955,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
GridJobWorker job = null;
- rwLock.readLock();
-
- try {
- if (stopping) {
- if (log.isDebugEnabled())
- log.debug("Received job execution request while stopping this node (will ignore): " + req);
+ if (!rwLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Received job execution request while stopping this node (will ignore): " + req);
- return;
- }
+ return;
+ }
+ try {
long endTime = req.getCreateTime() + req.getTimeout();
// Account for overflow.
@@ -1172,7 +1172,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* @param ses Session.
*/
public void currentTaskSession(ComputeTaskSession ses) {
- currentSess.set(ses);
+ currSess.set(ses);
}
/**
@@ -1195,7 +1195,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (!ctx.security().enabled())
return null;
- ComputeTaskSession ses = currentSess.get();
+ ComputeTaskSession ses = currSess.get();
if (ses == null)
return null;
@@ -1404,16 +1404,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "RedundantCast"})
private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req) {
- rwLock.readLock();
-
- try {
- if (stopping) {
- if (log.isDebugEnabled())
- log.debug("Received job session request while stopping grid (will ignore): " + req);
+ if (!rwLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Received job session request while stopping grid (will ignore): " + req);
- return;
- }
+ return;
+ }
+ try {
GridTaskSessionImpl ses = ctx.session().getSession(req.getSessionId());
if (ses == null) {
@@ -1557,16 +1555,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Received external collision event.");
- rwLock.readLock();
-
- try {
- if (stopping) {
- if (log.isDebugEnabled())
- log.debug("Received external collision notification while stopping grid (will ignore).");
+ if (!rwLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Received external collision notification while stopping grid (will ignore).");
- return;
- }
+ return;
+ }
+ try {
handleCollisions();
}
finally {
@@ -1653,16 +1649,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
updateJobMetrics();
}
else {
- rwLock.readLock();
-
- try {
- if (stopping) {
- if (log.isDebugEnabled())
- log.debug("Skipping collision handling on job finish (node is stopping).");
+ if (!rwLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping collision handling on job finish (node is stopping).");
- return;
- }
+ return;
+ }
+ try {
handleCollisions();
}
finally {
@@ -1851,16 +1845,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
if (handleCollisions) {
- rwLock.readLock();
-
- try {
- if (stopping) {
- if (log.isDebugEnabled())
- log.debug("Skipped collision handling on discovery event (node is stopping): " + evt);
+ if (!rwLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipped collision handling on discovery event (node is stopping): " + evt);
- return;
- }
+ return;
+ }
+ try {
if (!jobAlwaysActivate)
handleCollisions();
else if (metricsUpdateFreq > -1L)