You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/11 03:03:57 UTC
[25/26] incubator-ignite git commit: ignite-484-1 - group partition
reservation
ignite-484-1 - group partition reservation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1efefbd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1efefbd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1efefbd9
Branch: refs/heads/ignite-484-1
Commit: 1efefbd9497fdf31c36ff27634a5e13d5c74ea9a
Parents: ef50a38
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Jun 11 04:02:29 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Jun 11 04:02:29 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtLocalPartition.java | 58 +++++-
.../dht/GridDhtPartitionsReservation.java | 181 +++++++++++++++++++
.../cache/distributed/dht/GridReservable.java | 35 ++++
.../query/h2/twostep/GridMapQueryExecutor.java | 144 +++++++++++----
4 files changed, 374 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index dc4982e..e858e42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
* Key partition.
*/
-public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> {
+public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridReservable {
/** Maximum size for delete queue. */
public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE,
200_000);
@@ -63,7 +63,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
/** State. */
@GridToStringExclude
- private AtomicStampedReference<GridDhtPartitionState> state =
+ private final AtomicStampedReference<GridDhtPartitionState> state =
new AtomicStampedReference<>(MOVING, 0);
/** Rent future. */
@@ -94,7 +94,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
private final LongAdder8 mapPubSize = new LongAdder8();
/** Remove queue. */
- private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+ private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+
+ /** Group reservations this partition participates in (checked before clearing in tryEvict). */
+ private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
/**
* @param cctx Context.
@@ -131,6 +134,31 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
}
/**
+ * Adds group reservation to this partition and registers this partition in that reservation.
+ *
+ * @param r Reservation.
+ * @return {@code true} If reservation added, {@code false} if an equal reservation is already present.
+ */
+ public boolean addReservation(GridDhtPartitionsReservation r) {
+ assert state.getReference() != EVICTED : "we can reserve only active partitions";
+ assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation";
+
+ if (!reservations.addIfAbsent(r))
+ return false;
+
+ r.register(this);
+
+ return true;
+ }
+
+ /**
+ * @param r Reservation to remove from this partition (no-op if it is not present).
+ */
+ public void removeReservation(GridDhtPartitionsReservation r) {
+ reservations.remove(r);
+ }
+
+ /**
* @return Partition ID.
*/
public int id() {
@@ -334,7 +362,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
*
* @return {@code True} if reserved.
*/
- public boolean reserve() {
+ @Override public boolean reserve() {
while (true) {
int reservations = state.getStamp();
@@ -351,7 +379,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
/**
* Releases previously reserved partition.
*/
- public void release() {
+ @Override public void release() {
while (true) {
int reservations = state.getStamp();
@@ -441,7 +469,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
* @param updateSeq Update sequence.
* @return Future for evict attempt.
*/
- private IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
+ IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
state.compareAndSet(RENTING, EVICTED, 0, 0)) {
if (log.isDebugEnabled())
@@ -471,12 +499,26 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
}
/**
+ * @return {@code true} If at least one group reservation is held. Every reservation is polled via canEvict() (no early exit), which invalidates the ones not currently held.
+ */
+ private boolean groupReserved() {
+ boolean reserved = false;
+
+ for (GridDhtPartitionsReservation reservation : reservations) {
+ if (!reservation.canEvict())
+ reserved = true;
+ }
+
+ return reserved;
+ }
+
+ /**
+ * @param updateSeq Update sequence.
+ * @return {@code True} if entry has been transitioned to state EVICTED.
+ */
- private boolean tryEvict(boolean updateSeq) {
+ boolean tryEvict(boolean updateSeq) {
// Attempt to evict partition entries from cache.
- if (state.getReference() == RENTING && state.getStamp() == 0)
+ if (state.getReference() == RENTING && state.getStamp() == 0 && !groupReserved())
clearAll();
if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
new file mode 100644
index 0000000..fcd6088
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Reservation mechanism for multiple partitions allowing to do a reservation in one operation.
+ */
+public class GridDhtPartitionsReservation implements GridReservable {
+ /** Cache context the reserved partitions belong to. */
+ private final GridCacheContext<?,?> cctx;
+
+ /** Topology version this reservation was created for. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Partitions registered in this group reservation. */
+ private final List<GridDhtLocalPartition> parts = new ArrayList<>();
+
+ /** Number of active group reservations, or {@code -1} once invalidated by {@link #canEvict()}. */
+ private final AtomicInteger reservations = new AtomicInteger();
+
+ /** Callback applied once this reservation is invalidated, may be {@code null}. */
+ private final IgniteInClosure<GridDhtPartitionsReservation> finalize;
+
+ /**
+ * @param topVer Affinity topology version.
+ * @param cctx Cache context.
+ * @param finalize Finalizing closure, may be {@code null}.
+ */
+ public GridDhtPartitionsReservation(
+ AffinityTopologyVersion topVer,
+ GridCacheContext<?,?> cctx,
+ IgniteInClosure<GridDhtPartitionsReservation> finalize) {
+ assert topVer != null;
+ assert cctx != null;
+
+ this.topVer = topVer;
+ this.cctx = cctx;
+ this.finalize = finalize;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Cache context.
+ */
+ public GridCacheContext<?,?> cacheContext() {
+ return cctx;
+ }
+
+ /**
+ * Registers partition for this group reservation; invoked from {@link GridDhtLocalPartition#addReservation}.
+ *
+ * @param part Partition.
+ */
+ public void register(GridDhtLocalPartition part) {
+ parts.add(part);
+ }
+
+ /**
+ * Reserves all the registered partitions.
+ *
+ * @return {@code true} If succeeded, {@code false} if this reservation was already invalidated.
+ */
+ @Override public boolean reserve() {
+ for (;;) {
+ int r = reservations.get();
+
+ if (r == -1) // Invalidated by successful canEvict call.
+ return false;
+
+ assert r >= 0 : r;
+
+ if (reservations.compareAndSet(r, r + 1))
+ return true;
+ }
+ }
+
+ /**
+ * Releases all the registered partitions.
+ */
+ @Override public void release() {
+ for (;;) {
+ int r = reservations.get();
+
+ if (r <= 0)
+ throw new IllegalStateException("Method 'reserve' must be called before 'release'.");
+
+ if (reservations.compareAndSet(r, r - 1)) {
+ // If it was the last reservation and topology version changed -> attempt to evict partitions.
+ if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) {
+ for (GridDhtLocalPartition part : parts) {
+ if (part.state() == RENTING)
+ part.tryEvictAsync(true);
+ }
+ }
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}.
+ * If returns {@code true} then probably partition will be evicted (or at least cleared),
+ * so this reservation object becomes invalid and must be dropped from the partition.
+ * Also this means that after returning {@code true} here method {@link #reserve()} can not
+ * return {@code true} anymore.
+ *
+ * @return {@code true} If this reservation is NOT reserved and partition CAN be evicted.
+ */
+ public boolean canEvict() {
+ int r = reservations.get();
+
+ assert r >= -1 : r;
+
+ if (r != 0)
+ return r == -1;
+
+ if (reservations.compareAndSet(0, -1)) {
+ // Remove ourselves from all the registered partitions.
+ for (GridDhtLocalPartition part : parts)
+ part.removeReservation(this);
+
+ if (finalize != null)
+ finalize.apply(this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o;
+
+ return topVer.equals(that.topVer) && cctx == that.cctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ String cache = cctx.name();
+ // Ternary must be parenthesized: otherwise '+' binds first, builds a String and NPEs on a null name.
+ return 31 * topVer.hashCode() + (cache == null ? 0 : cache.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
new file mode 100644
index 0000000..326b077
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+/**
+ * Common contract for objects that can be reserved and later released, such as a single partition or a group of partitions.
+ */
+public interface GridReservable {
+ /**
+ * Tries to acquire a reservation.
+ *
+ * @return {@code true} If the reservation was acquired, {@code false} otherwise.
+ */
+ boolean reserve();
+
+ /**
+ * Gives back a reservation previously acquired via {@link #reserve()}.
+ */
+ void release();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index d9e9066..a8bc6e0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -87,6 +87,10 @@ public class GridMapQueryExecutor {
/** */
private final GridSpinBusyLock busyLock;
+ /** Group reservations (or replicated-cache markers) keyed by cache name and topology version. NOTE(review): only invalidated group reservations are removed here; confirm replicated markers do not accumulate across topology versions. */
+ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable> reservations =
+ new ConcurrentHashMap8<>();
+
/**
* @param busyLock Busy lock.
*/
@@ -202,15 +206,13 @@ public class GridMapQueryExecutor {
/**
* @param cacheName Cache name.
- * @param topVer Topology version.
* @return Cache context or {@code null} if none.
*/
- @Nullable private GridCacheContext<?,?> cacheContext(String cacheName, AffinityTopologyVersion topVer) {
+ @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);
- if (cache == null) // Since we've waited for for cache affinity updates, this must be a misconfiguration.
- throw new CacheException("Cache does not exist on current node: [nodeId=" + ctx.localNodeId() +
- ", cache=" + cacheName + ", topVer=" + topVer + "]");
+ if (cache == null)
+ return null;
return cache.context();
}
@@ -218,17 +220,23 @@ public class GridMapQueryExecutor {
/**
* @param cacheNames Cache names.
* @param topVer Topology version.
- * @param parts Explicit partitions.
+ * @param explicitParts Explicit partitions list, or {@code null} to reserve local primary partitions for {@code topVer}.
* @param reserved Reserved list.
* @return {@code true} If all the needed partitions successfully reserved.
* @throws IgniteCheckedException If failed.
*/
- private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, final int[] parts,
- List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
- Collection<Integer> partIds = parts == null ? null : wrap(parts);
+ private boolean reservePartitions(
+ Collection<String> cacheNames,
+ AffinityTopologyVersion topVer,
+ final int[] explicitParts,
+ List<GridReservable> reserved
+ ) throws IgniteCheckedException {
+ assert topVer != null;
+
+ Collection<Integer> partIds = wrap(explicitParts);
for (String cacheName : cacheNames) {
- GridCacheContext<?, ?> cctx = cacheContext(cacheName, topVer);
+ GridCacheContext<?, ?> cctx = cacheContext(cacheName);
if (cctx == null) // Cache was not found, probably was not deployed yet.
return false;
@@ -236,35 +244,75 @@ public class GridMapQueryExecutor {
if (cctx.isLocal())
continue;
- int partsCnt = cctx.affinity().partitions();
+ final T2<String,AffinityTopologyVersion> grpKey = new T2<>(cctx.name(), topVer);
- if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
- for (int p = 0; p < partsCnt; p++) {
- GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
+ GridReservable r = reservations.get(grpKey);
- if (part == null || part.state() != OWNING)
- return false;
+ if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
+ if (!r.reserve())
+ return false; // We need explicit partitions here -> retry.
- // We don't need to reserve partitions because they will not be evicted in replicated caches.
- }
+ reserved.add(r);
}
- else { // Reserve primary partitions for partitioned cache.
- if (parts == null)
- partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+ else { // Try to reserve partitions one by one.
+ int partsCnt = cctx.affinity().partitions();
- for (int partId : partIds) {
- if (partId >= partsCnt)
- break; // We can have more partitions because `parts` array is shared for all caches.
+ if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
+ if (r == null) { // Check only once.
+ for (int p = 0; p < partsCnt; p++) {
+ GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false);
- GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+ // We don't need to reserve partitions because they will not be evicted in replicated caches.
+ if (part == null || part.state() != OWNING)
+ return false;
- if (part == null || part.state() != OWNING || !part.reserve())
- return false;
+ }
+ // Mark that we checked this replicated cache (only after ALL partitions passed the check).
+ reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE);
+ }
+ }
+ else { // Reserve primary partitions for partitioned cache (if no explicit given).
+ if (explicitParts == null)
+ partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
- reserved.add(part);
+ for (int partId : partIds) {
+ GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
- if (part.state() != OWNING)
- return false;
+ if (part == null || part.state() != OWNING || !part.reserve())
+ return false;
+
+ reserved.add(part);
+
+ // Double check that we are still in owning state and partition contents are not cleared.
+ if (part.state() != OWNING)
+ return false;
+ }
+
+ if (explicitParts == null) {
+ // We reserved all the primary partitions for cache, attempt to add group reservation.
+ GridDhtPartitionsReservation reservation = new GridDhtPartitionsReservation(topVer, cctx,
+ new CI1<GridDhtPartitionsReservation>() {
+ @Override public void apply(GridDhtPartitionsReservation r) {
+ reservations.remove(grpKey, r);
+ }
+ });
+
+ for (int p = reserved.size() - partIds.size(); p < reserved.size(); p++) {
+ if (!((GridDhtLocalPartition)reserved.get(p)).addReservation(reservation)) {
+ // Can fail to add only on the first partition because of the same order of partitions.
+ assert p == reserved.size() - partIds.size() : p;
+
+ reservation = null;
+
+ break;
+ }
+ }
+
+ if (reservation != null) { // If we were able to add reservation to all partitions, publish it.
+ if (reservations.putIfAbsent(grpKey, reservation) != null)
+ throw new IllegalStateException("Group reservation already exists: " + grpKey);
+ }
+ }
}
}
}
@@ -277,7 +325,10 @@ public class GridMapQueryExecutor {
* @return Collection wrapper.
*/
private static Collection<Integer> wrap(final int[] ints) {
- if (F.isEmpty(ints))
+ if (ints == null)
+ return null;
+
+ if (ints.length == 0)
return Collections.emptySet();
return new AbstractCollection<Integer>() {
@@ -317,7 +368,7 @@ public class GridMapQueryExecutor {
QueryResults qr = null;
- List<GridDhtLocalPartition> reserved = new ArrayList<>();
+ List<GridReservable> reserved = new ArrayList<>();
try {
// Unmarshall query params.
@@ -343,7 +394,7 @@ public class GridMapQueryExecutor {
final AffinityTopologyVersion topVer = req.topologyVersion();
if (topVer != null) {
- // Reserve primary partitions.
+ // Reserve primary partitions (or a cached group reservation) for the topology version, or the explicit partitions if given.
if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
sendRetry(node, req.requestId());
@@ -352,7 +403,10 @@ public class GridMapQueryExecutor {
}
// Prepare to run queries.
- GridCacheContext<?,?> mainCctx = cacheContext(req.space(), topVer);
+ GridCacheContext<?,?> mainCctx = cacheContext(req.space());
+
+ if (mainCctx == null)
+ throw new CacheException("Cache was destroyed: " + req.space());
qr = new QueryResults(req.requestId(), qrys.size(), mainCctx);
@@ -420,8 +474,8 @@ public class GridMapQueryExecutor {
h2.setFilters(null);
// Release reserved partitions.
- for (GridDhtLocalPartition part : reserved)
- part.release();
+ for (GridReservable r : reserved)
+ r.release();
}
}
@@ -738,4 +792,22 @@ public class GridMapQueryExecutor {
U.close(stmt, log);
}
}
+
+ /**
+ * Marker reservation for replicated caches: records that all partitions were already found OWNING; reserve/release are no-ops.
+ */
+ private static class ReplicatedReservation implements GridReservable {
+ /** Shared singleton instance. */
+ static final ReplicatedReservation INSTANCE = new ReplicatedReservation();
+
+ /** {@inheritDoc} */
+ @Override public boolean reserve() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+ // No-op.
+ }
+ }
}