You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/18 21:48:29 UTC

[19/50] incubator-ignite git commit: ignite-484-1 - refactor

ignite-484-1 - refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4a534059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4a534059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4a534059

Branch: refs/heads/ignite-980
Commit: 4a534059e4bf46b061e1272d432fd00c6f87acb0
Parents: 2b7dc3b
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Jun 12 17:21:14 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Jun 12 17:21:14 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  |  10 +-
 .../dht/GridDhtPartitionsReservation.java       | 169 ++++++++++++++-----
 .../query/h2/twostep/GridMapQueryExecutor.java  |  24 +--
 3 files changed, 141 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 3670b8e..018ffd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -143,19 +143,15 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         assert state.getReference() != EVICTED : "we can reserve only active partitions";
         assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation";
 
-        if (!reservations.addIfAbsent(r))
-            return false;
-
-        r.register(this);
-
-        return true;
+        return reservations.addIfAbsent(r);
     }
 
     /**
      * @param r Reservation.
      */
     public void removeReservation(GridDhtPartitionsReservation r) {
-        reservations.remove(r);
+        if (!reservations.remove(r))
+            throw new IllegalStateException("Reservation was already removed.");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index fcd6088..71a1859 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -19,70 +19,126 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
 
 import java.util.*;
 import java.util.concurrent.atomic.*;
 
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
 
 /**
  * Reservation mechanism for multiple partitions allowing to do a reservation in one operation.
  */
 public class GridDhtPartitionsReservation implements GridReservable {
     /** */
+    private static final GridDhtLocalPartition[] EMPTY = {};
+
+    /** */
+    private final Object appKey;
+
+    /** */
     private final GridCacheContext<?,?> cctx;
 
     /** */
     private final AffinityTopologyVersion topVer;
 
     /** */
-    private final List<GridDhtLocalPartition> parts = new ArrayList<>();
+    private final AtomicReference<GridDhtLocalPartition[]> parts = new AtomicReference<>();
 
     /** */
     private final AtomicInteger reservations = new AtomicInteger();
 
     /** */
-    private final IgniteInClosure<GridDhtPartitionsReservation> finalize;
+    private volatile CI1<GridDhtPartitionsReservation> unpublish;
 
     /**
      * @param topVer AffinityTopologyVersion version.
      * @param cctx Cache context.
-     * @param finalize Finalizing closure.
+     * @param appKey Application key for reservation.
      */
-    public GridDhtPartitionsReservation(
-        AffinityTopologyVersion topVer,
-        GridCacheContext<?,?> cctx,
-        IgniteInClosure<GridDhtPartitionsReservation> finalize) {
+    public GridDhtPartitionsReservation(AffinityTopologyVersion topVer, GridCacheContext<?,?> cctx, Object appKey) {
         assert topVer != null;
         assert cctx != null;
+        assert appKey != null;
 
         this.topVer = topVer;
         this.cctx = cctx;
-        this.finalize = finalize;
+        this.appKey = appKey;
     }
 
     /**
-     * @return Topology version.
+     * Registers all the given partitions for this reservation.
+     *
+     * @param parts Partitions.
+     * @return {@code true} If registration succeeded and this reservation can be published.
      */
-    public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
+    public boolean register(Collection<? extends GridReservable> parts) {
+        assert !F.isEmpty(parts) : "empty partitions list";
 
-    /**
-     * @return Cache context.
-     */
-    public GridCacheContext<?,?> cacheContext() {
-        return cctx;
+        GridDhtLocalPartition[] arr = new GridDhtLocalPartition[parts.size()];
+
+        int i = 0;
+        int prevPart = -1;
+        boolean sorted = true; // Most probably it is a sorted list.
+
+        for (GridReservable part : parts) {
+            arr[i] = (GridDhtLocalPartition)part;
+
+            if (sorted) { // Make sure it will be a sorted array.
+                int id = arr[i].id();
+
+                if (id <= prevPart)
+                    sorted = false;
+
+                prevPart = id;
+            }
+
+            i++;
+        }
+
+        if (!sorted)
+            Arrays.sort(arr);
+
+        i = 0;
+        prevPart = -1;
+
+        // Register in correct sort order.
+        for (GridDhtLocalPartition part : arr) {
+            if (prevPart == part.id())
+                throw new IllegalStateException("Duplicated partitions.");
+
+            prevPart = part.id();
+
+            if (!part.addReservation(this)) {
+                if (i != 0)
+                    throw new IllegalStateException(
+                        "Trying to reserve different sets of partitions for the same topology version.");
+
+                return false;
+            }
+
+            i++;
+        }
+
+        if (!this.parts.compareAndSet(null, arr))
+            throw new IllegalStateException("Partitions can be registered only once.");
+
+        return true;
     }
 
     /**
-     * Registers partition for this group reservation.
+     * Must be called when this reservation is published.
      *
-     * @param part Partition.
+     * @param unpublish Closure to unpublish this reservation once it becomes invalid.
      */
-    public void register(GridDhtLocalPartition part) {
-        parts.add(part);
+    public void onPublish(CI1<GridDhtPartitionsReservation> unpublish) {
+        assert unpublish != null;
+        assert this.unpublish == null;
+
+        this.unpublish = unpublish;
+
+        if (reservations.get() == -1)
+            unregister();
     }
 
     /**
@@ -91,6 +147,8 @@ public class GridDhtPartitionsReservation implements GridReservable {
      * @return {@code true} If succeeded.
      */
     @Override public boolean reserve() {
+        assert parts.get() != null : "partitions must be registered before the first reserve attempt";
+
         for (;;) {
             int r = reservations.get();
 
@@ -105,6 +163,25 @@ public class GridDhtPartitionsReservation implements GridReservable {
     }
 
     /**
+     * @param parts Partitions.
+     */
+    private static void tryEvict(GridDhtLocalPartition[] parts) {
+        if (parts == null)  // Can be not initialized yet.
+            return ;
+
+        for (GridDhtLocalPartition part : parts)
+            tryEvict(part);
+    }
+
+    /**
+     * @param part Partition.
+     */
+    private static void tryEvict(GridDhtLocalPartition part) {
+        if (part.state() == RENTING && part.reservations() == 0)
+            part.tryEvictAsync(true);
+    }
+
+    /**
      * Releases all the registered partitions.
      */
     @Override public void release() {
@@ -116,12 +193,8 @@ public class GridDhtPartitionsReservation implements GridReservable {
 
             if (reservations.compareAndSet(r, r - 1)) {
                 // If it was the last reservation and topology version changed -> attempt to evict partitions.
-                if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) {
-                    for (GridDhtLocalPartition part : parts) {
-                        if (part.state() == RENTING)
-                            part.tryEvictAsync(true);
-                    }
-                }
+                if (r == 1 && !topVer.equals(cctx.topology().topologyVersion()))
+                    tryEvict(parts.get());
 
                 return;
             }
@@ -129,6 +202,26 @@ public class GridDhtPartitionsReservation implements GridReservable {
     }
 
     /**
+     * Unregisters this reservation from all the partitions.
+     */
+    private void unregister() {
+        GridDhtLocalPartition[] arr = parts.get();
+
+        if (!F.isEmpty(arr) && unpublish != null && parts.compareAndSet(arr, EMPTY)) {
+            // Reverse order ensures that addReservation for a reservation with the same topVer fails on the first partition.
+            for (int i = arr.length - 1; i >= 0; i--) {
+                GridDhtLocalPartition part = arr[i];
+
+                part.removeReservation(this);
+
+                tryEvict(part);
+            }
+
+            unpublish.apply(this);
+        }
+    }
+
+    /**
      * Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}.
      * If returns {@code true} then probably partition will be evicted (or at least cleared),
      * so this reservation object becomes invalid and must be dropped from the partition.
@@ -146,12 +239,7 @@ public class GridDhtPartitionsReservation implements GridReservable {
             return r == -1;
 
         if (reservations.compareAndSet(0, -1)) {
-            // Remove our self.
-            for (GridDhtLocalPartition part : parts)
-                part.removeReservation(this);
-
-            if (finalize != null)
-                finalize.apply(this);
+            unregister();
 
             return true;
         }
@@ -169,13 +257,18 @@ public class GridDhtPartitionsReservation implements GridReservable {
 
         GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o;
 
-        return topVer.equals(that.topVer) && cctx == that.cctx;
+        return cctx == that.cctx && topVer.equals(that.topVer) && appKey.equals(that.appKey);
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        String cache = cctx.name();
+        String name = cctx.name();
+
+        int result = name == null ? 0 : name.hashCode();
+
+        result = 31 * result + appKey.hashCode();
+        result = 31 * result + topVer.hashCode();
 
-        return 31 * topVer.hashCode() + cache == null ? 0 : cache.hashCode();
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index a8bc6e0..42f01cb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -290,27 +290,17 @@ public class GridMapQueryExecutor {
 
                     if (explicitParts == null) {
                         // We reserved all the primary partitions for cache, attempt to add group reservation.
-                        GridDhtPartitionsReservation reservation = new GridDhtPartitionsReservation(topVer, cctx,
-                            new CI1<GridDhtPartitionsReservation>() {
+                        GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL");
+
+                        if (grp.register(reserved.subList(reserved.size() - partIds.size(), reserved.size()))) {
+                            if (reservations.putIfAbsent(grpKey, grp) != null)
+                                throw new IllegalStateException("Reservation already exists.");
+
+                            grp.onPublish(new CI1<GridDhtPartitionsReservation>() {
                                 @Override public void apply(GridDhtPartitionsReservation r) {
                                     reservations.remove(grpKey, r);
                                 }
                             });
-
-                        for (int p = reserved.size() - partIds.size(); p < reserved.size(); p++) {
-                            if (!((GridDhtLocalPartition)reserved.get(p)).addReservation(reservation)) {
-                                // Can fail to add only on the first partition because of the same order of partitions.
-                                assert p == reserved.size() - partIds.size() : p;
-
-                                reservation = null;
-
-                                break;
-                            }
-                        }
-
-                        if (reservation != null) { // If we were able to add reservation to all partitions, publish it.
-                            if (reservations.putIfAbsent(grpKey, reservation) != null)
-                                throw new IllegalStateException();
                         }
                     }
                 }