You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/17 10:51:38 UTC
[15/50] [abbrv] ignite git commit: GG-11655 - Fix merge
GG-11655 - Fix merge
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a62a0136
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a62a0136
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a62a0136
Branch: refs/heads/ignite-2693
Commit: a62a0136d295486d95c6e2ab5bba88270d831753
Parents: 92fff63
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Nov 2 19:07:45 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Wed Nov 2 19:10:01 2016 +0300
----------------------------------------------------------------------
.../service/GridServiceProcessor.java | 136 ++++++++++---------
1 file changed, 74 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a62a0136/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 8489875..6c26363 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -851,7 +851,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
- return new GridServiceProxy<>(prj, name, svcItf, sticky, ctx).proxy();
+ return new GridServiceProxy<T>(prj, name, svcItf, sticky, ctx).proxy();
}
/**
@@ -904,7 +904,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
* @param topVer Topology version.
* @throws IgniteCheckedException If failed.
*/
- private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException {
+ private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException {
ServiceConfiguration cfg = dep.configuration();
Object nodeFilter = cfg.getNodeFilter();
@@ -918,7 +918,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
Object affKey = cfg.getAffinityKey();
while (true) {
- GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer);
+ GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion());
Collection<ClusterNode> nodes;
@@ -948,7 +948,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
Map<UUID, Integer> cnts = new HashMap<>();
if (affKey != null) {
- ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, new AffinityTopologyVersion(topVer));
+ ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer);
if (n != null) {
int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt;
@@ -1180,7 +1180,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (cfg instanceof LazyServiceConfiguration) {
byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
- Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
+ Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config()));
ctx.resource().inject(srvc);
@@ -1190,10 +1190,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
Service svc = cfg.getService();
try {
- byte[] bytes = m.marshal(svc);
+ byte[] bytes = U.marshal(m, svc);
- Service cp = m.unmarshal(bytes,
- U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
+ Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
ctx.resource().inject(cp);
@@ -1268,8 +1267,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
ClusterNode oldestSrvNode =
CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
- if (oldestSrvNode == null)
- return F.emptyIterator();
+ if (oldestSrvNode == null)
+ return new GridEmptyIterator<>();
GridCacheQueryManager qryMgr = cache.context().queries();
@@ -1455,7 +1454,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
svcName.set(dep.configuration().getName());
// Ignore other utility cache events.
- long topVer = ctx.discovery().topologyVersion();
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
@@ -1506,60 +1505,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
- /**
- * Deployment callback.
- *
- * @param dep Service deployment.
- * @param topVer Topology version.
- */
- private void onDeployment(final GridServiceDeployment dep, final long topVer) {
- // Retry forever.
- try {
- long newTopVer = ctx.discovery().topologyVersion();
-
- // If topology version changed, reassignment will happen from topology event.
- if (newTopVer == topVer)
- reassign(dep, topVer);
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
-
- long newTopVer = ctx.discovery().topologyVersion();
-
- if (newTopVer != topVer) {
- assert newTopVer > topVer;
+ /**
+ * Deployment callback.
+ *
+ * @param dep Service deployment.
+ * @param topVer Topology version.
+ */
+ private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+ // Retry forever.
+ try {
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- // Reassignment will happen from topology event.
- return;
+ // If topology version changed, reassignment will happen from topology event.
+ if (newTopVer.equals(topVer))
+ reassign(dep, topVer);
}
+ catch (IgniteCheckedException e) {
+ if (!(e instanceof ClusterTopologyCheckedException))
+ log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
- ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
- private IgniteUuid id = IgniteUuid.randomUuid();
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- private long start = System.currentTimeMillis();
+ if (!newTopVer.equals(topVer)) {
+ assert newTopVer.compareTo(topVer) > 0;
- @Override public IgniteUuid timeoutId() {
- return id;
+ // Reassignment will happen from topology event.
+ return;
}
- @Override public long endTime() {
- return start + RETRY_TIMEOUT;
- }
+ ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+ private IgniteUuid id = IgniteUuid.randomUuid();
- @Override public void onTimeout() {
- if (!busyLock.enterBusy())
- return;
+ private long start = System.currentTimeMillis();
- try {
- // Try again.
- onDeployment(dep, topVer);
+ @Override public IgniteUuid timeoutId() {
+ return id;
}
- finally {
- busyLock.leaveBusy();
+
+ @Override public long endTime() {
+ return start + RETRY_TIMEOUT;
}
- }
- });
+
+ @Override public void onTimeout() {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ // Try again.
+ onDeployment(dep, topVer);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
}
}
@@ -1568,16 +1567,28 @@ public class GridServiceProcessor extends GridProcessorAdapter {
*/
private class TopologyListener implements GridLocalEventListener {
/** {@inheritDoc} */
- @Override public void onEvent(final Event evt) {
+ @Override public void onEvent(Event evt) {
if (!busyLock.enterBusy())
return;
try {
+ final AffinityTopologyVersion topVer;
+
+ if (evt instanceof DiscoveryCustomEvent) {
+ DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+ topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
+
+ if (msg instanceof CacheAffinityChangeMessage) {
+ if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
+ return;
+ }
+ }
+ else
+ topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+
depExe.submit(new BusyRunnable() {
@Override public void run0() {
- AffinityTopologyVersion topVer =
- new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion());
-
ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
if (oldest != null && oldest.isLocal()) {
@@ -1612,7 +1623,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity().
affinityReadyFuture(topVer).get();
- reassign(dep, topVer.topologyVersion());
+ reassign(dep, topVer);
}
catch (IgniteCheckedException ex) {
if (!(e instanceof ClusterTopologyCheckedException))
@@ -1629,7 +1640,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
if (!retries.isEmpty())
- onReassignmentFailed(topVer.topologyVersion(), retries);
+ onReassignmentFailed(topVer, retries);
}
// Clean up zombie assignments.
@@ -1666,13 +1677,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
* @param topVer Topology version.
* @param retries Retries.
*/
- private void onReassignmentFailed(final long topVer, final Collection<GridServiceDeployment> retries) {
+ private void onReassignmentFailed(final AffinityTopologyVersion topVer,
+ final Collection<GridServiceDeployment> retries) {
if (!busyLock.enterBusy())
return;
try {
// If topology changed again, let next event handle it.
- if (ctx.discovery().topologyVersion() != topVer)
+ if (ctx.discovery().topologyVersionEx().equals(topVer))
return;
for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext(); ) {