You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/12 16:25:39 UTC
[49/50] [abbrv] incubator-ignite git commit: ignite-484-1 - refactor
ignite-484-1 - refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4a534059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4a534059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4a534059
Branch: refs/heads/ignite-484-1
Commit: 4a534059e4bf46b061e1272d432fd00c6f87acb0
Parents: 2b7dc3b
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Jun 12 17:21:14 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Jun 12 17:21:14 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtLocalPartition.java | 10 +-
.../dht/GridDhtPartitionsReservation.java | 169 ++++++++++++++-----
.../query/h2/twostep/GridMapQueryExecutor.java | 24 +--
3 files changed, 141 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 3670b8e..018ffd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -143,19 +143,15 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
assert state.getReference() != EVICTED : "we can reserve only active partitions";
assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation";
- if (!reservations.addIfAbsent(r))
- return false;
-
- r.register(this);
-
- return true;
+ return reservations.addIfAbsent(r);
}
/**
* @param r Reservation.
*/
public void removeReservation(GridDhtPartitionsReservation r) {
- reservations.remove(r);
+ if (!reservations.remove(r))
+ throw new IllegalStateException("Reservation was already removed.");
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index fcd6088..71a1859 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -19,70 +19,126 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
import java.util.*;
import java.util.concurrent.atomic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
/**
* Reservation mechanism for multiple partitions allowing to do a reservation in one operation.
*/
public class GridDhtPartitionsReservation implements GridReservable {
/** */
+ private static final GridDhtLocalPartition[] EMPTY = {};
+
+ /** */
+ private final Object appKey;
+
+ /** */
private final GridCacheContext<?,?> cctx;
/** */
private final AffinityTopologyVersion topVer;
/** */
- private final List<GridDhtLocalPartition> parts = new ArrayList<>();
+ private final AtomicReference<GridDhtLocalPartition[]> parts = new AtomicReference<>();
/** */
private final AtomicInteger reservations = new AtomicInteger();
/** */
- private final IgniteInClosure<GridDhtPartitionsReservation> finalize;
+ private volatile CI1<GridDhtPartitionsReservation> unpublish;
/**
* @param topVer AffinityTopologyVersion version.
* @param cctx Cache context.
- * @param finalize Finalizing closure.
+ * @param appKey Application key for reservation.
*/
- public GridDhtPartitionsReservation(
- AffinityTopologyVersion topVer,
- GridCacheContext<?,?> cctx,
- IgniteInClosure<GridDhtPartitionsReservation> finalize) {
+ public GridDhtPartitionsReservation(AffinityTopologyVersion topVer, GridCacheContext<?,?> cctx, Object appKey) {
assert topVer != null;
assert cctx != null;
+ assert appKey != null;
this.topVer = topVer;
this.cctx = cctx;
- this.finalize = finalize;
+ this.appKey = appKey;
}
/**
- * @return Topology version.
+ * Registers all the given partitions for this reservation.
+ *
+ * @param parts Partitions.
+ * @return {@code true} If registration succeeded and this reservation can be published.
*/
- public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
+ public boolean register(Collection<? extends GridReservable> parts) {
+ assert !F.isEmpty(parts) : "empty partitions list";
- /**
- * @return Cache context.
- */
- public GridCacheContext<?,?> cacheContext() {
- return cctx;
+ GridDhtLocalPartition[] arr = new GridDhtLocalPartition[parts.size()];
+
+ int i = 0;
+ int prevPart = -1;
+ boolean sorted = true; // Most probably it is a sorted list.
+
+ for (GridReservable part : parts) {
+ arr[i] = (GridDhtLocalPartition)part;
+
+ if (sorted) { // Make sure it will be a sorted array.
+ int id = arr[i].id();
+
+ if (id <= prevPart)
+ sorted = false;
+
+ prevPart = id;
+ }
+
+ i++;
+ }
+
+ if (!sorted)
+ Arrays.sort(arr);
+
+ i = 0;
+ prevPart = -1;
+
+ // Register in correct sort order.
+ for (GridDhtLocalPartition part : arr) {
+ if (prevPart == part.id())
+ throw new IllegalStateException("Duplicated partitions.");
+
+ prevPart = part.id();
+
+ if (!part.addReservation(this)) {
+ if (i != 0)
+ throw new IllegalStateException(
+ "Trying to reserve different sets of partitions for the same topology version.");
+
+ return false;
+ }
+
+ i++;
+ }
+
+ if (!this.parts.compareAndSet(null, arr))
+ throw new IllegalStateException("Partitions can be registered only once.");
+
+ return true;
}
/**
- * Registers partition for this group reservation.
+ * Must be called when this reservation is published.
*
- * @param part Partition.
+ * @param unpublish Closure to unpublish this reservation when it becomes invalid.
*/
- public void register(GridDhtLocalPartition part) {
- parts.add(part);
+ public void onPublish(CI1<GridDhtPartitionsReservation> unpublish) {
+ assert unpublish != null;
+ assert this.unpublish == null;
+
+ this.unpublish = unpublish;
+
+ if (reservations.get() == -1)
+ unregister();
}
/**
@@ -91,6 +147,8 @@ public class GridDhtPartitionsReservation implements GridReservable {
* @return {@code true} If succeeded.
*/
@Override public boolean reserve() {
+ assert parts.get() != null : "partitions must be registered before the first reserve attempt";
+
for (;;) {
int r = reservations.get();
@@ -105,6 +163,25 @@ public class GridDhtPartitionsReservation implements GridReservable {
}
/**
+ * @param parts Partitions.
+ */
+ private static void tryEvict(GridDhtLocalPartition[] parts) {
+ if (parts == null) // Can be not initialized yet.
+ return ;
+
+ for (GridDhtLocalPartition part : parts)
+ tryEvict(part);
+ }
+
+ /**
+ * @param part Partition.
+ */
+ private static void tryEvict(GridDhtLocalPartition part) {
+ if (part.state() == RENTING && part.reservations() == 0)
+ part.tryEvictAsync(true);
+ }
+
+ /**
* Releases all the registered partitions.
*/
@Override public void release() {
@@ -116,12 +193,8 @@ public class GridDhtPartitionsReservation implements GridReservable {
if (reservations.compareAndSet(r, r - 1)) {
// If it was the last reservation and topology version changed -> attempt to evict partitions.
- if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) {
- for (GridDhtLocalPartition part : parts) {
- if (part.state() == RENTING)
- part.tryEvictAsync(true);
- }
- }
+ if (r == 1 && !topVer.equals(cctx.topology().topologyVersion()))
+ tryEvict(parts.get());
return;
}
@@ -129,6 +202,26 @@ public class GridDhtPartitionsReservation implements GridReservable {
}
/**
+ * Unregisters this reservation from all the partitions.
+ */
+ private void unregister() {
+ GridDhtLocalPartition[] arr = parts.get();
+
+ if (!F.isEmpty(arr) && unpublish != null && parts.compareAndSet(arr, EMPTY)) {
+ // Removing in reverse order ensures that addReservation of an equal reservation (same topVer) fails on the first partition.
+ for (int i = arr.length - 1; i >= 0; i--) {
+ GridDhtLocalPartition part = arr[i];
+
+ part.removeReservation(this);
+
+ tryEvict(part);
+ }
+
+ unpublish.apply(this);
+ }
+ }
+
+ /**
* Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}.
* If returns {@code true} then probably partition will be evicted (or at least cleared),
* so this reservation object becomes invalid and must be dropped from the partition.
@@ -146,12 +239,7 @@ public class GridDhtPartitionsReservation implements GridReservable {
return r == -1;
if (reservations.compareAndSet(0, -1)) {
- // Remove our self.
- for (GridDhtLocalPartition part : parts)
- part.removeReservation(this);
-
- if (finalize != null)
- finalize.apply(this);
+ unregister();
return true;
}
@@ -169,13 +257,18 @@ public class GridDhtPartitionsReservation implements GridReservable {
GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o;
- return topVer.equals(that.topVer) && cctx == that.cctx;
+ return cctx == that.cctx && topVer.equals(that.topVer) && appKey.equals(that.appKey);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- String cache = cctx.name();
+ String name = cctx.name();
+
+ int result = name == null ? 0 : name.hashCode();
+
+ result = 31 * result + appKey.hashCode();
+ result = 31 * result + topVer.hashCode();
- return 31 * topVer.hashCode() + cache == null ? 0 : cache.hashCode();
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index a8bc6e0..42f01cb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -290,27 +290,17 @@ public class GridMapQueryExecutor {
if (explicitParts == null) {
// We reserved all the primary partitions for cache, attempt to add group reservation.
- GridDhtPartitionsReservation reservation = new GridDhtPartitionsReservation(topVer, cctx,
- new CI1<GridDhtPartitionsReservation>() {
+ GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL");
+
+ if (grp.register(reserved.subList(reserved.size() - partIds.size(), reserved.size()))) {
+ if (reservations.putIfAbsent(grpKey, grp) != null)
+ throw new IllegalStateException("Reservation already exists.");
+
+ grp.onPublish(new CI1<GridDhtPartitionsReservation>() {
@Override public void apply(GridDhtPartitionsReservation r) {
reservations.remove(grpKey, r);
}
});
-
- for (int p = reserved.size() - partIds.size(); p < reserved.size(); p++) {
- if (!((GridDhtLocalPartition)reserved.get(p)).addReservation(reservation)) {
- // Can fail to add only on the first partition because of the same order of partitions.
- assert p == reserved.size() - partIds.size() : p;
-
- reservation = null;
-
- break;
- }
- }
-
- if (reservation != null) { // If we were able to add reservation to all partitions, publish it.
- if (reservations.putIfAbsent(grpKey, reservation) != null)
- throw new IllegalStateException();
}
}
}