You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/20 07:45:33 UTC

[1/5] ignite git commit: .NET: Remove unused import

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 137054969 -> fe3407333


.NET: Remove unused import


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/527c045f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/527c045f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/527c045f

Branch: refs/heads/ignite-4565-ddl
Commit: 527c045f4f42201ccdb159cffaec7af09a149d17
Parents: 9020d12
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Mar 20 10:07:34 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Mar 20 10:07:34 2017 +0300

----------------------------------------------------------------------
 .../platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs  | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/527c045f/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
index e8f8bfb..621e604 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Interop
     using System;
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
-    using Apache.Ignite.Core.Impl.Binary;
 
     /// <summary>
     /// Interface to interoperate with


[3/5] ignite git commit: Merge remote-tracking branch 'origin/ignite-2.0' into ignite-2.0

Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-2.0' into ignite-2.0


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/167d82d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/167d82d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/167d82d9

Branch: refs/heads/ignite-4565-ddl
Commit: 167d82d98610cad2b9256f978319df09b1bd7ac2
Parents: 61c845d 527c045
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 10:16:55 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 10:16:55 2017 +0300

----------------------------------------------------------------------
 .../platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs  | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------



[5/5] ignite git commit: Merge branch 'ignite-2.0' into ignite-4565-ddl

Posted by vo...@apache.org.
Merge branch 'ignite-2.0' into ignite-4565-ddl

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe340733
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe340733
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe340733

Branch: refs/heads/ignite-4565-ddl
Commit: fe34073337fc9f15646f0d6764b26f1e0c47281b
Parents: 1370549 837bae6
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 10:45:22 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 10:45:22 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteComputeImpl.java      | 13 ++++----
 .../failover/GridFailoverContextImpl.java       | 11 ------
 .../managers/failover/GridFailoverManager.java  |  3 --
 .../GridCachePartitionExchangeManager.java      |  2 +-
 .../processors/closure/AffinityTask.java        |  6 ----
 .../closure/GridClosureProcessor.java           | 35 +++-----------------
 .../processors/task/GridTaskWorker.java         |  7 +---
 .../ignite/spi/failover/FailoverContext.java    | 10 ------
 .../spi/failover/GridFailoverTestContext.java   |  7 +---
 .../Interop/IPlatformTarget.cs                  |  1 -
 10 files changed, 13 insertions(+), 82 deletions(-)
----------------------------------------------------------------------



[2/5] ignite git commit: IGNITE-4834: Added ability to execute custom tasks from exchange thread.

Posted by vo...@apache.org.
IGNITE-4834: Added ability to execute custom tasks from exchange thread.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/61c845d6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/61c845d6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/61c845d6

Branch: refs/heads/ignite-4565-ddl
Commit: 61c845d66b82463bfad71c28093f2cbf54d99eb0
Parents: 9020d12
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 10:16:36 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 10:16:36 2017 +0300

----------------------------------------------------------------------
 .../cache/CachePartitionExchangeWorkerTask.java |  29 ++++
 .../GridCachePartitionExchangeManager.java      | 171 +++++++++++++------
 .../processors/cache/GridCacheProcessor.java    |  19 +++
 .../GridDhtPartitionsExchangeFuture.java        |   8 +-
 4 files changed, 176 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
new file mode 100644
index 0000000..80ef9f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Cache partition exchange worker task marker interface.
+ */
+public interface CachePartitionExchangeWorkerTask {
+    /**
+     * @return {@code True) if task denotes standard exchange task, {@code false} if this is a custom task which
+     * must be executed from within exchange thread.
+     */
+    boolean isExchange();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e72efb..f7edb08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -222,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     exchFut = exchangeFuture(exchId, evt, cache,null, null);
                 }
                 else {
-                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+                    DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
 
-                    if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+                    if (customMsg instanceof DynamicCacheChangeBatch) {
+                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
                         Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
 
@@ -257,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                         }
                     }
