You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 11:21:35 UTC

[24/50] incubator-ignite git commit: # ignite-sprint-5 use consistent topology version in streamer

# ignite-sprint-5 use consistent topology version in streamer


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc1d427f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc1d427f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc1d427f

Branch: refs/heads/ignite-961
Commit: dc1d427fa2f5235a5779058a444e3845946a7e07
Parents: 20e5677
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 5 09:54:37 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 5 11:21:36 2015 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityProcessor.java         | 23 ++++-
 .../datastreamer/DataStreamerImpl.java          | 92 ++++++++++++++------
 .../DataStreamerMultiThreadedSelfTest.java      | 59 +++++++++----
 3 files changed, 129 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index daa2bc2..aac63c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -164,14 +164,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      *
      * @param cacheName Cache name.
      * @param key Key to map.
+     * @param topVer Topology version.
      * @return Affinity nodes, primary first.
      * @throws IgniteCheckedException If failed.
      */
-    public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException {
+    public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName,
+        K key,
+        AffinityTopologyVersion topVer)
+        throws IgniteCheckedException
+    {
         A.notNull(key, "key");
 
-        AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
-
         AffinityInfo affInfo = affinityCache(cacheName, topVer);
 
         if (affInfo == null)
@@ -181,6 +184,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Map single key to primary and backup nodes.
+     *
+     * @param cacheName Cache name.
+     * @param key Key to map.
+     * @return Affinity nodes, primary first.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key)
+        throws IgniteCheckedException
+    {
+        return mapKeyToPrimaryAndBackups(cacheName, key, ctx.discovery().topologyVersionEx());
+    }
+
+    /**
      * Gets affinity key for cache key.
      *
      * @param cacheName Cache name.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index d16167a..ed8e573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -198,19 +198,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 // Remap regular mappings.
                 final Buffer buf = bufMappings.remove(id);
 
+                // Only async notification is possible since
+                // discovery thread may be trapped otherwise.
                 if (buf != null) {
-                    // Only async notification is possible since
-                    // discovery thread may be trapped otherwise.
-                    ctx.closure().callLocalSafe(
-                        new Callable<Object>() {
-                            @Override public Object call() throws Exception {
-                                buf.onNodeLeft();
-
-                                return null;
-                            }
-                        },
-                        true /* system pool */
-                    );
+                    waitAffinityAndRun(new Runnable() {
+                        @Override public void run() {
+                            buf.onNodeLeft();
+                        }
+                    }, discoEvt.topologyVersion(), true);
                 }
             }
         };
@@ -248,6 +243,31 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * @param c Closure to run.
+     * @param topVer Topology version to wait for.
+     * @param async Async flag.
+     */
+    private void waitAffinityAndRun(final Runnable c, long topVer, boolean async) {
+        AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, 0);
+
+        IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer0);
+
+        if (fut != null && !fut.isDone()) {
+            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> fut) {
+                    ctx.closure().runLocalSafe(c, true);
+                }
+            });
+        }
+        else {
+            if (async)
+                ctx.closure().runLocalSafe(c, true);
+            else
+                c.run();
+        }
+    }
+
+    /**
      * @return Cache object context.
      */
     public CacheObjectContext cacheObjectContext() {
@@ -527,6 +547,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         boolean initPda = ctx.deploy().enabled() && jobPda == null;
 
+        AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
         for (DataStreamerEntry entry : entries) {
             List<ClusterNode> nodes;
 
@@ -543,7 +565,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     initPda = false;
                 }
 
-                nodes = nodes(key);
+                nodes = nodes(key, topVer);
             }
             catch (IgniteCheckedException e) {
                 resFut.onDone(e);
@@ -621,10 +643,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 }
             };
 
-            GridFutureAdapter<?> f;
+            final GridFutureAdapter<?> f;
 
             try {
-                f = buf.update(entriesForNode, lsnr);
+                f = buf.update(entriesForNode, topVer, lsnr);
             }
             catch (IgniteInterruptedCheckedException e1) {
                 resFut.onDone(e1);
@@ -633,30 +655,38 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             if (ctx.discovery().node(nodeId) == null) {
-                if (bufMappings.remove(nodeId, buf))
-                    buf.onNodeLeft();
+                if (bufMappings.remove(nodeId, buf)) {
+                    final Buffer buf0 = buf;
 
-                if (f != null)
-                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
-                        "(node has left): " + nodeId));
+                    waitAffinityAndRun(new Runnable() {
+                        @Override public void run() {
+                            buf0.onNodeLeft();
+
+                            if (f != null)
+                                f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                                    "(node has left): " + nodeId));
+                        }
+                    }, ctx.discovery().topologyVersion(), false);
+                }
             }
         }
     }
 
     /**
      * @param key Key to map.
+     * @param topVer Topology version.
      * @return Nodes to send requests to.
      * @throws IgniteCheckedException If failed.
      */
