You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/17 10:51:38 UTC

[15/50] [abbrv] ignite git commit: GG-11655 - Fix merge

GG-11655 - Fix merge


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a62a0136
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a62a0136
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a62a0136

Branch: refs/heads/ignite-2693
Commit: a62a0136d295486d95c6e2ab5bba88270d831753
Parents: 92fff63
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Nov 2 19:07:45 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Wed Nov 2 19:10:01 2016 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           | 136 ++++++++++---------
 1 file changed, 74 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a62a0136/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 8489875..6c26363 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -851,7 +851,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             }
         }
 
-        return new GridServiceProxy<>(prj, name, svcItf, sticky, ctx).proxy();
+        return new GridServiceProxy<T>(prj, name, svcItf, sticky, ctx).proxy();
     }
 
     /**
@@ -904,7 +904,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      * @param topVer Topology version.
      * @throws IgniteCheckedException If failed.
      */
-    private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException {
+    private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         ServiceConfiguration cfg = dep.configuration();
 
         Object nodeFilter = cfg.getNodeFilter();
@@ -918,7 +918,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         Object affKey = cfg.getAffinityKey();
 
         while (true) {
-            GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer);
+            GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion());
 
              Collection<ClusterNode> nodes;
 
@@ -948,7 +948,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 Map<UUID, Integer> cnts = new HashMap<>();
 
                 if (affKey != null) {
-                    ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, new AffinityTopologyVersion(topVer));
+                    ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer);
 
                     if (n != null) {
                         int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt;
@@ -1180,7 +1180,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (cfg instanceof LazyServiceConfiguration) {
             byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
 
-            Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
+            Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config()));
 
             ctx.resource().inject(srvc);
 
@@ -1190,10 +1190,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             Service svc = cfg.getService();
 
             try {
-                byte[] bytes = m.marshal(svc);
+                byte[] bytes = U.marshal(m, svc);
 
-                Service cp = m.unmarshal(bytes,
-                    U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
+                Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
 
                 ctx.resource().inject(cp);
 
@@ -1268,8 +1267,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 ClusterNode oldestSrvNode =
                     CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
 
-            if (oldestSrvNode == null)
-                return F.emptyIterator();
+                if (oldestSrvNode == null)
+                    return new GridEmptyIterator<>();
 
                 GridCacheQueryManager qryMgr = cache.context().queries();
 
@@ -1455,7 +1454,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             svcName.set(dep.configuration().getName());
 
             // Ignore other utility cache events.
-            long topVer = ctx.discovery().topologyVersion();
+            AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 
             ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
 
@@ -1506,60 +1505,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         }
     }
 
-    /**
-     * Deployment callback.
-     *
-     * @param dep Service deployment.
-     * @param topVer Topology version.
-     */
-    private void onDeployment(final GridServiceDeployment dep, final long topVer) {
-        // Retry forever.
-        try {
-            long newTopVer = ctx.discovery().topologyVersion();
-
-            // If topology version changed, reassignment will happen from topology event.
-            if (newTopVer == topVer)
-                reassign(dep, topVer);
-        }
-        catch (IgniteCheckedException e) {
-            if (!(e instanceof ClusterTopologyCheckedException))
-                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
-
-            long newTopVer = ctx.discovery().topologyVersion();
-
-            if (newTopVer != topVer) {
-                assert newTopVer > topVer;
+        /**
+         * Deployment callback.
+         *
+         * @param dep Service deployment.
+         * @param topVer Topology version.
+         */
+        private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+            // Retry forever.
+            try {
+                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                // Reassignment will happen from topology event.
-                return;
+                // If topology version changed, reassignment will happen from topology event.
+                if (newTopVer.equals(topVer))
+                    reassign(dep, topVer);
             }
+            catch (IgniteCheckedException e) {
+                if (!(e instanceof ClusterTopologyCheckedException))
+                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
 
-            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                private IgniteUuid id = IgniteUuid.randomUuid();
+                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                private long start = System.currentTimeMillis();
+                if (!newTopVer.equals(topVer)) {
+                    assert newTopVer.compareTo(topVer) > 0;
 
-                @Override public IgniteUuid timeoutId() {
-                    return id;
+                    // Reassignment will happen from topology event.
+                    return;
                 }
 
-                @Override public long endTime() {
-                    return start + RETRY_TIMEOUT;
-                }
+                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                    private IgniteUuid id = IgniteUuid.randomUuid();
 
-                @Override public void onTimeout() {
-                    if (!busyLock.enterBusy())
-                        return;
+                    private long start = System.currentTimeMillis();
 
-                    try {
-                        // Try again.
-                        onDeployment(dep, topVer);
+                    @Override public IgniteUuid timeoutId() {
+                        return id;
                     }
-                    finally {
-                        busyLock.leaveBusy();
+
+                    @Override public long endTime() {
+                        return start + RETRY_TIMEOUT;
                     }
-                }
-            });
+
+                    @Override public void onTimeout() {
+                        if (!busyLock.enterBusy())
+                            return;
+
+                        try {
+                            // Try again.
+                            onDeployment(dep, topVer);
+                        }
+                        finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+                });
         }
     }
 
@@ -1568,16 +1567,28 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      */
     private class TopologyListener implements GridLocalEventListener {
         /** {@inheritDoc} */
-        @Override public void onEvent(final Event evt) {
+        @Override public void onEvent(Event evt) {
             if (!busyLock.enterBusy())
                 return;
 
             try {
+                final AffinityTopologyVersion topVer;
+
+                if (evt instanceof DiscoveryCustomEvent) {
+                    DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                    topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
+
+                    if (msg instanceof CacheAffinityChangeMessage) {
+                        if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
+                            return;
+                    }
+                }
+                else
+                    topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+
                 depExe.submit(new BusyRunnable() {
                     @Override public void run0() {
-                        AffinityTopologyVersion topVer =
-                            new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion());
-
                         ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
 
                         if (oldest != null && oldest.isLocal()) {
@@ -1612,7 +1623,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                                         ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity().
                                             affinityReadyFuture(topVer).get();
 
-                                        reassign(dep, topVer.topologyVersion());
+                                        reassign(dep, topVer);
                                     }
                                     catch (IgniteCheckedException ex) {
                                         if (!(e instanceof ClusterTopologyCheckedException))
@@ -1629,7 +1640,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                             }
 
                             if (!retries.isEmpty())
-                                onReassignmentFailed(topVer.topologyVersion(), retries);
+                                onReassignmentFailed(topVer, retries);
                         }
 
                         // Clean up zombie assignments.
@@ -1666,13 +1677,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
          * @param topVer Topology version.
          * @param retries Retries.
          */
-        private void onReassignmentFailed(final long topVer, final Collection<GridServiceDeployment> retries) {
+        private void onReassignmentFailed(final AffinityTopologyVersion topVer,
+            final Collection<GridServiceDeployment> retries) {
             if (!busyLock.enterBusy())
                 return;
 
             try {
                 // If topology changed again, let next event handle it.
-                if (ctx.discovery().topologyVersion() != topVer)
+                if (ctx.discovery().topologyVersionEx().equals(topVer))
                     return;
 
                 for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext(); ) {