-                    else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
-                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+                    else if (customMsg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
 
                         if (msg.exchangeId() == null) {
                             if (msg.exchangeNeeded()) {
@@ -267,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 exchFut = exchangeFuture(exchId, evt, cache, null, msg);
                             }
                         }
-                        else
-                            exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                        else {
+                            exchangeFuture(msg.exchangeId(), null, null, null, null)
+                                .onAffinityChangeMessage(evt.eventNode(), msg);
+                        }
+                    }
+                    else {
+                        // Process event as custom discovery task if needed.
+                        CachePartitionExchangeWorkerTask task =
+                            cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+                        if (task != null)
+                            exchWorker.addCustomTask(task);
                     }
                 }
 
@@ -369,7 +379,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.futQ.addFirst(fut);
+        exchWorker.addFirstExchangeFuture(fut);
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -684,7 +694,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @return {@code True} if pending future queue is empty.
      */
     public boolean hasPendingExchange() {
-        return !exchWorker.futQ.isEmpty();
+        return exchWorker.hasPendingExchange();
+    }
+
+    /**
+     * Add custom task.
+     *
+     * @param task Task.
+     */
+    public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+        assert !task.isExchange();
+
+        exchWorker.addCustomTask(task);
     }
 
     /**
@@ -704,7 +725,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     public void forceDummyExchange(boolean reassign,
         GridDhtPartitionsExchangeFuture exchFut) {
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId()));
     }
 
@@ -716,7 +737,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
         GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
 
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
 
         return fut;
@@ -1192,7 +1213,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
         if (fut.onAdded()) {
-            exchWorker.addFuture(fut);
+            exchWorker.addExchangeFuture(fut);
 
             return true;
         }
@@ -1345,10 +1366,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         U.warn(log, "Last exchange future: " + lastInitializedFut);
 
-        U.warn(log, "Pending exchange futures:");
-
-        for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
-            U.warn(log, ">>> " + fut);
+        exchWorker.dumpExchangeDebugInfo();
 
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
@@ -1547,28 +1565,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * @param node Target node.
      * @return {@code True} if can use compression for partition map messages.
      */
@@ -1592,32 +1588,94 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private class ExchangeWorker extends GridWorker {
         /** Future queue. */
-        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+        private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
             new LinkedBlockingDeque<>();
 
         /** Busy flag used as performance optimization to stop current preloading. */
         private volatile boolean busy;
 
         /**
-         *
+         * Constructor.
          */
         private ExchangeWorker() {
             super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log);
         }
 
         /**
+         * Add first exchange future.
+         *
+         * @param exchFut Exchange future.
+         */
+        void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
+            futQ.addFirst(exchFut);
+        }
+
+        /**
          * @param exchFut Exchange future.
          */
-        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
             assert exchFut != null;
 
-            if (!exchFut.dummy() || (futQ.isEmpty() && !busy))
+            if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
                 futQ.offer(exchFut);
 
             if (log.isDebugEnabled())
                 log.debug("Added exchange future to exchange worker: " + exchFut);
         }
 
