You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/18 08:54:28 UTC

[21/50] incubator-ignite git commit: ignite-484-1 - group partition reservation

ignite-484-1 - group partition reservation


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1efefbd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1efefbd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1efefbd9

Branch: refs/heads/ignite-sprint-6
Commit: 1efefbd9497fdf31c36ff27634a5e13d5c74ea9a
Parents: ef50a38
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Jun 11 04:02:29 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Jun 11 04:02:29 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  |  58 +++++-
 .../dht/GridDhtPartitionsReservation.java       | 181 +++++++++++++++++++
 .../cache/distributed/dht/GridReservable.java   |  35 ++++
 .../query/h2/twostep/GridMapQueryExecutor.java  | 144 +++++++++++----
 4 files changed, 374 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index dc4982e..e858e42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Key partition.
  */
-public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> {
+public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridReservable {
     /** Maximum size for delete queue. */
     public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE,
         200_000);
@@ -63,7 +63,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
 
     /** State. */
     @GridToStringExclude
-    private AtomicStampedReference<GridDhtPartitionState> state =
+    private final AtomicStampedReference<GridDhtPartitionState> state =
         new AtomicStampedReference<>(MOVING, 0);
 
     /** Rent future. */
@@ -94,7 +94,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
     private final LongAdder8 mapPubSize = new LongAdder8();
 
     /** Remove queue. */
-    private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+    private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+
+    /** Group reservations. */
+    private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
 
     /**
      * @param cctx Context.
@@ -131,6 +134,31 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
     }
 
     /**
+     * Adds group reservation to this partition.
+     *
+     * @param r Reservation.
+     * @return {@code true} If reservation added successfully.
+     */
+    public boolean addReservation(GridDhtPartitionsReservation r) {
+        assert state.getReference() != EVICTED : "we can reserve only active partitions";
+        assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation";
+
+        if (!reservations.addIfAbsent(r))
+            return false;
+
+        r.register(this);
+
+        return true;
+    }
+
+    /**
+     * @param r Reservation.
+     */
+    public void removeReservation(GridDhtPartitionsReservation r) {
+        reservations.remove(r);
+    }
+
+    /**
      * @return Partition ID.
      */
     public int id() {
@@ -334,7 +362,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
      *
      * @return {@code True} if reserved.
      */
-    public boolean reserve() {
+    @Override public boolean reserve() {
         while (true) {
             int reservations = state.getStamp();
 
@@ -351,7 +379,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
     /**
      * Releases previously reserved partition.
      */
-    public void release() {
+    @Override public void release() {
         while (true) {
             int reservations = state.getStamp();
 
@@ -441,7 +469,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
      * @param updateSeq Update sequence.
      * @return Future for evict attempt.
      */
-    private IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
+    IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
         if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
             state.compareAndSet(RENTING, EVICTED, 0, 0)) {
             if (log.isDebugEnabled())
@@ -471,12 +499,26 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
     }
 
     /**
+     * @return {@code true} If at least one attached group reservation still prevents eviction.
+     */
+    private boolean groupReserved() {
+        boolean reserved = false;
+
+        // Visit all of them without early exit: canEvict() also invalidates and detaches
+        // every reservation that is no longer reserved.
+        for (GridDhtPartitionsReservation r : reservations)
+            reserved |= !r.canEvict();
+
+        return reserved;
+    }
+
+    /**
      * @param updateSeq Update sequence.
      * @return {@code True} if entry has been transitioned to state EVICTED.
      */
-    private boolean tryEvict(boolean updateSeq) {
+    boolean tryEvict(boolean updateSeq) {
         // Attempt to evict partition entries from cache.
-        if (state.getReference() == RENTING && state.getStamp() == 0)
+        if (state.getReference() == RENTING && state.getStamp() == 0 && !groupReserved())
             clearAll();
 
         if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
new file mode 100644
index 0000000..fcd6088
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Reservation mechanism for multiple partitions allowing to do a reservation in one operation.
+ */
+public class GridDhtPartitionsReservation implements GridReservable {
+    /** Cache context. */
+    private final GridCacheContext<?,?> cctx;
+
+    /** Topology version this reservation was created for. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Partitions registered in this group reservation. */
+    private final List<GridDhtLocalPartition> parts = new ArrayList<>();
+
+    /** Number of active reservations; {@code -1} once invalidated by {@link #canEvict()}. */
+    private final AtomicInteger reservations = new AtomicInteger();
+
+    /** Optional closure applied once this reservation is invalidated (may be {@code null}). */
+    private final IgniteInClosure<GridDhtPartitionsReservation> finalize;
+
+    /**
+     * @param topVer Affinity topology version.
+     * @param cctx Cache context.
+     * @param finalize Finalizing closure, applied after invalidation; may be {@code null}.
+     */
+    public GridDhtPartitionsReservation(
+        AffinityTopologyVersion topVer,
+        GridCacheContext<?,?> cctx,
+        IgniteInClosure<GridDhtPartitionsReservation> finalize) {
+        assert topVer != null;
+        assert cctx != null;
+
+        this.topVer = topVer;
+        this.cctx = cctx;
+        this.finalize = finalize;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Cache context.
+     */
+    public GridCacheContext<?,?> cacheContext() {
+        return cctx;
+    }
+
+    /**
+     * Registers partition for this group reservation.
+     *
+     * @param part Partition.
+     */
+    public void register(GridDhtLocalPartition part) {
+        parts.add(part);
+    }
+
+    /**
+     * Reserves all the registered partitions at once by incrementing the group counter.
+     *
+     * @return {@code true} If succeeded, {@code false} if this reservation was already invalidated.
+     */
+    @Override public boolean reserve() {
+        for (;;) {
+            int r = reservations.get();
+
+            if (r == -1) // Invalidated by successful canEvict call.
+                return false;
+
+            assert r >= 0 : r;
+
+            if (reservations.compareAndSet(r, r + 1))
+                return true;
+        }
+    }
+
+    /**
+     * Releases this group reservation; the last release after a topology change triggers eviction attempts.
+     */
+    @Override public void release() {
+        for (;;) {
+            int r = reservations.get();
+
+            if (r <= 0)
+                throw new IllegalStateException("Method 'reserve' must be called before 'release'.");
+
+            if (reservations.compareAndSet(r, r - 1)) {
+                // If it was the last reservation and topology version changed -> attempt to evict partitions.
+                if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) {
+                    for (GridDhtLocalPartition part : parts) {
+                        if (part.state() == RENTING)
+                            part.tryEvictAsync(true);
+                    }
+                }
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}.
+     * If it returns {@code true}, the partition will probably be evicted (or at least cleared),
+     * so this reservation object becomes invalid and is detached from all the registered partitions.
+     * This also means that once {@code true} has been returned here, {@link #reserve()} can not
+     * return {@code true} anymore.
+     *
+     * @return {@code true} If this reservation is NOT reserved and partition CAN be evicted.
+     */
+    public boolean canEvict() {
+        int r = reservations.get();
+
+        assert r >= -1 : r;
+
+        if (r != 0)
+            return r == -1;
+
+        if (reservations.compareAndSet(0, -1)) {
+            // Detach ourselves from all the registered partitions.
+            for (GridDhtLocalPartition part : parts)
+                part.removeReservation(this);
+
+            if (finalize != null)
+                finalize.apply(this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o;
+
+        return topVer.equals(that.topVer) && cctx == that.cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        String cache = cctx.name();
+        // Parentheses are required: '+' binds tighter than '==' and '?:' (null cache name is allowed).
+        return 31 * topVer.hashCode() + (cache == null ? 0 : cache.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
new file mode 100644
index 0000000..326b077
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+/**
+ * Something that can be reserved and later released.
+ */
+public interface GridReservable {
+    /**
+     * Tries to reserve this object.
+     *
+     * @return {@code true} If the reservation was acquired.
+     */
+    boolean reserve();
+
+    /**
+     * Releases a reservation previously acquired by {@link #reserve()}.
+     */
+    void release();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index d9e9066..a8bc6e0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -87,6 +87,10 @@ public class GridMapQueryExecutor {
     /** */
     private final GridSpinBusyLock busyLock;
 
+    /** */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable> reservations =
+        new ConcurrentHashMap8<>();
+
     /**
      * @param busyLock Busy lock.
      */
@@ -202,15 +206,13 @@ public class GridMapQueryExecutor {
 
     /**
      * @param cacheName Cache name.
-     * @param topVer Topology version.
      * @return Cache context or {@code null} if none.
      */
-    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName, AffinityTopologyVersion topVer) {
+    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
         GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);

-        if (cache == null) // Since we've waited for for cache affinity updates, this must be a misconfiguration.
-            throw new CacheException("Cache does not exist on current node: [nodeId=" + ctx.localNodeId() +
-                ", cache=" + cacheName + ", topVer=" + topVer + "]");
+        if (cache == null) // Not found on this node: callers decide whether to retry or fail.
+            return null;

         return cache.context();
     }
@@ -218,17 +220,23 @@ public class GridMapQueryExecutor {
     /**
      * @param cacheNames Cache names.
      * @param topVer Topology version.
-     * @param parts Explicit partitions.
+     * @param explicitParts Explicit partitions list.
      * @param reserved Reserved list.
      * @return {@code true} If all the needed partitions successfully reserved.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, final int[] parts,
-        List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
-        Collection<Integer> partIds = parts == null ? null : wrap(parts);
+    private boolean reservePartitions(
+        Collection<String> cacheNames,
+        AffinityTopologyVersion topVer,
+        final int[] explicitParts,
+        List<GridReservable> reserved
+    ) throws IgniteCheckedException {
+        assert topVer != null;
+
+        Collection<Integer> partIds = wrap(explicitParts);
 
         for (String cacheName : cacheNames) {
-            GridCacheContext<?, ?> cctx = cacheContext(cacheName, topVer);
+            GridCacheContext<?, ?> cctx = cacheContext(cacheName);
 
             if (cctx == null) // Cache was not found, probably was not deployed yet.
                 return false;
@@ -236,35 +244,75 @@ public class GridMapQueryExecutor {
             if (cctx.isLocal())
                 continue;
 
-            int partsCnt = cctx.affinity().partitions();
+            final T2<String,AffinityTopologyVersion> grpKey = new T2<>(cctx.name(), topVer);
 
-            if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
-                for (int p = 0; p < partsCnt; p++) {
-                    GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
+            GridReservable r = reservations.get(grpKey);
 
-                    if (part == null || part.state() != OWNING)
-                        return false;
+            if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
+                if (!r.reserve())
+                    return false; // We need explicit partitions here -> retry.
 
-                    // We don't need to reserve partitions because they will not be evicted in replicated caches.
-                }
+                reserved.add(r);
             }
-            else { // Reserve primary partitions for partitioned cache.
-                if (parts == null)
-                    partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+            else { // Try to reserve partitions one by one.
+                int partsCnt = cctx.affinity().partitions();
 
-                for (int partId : partIds) {
-                    if (partId >= partsCnt)
-                        break; // We can have more partitions because `parts` array is shared for all caches.
+                if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
+                    if (r == null) { // Check only once.
+                        for (int p = 0; p < partsCnt; p++) {
+                            GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
 
-                    GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+                            // We don't need to reserve partitions because they will not be evicted in replicated caches.
+                            if (part == null || part.state() != OWNING)
+                                return false;
 
-                    if (part == null || part.state() != OWNING || !part.reserve())
-                        return false;
+                            // Mark that we checked this replicated cache.
+                            reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE);
+                        }
+                    }
+                }
+                else { // Reserve primary partitions for partitioned cache (if no explicit given).
+                    if (explicitParts == null)
+                        partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
 
-                    reserved.add(part);
+                    for (int partId : partIds) {
+                        GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
 
-                    if (part.state() != OWNING)
-                        return false;
+                        if (part == null || part.state() != OWNING || !part.reserve())
+                            return false;
+
+                        reserved.add(part);
+
+                        // Double check that we are still in owning state and partition contents are not cleared.
+                        if (part.state() != OWNING)
+                            return false;
+                    }
+
+                    if (explicitParts == null) {
+                        // We reserved all the primary partitions for cache, attempt to add group reservation.
+                        GridDhtPartitionsReservation reservation = new GridDhtPartitionsReservation(topVer, cctx,
+                            new CI1<GridDhtPartitionsReservation>() {
+                                @Override public void apply(GridDhtPartitionsReservation r) {
+                                    reservations.remove(grpKey, r);
+                                }
+                            });
+
+                        for (int p = reserved.size() - partIds.size(); p < reserved.size(); p++) {
+                            if (!((GridDhtLocalPartition)reserved.get(p)).addReservation(reservation)) {
+                                // Can fail to add only on the first partition because of the same order of partitions.
+                                assert p == reserved.size() - partIds.size() : p;
+
+                                reservation = null;
+
+                                break;
+                            }
+                        }
+
+                        if (reservation != null) { // If we were able to add reservation to all partitions, publish it.
+                            if (reservations.putIfAbsent(grpKey, reservation) != null)
+                                throw new IllegalStateException();
+                        }
+                    }
                 }
             }
         }
@@ -277,7 +325,10 @@ public class GridMapQueryExecutor {
      * @return Collection wrapper.
      */
     private static Collection<Integer> wrap(final int[] ints) {
-        if (F.isEmpty(ints))
+        if (ints == null)
+            return null;
+
+        if (ints.length == 0)
             return Collections.emptySet();
 
         return new AbstractCollection<Integer>() {
@@ -317,7 +368,7 @@ public class GridMapQueryExecutor {
 
         QueryResults qr = null;
 
-        List<GridDhtLocalPartition> reserved = new ArrayList<>();
+        List<GridReservable> reserved = new ArrayList<>();
 
         try {
             // Unmarshall query params.
@@ -343,7 +394,7 @@ public class GridMapQueryExecutor {
             final AffinityTopologyVersion topVer = req.topologyVersion();
 
             if (topVer != null) {
-                // Reserve primary partitions.
+                // Reserve primary for topology version or explicit partitions.
                 if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
                     sendRetry(node, req.requestId());
 
@@ -352,7 +403,10 @@ public class GridMapQueryExecutor {
             }
 
             // Prepare to run queries.
-            GridCacheContext<?,?> mainCctx = cacheContext(req.space(), topVer);
+            GridCacheContext<?,?> mainCctx = cacheContext(req.space());
+
+            if (mainCctx == null)
+                throw new CacheException("Cache was destroyed: " + req.space());
 
             qr = new QueryResults(req.requestId(), qrys.size(), mainCctx);
 
@@ -420,8 +474,8 @@ public class GridMapQueryExecutor {
             h2.setFilters(null);
 
             // Release reserved partitions.
-            for (GridDhtLocalPartition part : reserved)
-                part.release();
+            for (GridReservable r : reserved)
+                r.release();
         }
     }
 
@@ -738,4 +792,22 @@ public class GridMapQueryExecutor {
             U.close(stmt, log);
         }
     }
+
+    /**
+     * Stateless reservation used to mark replicated caches whose partitions were already checked.
+     */
+    private static final class ReplicatedReservation implements GridReservable {
+        /** Shared instance: the class has no state. */
+        static final ReplicatedReservation INSTANCE = new ReplicatedReservation();
+
+        /** {@inheritDoc} */
+        @Override public boolean reserve() {
+            return true; // Nothing to reserve: replicated partitions are not evicted.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void release() {
+            // Nothing was reserved, so nothing to release.
+        }
+    }
 }