-    private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException {
+    private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         GridAffinityProcessor aff = ctx.affinity();
 
         List<ClusterNode> res = null;
 
         if (!allowOverwrite())
-            res = aff.mapKeyToPrimaryAndBackups(cacheName, key);
+            res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer);
         else {
-            ClusterNode node = aff.mapKeyToNode(cacheName, key);
+            ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer);
 
             if (node != null)
                 res = Collections.singletonList(node);
@@ -959,11 +989,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /**
          * @param newEntries Infos.
+         * @param topVer Topology version.
          * @param lsnr Listener for the operation future.
          * @throws IgniteInterruptedCheckedException If failed.
          * @return Future for operation.
          */
         @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+            AffinityTopologyVersion topVer,
             IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
             List<DataStreamerEntry> entries0 = null;
             GridFutureAdapter<Object> curFut0;
@@ -986,7 +1018,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             if (entries0 != null) {
-                submit(entries0, curFut0);
+                submit(entries0, topVer, curFut0);
 
                 if (cancelled)
                     curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
@@ -1023,7 +1055,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             if (entries0 != null)
-                submit(entries0, curFut0);
+                submit(entries0, null, curFut0);
 
             // Create compound future for this flush.
             GridCompoundFuture<Object, Object> res = null;
@@ -1068,10 +1100,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /**
          * @param entries Entries to submit.
+         * @param topVer Topology version.
          * @param curFut Current future.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut)
+        private void submit(final Collection<DataStreamerEntry> entries,
+            @Nullable AffinityTopologyVersion topVer,
+            final GridFutureAdapter<Object> curFut)
             throws IgniteInterruptedCheckedException {
             assert entries != null;
             assert !entries.isEmpty();
@@ -1160,6 +1195,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                 reqs.put(reqId, (GridFutureAdapter<Object>)fut);
 
+                if (topVer == null)
+                    topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
                 DataStreamerRequest req = new DataStreamerRequest(
                     reqId,
                     topicBytes,
@@ -1174,7 +1212,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.participants() : null,
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
-                    ctx.cache().context().exchange().readyAffinityVersion());
+                    topVer);
 
                 try {
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
index 2382a66..e0092d4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
@@ -41,6 +41,9 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    private boolean dynamicCache;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -50,13 +53,22 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(discoSpi);
 
+        if (!dynamicCache)
+            cfg.setCacheConfiguration(cacheConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration() {
         CacheConfiguration ccfg = defaultCacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
         ccfg.setBackups(1);
-        cfg.setCacheConfiguration(ccfg);
 
-        return cfg;
+        return ccfg;
     }
 
     /** {@inheritDoc} */
@@ -68,6 +80,22 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartStopIgnites() throws Exception {
+        startStopIgnites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStopIgnitesDynamicCache() throws Exception {
+        dynamicCache = true;
+
+        startStopIgnites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void startStopIgnites() throws Exception {
         for (int attempt = 0; attempt < 3; ++attempt) {
             log.info("Iteration: " + attempt);
             
@@ -75,28 +103,29 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
 
             Set<IgniteFuture> futs = new HashSet<>();
 
-            IgniteInternalFuture<?> fut;
+            final AtomicInteger igniteId = new AtomicInteger(1);
 
-            try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) {
-                dataLdr.maxRemapCount(0);
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    for (int i = 1; i < 5; ++i)
+                        startGrid(igniteId.incrementAndGet());
 
-                final AtomicInteger igniteId = new AtomicInteger(1);
+                    return true;
+                }
+            }, 2, "start-node-thread");
 
-                fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        for (int i = 1; i < 5; ++i)
-                            startGrid(igniteId.incrementAndGet());
+            if (dynamicCache)
+                ignite.getOrCreateCache(cacheConfiguration());
 
-                        return true;
-                    }
-                }, 2, "start-node-thread");
+            try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) {
+                dataLdr.maxRemapCount(0);
 
-                Random random = new Random();
+                Random rnd = new Random();
 
                 long endTime = U.currentTimeMillis() + 15_000;
 
                 while (!fut.isDone() && U.currentTimeMillis() < endTime)
-                    futs.add(dataLdr.addData(random.nextInt(100_000), random.nextInt(100_000)));
+                    futs.add(dataLdr.addData(rnd.nextInt(100_000), String.valueOf(rnd.nextInt(100_000))));
             }
 
             for (IgniteFuture f : futs)