+        /**
+         * Add custom exchange task.
+         *
+         * @param task Task.
+         */
+        void addCustomTask(CachePartitionExchangeWorkerTask task) {
+            assert task != null;
+
+            assert !task.isExchange();
+
+            futQ.offer(task);
+        }
+
+        /**
+         * Process custom exchange task.
+         *
+         * @param task Task.
+         */
+        void processCustomTask(CachePartitionExchangeWorkerTask task) {
+            try {
+                cctx.cache().processCustomExchangeTask(task);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to process custom exchange task: " + task, e);
+            }
+        }
+
+        /**
+         * @return Whether pending exchange future exists.
+         */
+        boolean hasPendingExchange() {
+            if (!futQ.isEmpty()) {
+                for (CachePartitionExchangeWorkerTask task : futQ) {
+                    if (task.isExchange())
+                        return true;
+                }
+            }
+
+            return false;
+        }
+
+        /**
+         * Dump debug info.
+         */
+        void dumpExchangeDebugInfo() {
+            U.warn(log, "Pending exchange futures:");
+
+            for (CachePartitionExchangeWorkerTask task: futQ) {
+                if (task.isExchange())
+                    U.warn(log, ">>> " + task);
+            }
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             long timeout = cctx.gridConfig().getNetworkTimeout();
@@ -1625,7 +1683,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             int cnt = 0;
 
             while (!isCancelled()) {
-                GridDhtPartitionsExchangeFuture exchFut = null;
+                CachePartitionExchangeWorkerTask task = null;
 
                 cnt++;
 
@@ -1640,7 +1698,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
 
                     // If not first preloading and no more topology events present.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
+                    if (!cctx.kernalContext().clientNode() && !hasPendingExchange() && preloadFinished)
                         timeout = cctx.gridConfig().getNetworkTimeout();
 
                     // After workers line up and before preloading starts we initialize all futures.
@@ -1656,11 +1714,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
 
                     // Take next exchange future.
-                    exchFut = poll(futQ, timeout, this);
+                    if (isCancelled())
+                        Thread.currentThread().interrupt();
 
-                    if (exchFut == null)
+                    task = futQ.poll(timeout, MILLISECONDS);
+
+                    if (task == null)
                         continue; // Main while loop.
 
+                    if (!task.isExchange()) {
+                        processCustomTask(task);
+
+                        continue;
+                    }
+
+                    assert task instanceof GridDhtPartitionsExchangeFuture;
+
+                    GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task;
+
                     busy = true;
 
                     Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
@@ -1727,7 +1798,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 changed |= cacheCtx.topology().afterExchange(exchFut);
                             }
 
-                            if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
+                            if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange())
                                 refreshPartitions();
                         }
                         else {
@@ -1824,7 +1895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
 
-                            if (futQ.isEmpty()) {
+                            if (!hasPendingExchange()) {
                                 U.log(log, "Rebalancing started " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
@@ -1850,7 +1921,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to wait for completion of partition map exchange " +
-                        "(preloading will not start): " + exchFut, e);
+                        "(preloading will not start): " + task, e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c7ac31a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Create exchange worker task for custom discovery message.
+     *
+     * @param msg Custom discovery message.
+     * @return Task or {@code null} if message doesn't require any special processing.
+     */
+    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+        return null;
+    }
+
+    /**
+     * Process custom exchange task.
+     *
+     * @param task Task.
+     */
+    public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+        // No-op.
+    }
+
+    /**
      * @param c Ignite configuration.
      * @param cc Configuration to validate.
      * @param cacheType Cache type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 46fb144..50937a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -84,7 +85,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  * Future for exchanging partition maps.
  */
 public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
-    implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
+    implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
     /** */
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
@@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isExchange() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
         return exchId.compareTo(fut.exchId);
     }


[4/5] ignite git commit: IGNITE-4824 Removed deprecated AffinityTask.affinityKey method. This closes #1626.

Posted by vo...@apache.org.
IGNITE-4824 Removed deprecated AffinityTask.affinityKey method. This closes #1626.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/837bae68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/837bae68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/837bae68

Branch: refs/heads/ignite-4565-ddl
Commit: 837bae683931688cf2f8e77b11c1fbcbbdd3c4ed
Parents: 167d82d
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Mar 20 10:24:01 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 10:24:01 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteComputeImpl.java      | 13 ++++----
 .../failover/GridFailoverContextImpl.java       | 11 ------
 .../managers/failover/GridFailoverManager.java  |  3 --
 .../processors/closure/AffinityTask.java        |  6 ----
 .../closure/GridClosureProcessor.java           | 35 +++-----------------
 .../processors/task/GridTaskWorker.java         |  7 +---
 .../ignite/spi/failover/FailoverContext.java    | 10 ------
 .../spi/failover/GridFailoverTestContext.java   |  7 +---
 8 files changed, 12 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index 3900c1f..58ce001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -119,8 +119,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            saveOrGet(ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, affKey,
-                job, prj.nodes()));
+            saveOrGet(ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -149,7 +148,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            saveOrGet(ctx.closure().affinityRun(cacheNames, partId, affKey, job, prj.nodes()));
+            saveOrGet(ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -168,7 +167,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            saveOrGet(ctx.closure().affinityRun(cacheNames, partId, null, job, prj.nodes()));
+            saveOrGet(ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -194,7 +193,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return saveOrGet(ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, affKey, job,
+            return saveOrGet(ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job,
                 prj.nodes()));
         }
         catch (IgniteCheckedException e) {
@@ -226,7 +225,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, affKey, job, prj.nodes()));
+            return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -245,7 +244,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, null, job, prj.nodes()));
+            return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
index ad77271..735fce9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
@@ -46,9 +46,6 @@ public class GridFailoverContextImpl implements FailoverContext {
     /** Partition key for affinityCall. */
     private final int partId;
 
-    /** Affinity key for affinityCall. */
-    private final Object affKey;
-
     /** Affinity cache name for affinityCall. */
     private final String affCacheName;
 
@@ -62,7 +59,6 @@ public class GridFailoverContextImpl implements FailoverContext {
      * @param jobRes Failed job result.
      * @param loadMgr Load manager.
      * @param partId Partition.
-     * @param affKey Affinity key.
      * @param affCacheName Affinity cache name.
      * @param topVer Affinity topology version.
      */
@@ -70,7 +66,6 @@ public class GridFailoverContextImpl implements FailoverContext {
         ComputeJobResult jobRes,
         GridLoadBalancerManager loadMgr,
         int partId,
-        @Nullable Object affKey,
         @Nullable String affCacheName,
         @Nullable AffinityTopologyVersion topVer) {
         assert taskSes != null;
@@ -81,7 +76,6 @@ public class GridFailoverContextImpl implements FailoverContext {
         this.jobRes = jobRes;
         this.loadMgr = loadMgr;
         this.partId = partId;
-        this.affKey = affKey;
         this.affCacheName = affCacheName;
         this.topVer = topVer;
     }
@@ -102,11 +96,6 @@ public class GridFailoverContextImpl implements FailoverContext {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Object affinityKey() {
-        return affKey;
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public String affinityCacheName() {
         return affCacheName;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
index 52edd1d..d287e63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
@@ -60,7 +60,6 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
      * @param jobRes Job result.
      * @param top Collection of all topology nodes.
      * @param affPartId Partition number.
-     * @param affKey Affinity key.
      * @param affCacheName Affinity cache name.
      * @param topVer Affinity topology version.
      * @return New node to route this job to.
@@ -69,14 +68,12 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
         ComputeJobResult jobRes,
         List<ClusterNode> top,
         int affPartId,
-        @Nullable Object affKey,
         @Nullable String affCacheName,
         @Nullable AffinityTopologyVersion topVer) {
         return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes,
             jobRes,
             ctx.loadBalancing(),
             affPartId,
-            affKey,
             affCacheName,
             topVer),
             top);

http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
index 9007c8b..a6fd224 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
@@ -26,12 +26,6 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface AffinityTask {
     /**
-     * @return Affinity key.
-     */
-    @Deprecated
-    @Nullable public Object affinityKey();
-
-    /**
      * @return Partition.
      */
     public int partition();

http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 55f170f..1f21fd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -438,7 +438,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      * @param cacheNames Cache names.
      * @param partId Partition.
-     * @param affKey Affinity key.
      * @param job Closure to execute.
      * @param nodes Grid nodes.
      * @return Grid future for collection of closure results.
@@ -446,7 +445,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames,
         int partId,
-        @Nullable Object affKey,
         Callable<R> job,
         @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
         assert partId >= 0 : partId;
@@ -467,7 +465,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T5(node, job, cacheNames, partId, affKey, mapTopVer), null, false);
+            return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null, false);
         }
         finally {
             busyLock.readUnlock();
@@ -477,7 +475,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      * @param cacheNames Cache names.
      * @param partId Partition.
-     * @param affKey Affinity key.
      * @param job Job.
      * @param nodes Grid nodes.
      * @return Job future.
@@ -485,7 +482,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames,
         int partId,
-        @Nullable Object affKey,
         Runnable job,
         @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
         assert partId >= 0 : partId;
@@ -506,7 +502,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T4(node, job, cacheNames, partId, affKey, mapTopVer), null, false);
+            return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null, false);
         }
         finally {
             busyLock.readUnlock();
@@ -518,6 +514,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param job Closure to execute.
      * @param nodes Grid nodes.
      * @param sys If {@code true}, then system pool will be used.
+     * @param timeout Timeout.
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
@@ -1324,9 +1321,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         private Runnable job;
 
         /** */
-        private Object affKey;
-
-        /** */
         private int partId;
 
         /** */
@@ -1335,16 +1329,14 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         /** */
         private Collection<String> affCacheNames;
 
-
         /**
          * @param node Cluster node.
          * @param job Job affinity partition.
          * @param affCacheNames Affinity caches.
          * @param partId Partition.
-         * @param affKey Affinity key.
          * @param topVer Affinity topology version.
          */
-        private T4(ClusterNode node, Runnable job, Collection<String> affCacheNames, int partId, Object affKey,
+        private T4(ClusterNode node, Runnable job, Collection<String> affCacheNames, int partId,
             AffinityTopologyVersion topVer) {
             super(U.peerDeployAware0(job));
 
@@ -1354,7 +1346,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             this.job = job;
             this.affCacheNames = affCacheNames;
             this.partId = partId;
-            this.affKey = affKey;
             this.topVer = topVer;
         }
 
@@ -1377,11 +1368,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         @Nullable @Override public AffinityTopologyVersion topologyVersion() {
             return topVer;
         }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Object affinityKey() {
-            return affKey;
-        }
     }
 
     /**
@@ -1398,9 +1384,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         private Callable<R> job;
 
         /** */
-        private Object affKey;
-
-        /** */
         private int partId;
 
         /** */
@@ -1409,21 +1392,17 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         /** */
         private Collection<String> affCacheNames;
 
-
-
         /**
          * @param node Cluster node.
          * @param job Job affinity partition.
          * @param affCacheNames Affinity caches.
          * @param partId Partition.
-         * @param affKey Affinity key.
          * @param topVer Affinity topology version.
          */
         private T5(ClusterNode node,
             Callable<R> job,
             Collection<String> affCacheNames,
             int partId,
-            Object affKey,
             AffinityTopologyVersion topVer) {
             super(U.peerDeployAware0(job));
 
@@ -1431,7 +1410,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             this.job = job;
             this.affCacheNames = affCacheNames;
             this.partId = partId;
-            this.affKey = affKey;
             this.topVer = topVer;
         }
 
@@ -1451,11 +1429,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Object affinityKey() {
-            return affKey;
-        }
-
-        /** {@inheritDoc} */
         @Override public int partition() {
             return partId;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index b8a9e43..6305d23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -194,9 +194,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
     private final boolean noFailover;
 
     /** */
-    private final Object affKey;
-
-    /** */
     private final int affPartId;
 
     /** */
@@ -330,7 +327,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
             affPartId = affTask.partition();
             affCacheName = F.first(affTask.affinityCacheNames());
-            affKey = affTask.affinityKey();
             mapTopVer = affTask.topologyVersion();
 
             affCacheIds = new int[affTask.affinityCacheNames().size()];
@@ -343,7 +339,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
         else {
             affPartId = -1;
             affCacheName = null;
-            affKey = null;
             mapTopVer = null;
             affCacheIds = null;
         }
@@ -1175,7 +1170,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
 
             ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affPartId,
-                affKey, affCacheName, mapTopVer);
+                affCacheName, mapTopVer);
 
             return checkTargetNode(res, jobRes, node);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index b126db1..d9eec04 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -59,16 +59,6 @@ public interface FailoverContext {
     public ClusterNode getBalancedNode(List<ClusterNode> top) throws IgniteException;
 
     /**
-     * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)},
-     * {@link IgniteCompute#affinityRun(Collection, Object, IgniteRunnable)},
-     * {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}
-     * and {@link IgniteCompute#affinityCall(Collection, Object, IgniteCallable)}.
-     *
-     * @return Affinity key.
-     */
-    @Nullable public Object affinityKey();
-
-    /**
      * Gets partition for {@link IgniteCompute#affinityRun(Collection, int, IgniteRunnable)}
      * and {@link IgniteCompute#affinityCall(Collection, int, IgniteCallable)}.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/837bae68/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
index 97a3e0b..c004fa2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
@@ -70,12 +70,7 @@ public class GridFailoverTestContext implements FailoverContext {
     }
 
     /** {@inheritDoc} */
-    @Override public Object affinityKey() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public int partition() {
+    @Override public int partition() {
         return -1;
     }