You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 11:49:48 UTC
[19/50] [abbrv] ignite git commit: IGNITE-4827: Remove compatibility
logic for 1.x versions. This closes #1654.
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e585b56..d5f2246 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -27,12 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheRebalanceMode;
@@ -54,13 +50,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -71,12 +65,10 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiException;
import org.jetbrains.annotations.Nullable;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@@ -109,18 +101,6 @@ public class GridDhtPartitionDemander {
/** Last exchange future. */
private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
- /** Demand lock. */
- @Deprecated//Backward compatibility. To be removed in future.
- private final ReadWriteLock demandLock;
-
- /** DemandWorker index. */
- @Deprecated//Backward compatibility. To be removed in future.
- private final AtomicInteger dmIdx = new AtomicInteger();
-
- /** DemandWorker. */
- @Deprecated//Backward compatibility. To be removed in future.
- private volatile DemandWorker worker;
-
/** Cached rebalance topics. */
private final Map<Integer, Object> rebalanceTopics;
@@ -138,13 +118,11 @@ public class GridDhtPartitionDemander {
/**
* @param cctx Cctx.
- * @param demandLock Demand lock.
*/
- public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+ public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
assert cctx != null;
this.cctx = cctx;
- this.demandLock = demandLock;
log = cctx.logger(getClass());
@@ -184,11 +162,6 @@ public class GridDhtPartitionDemander {
rebalanceFut.onDone(false);
}
- DemandWorker dw = worker;
-
- if (dw != null)
- dw.cancel();
-
lastExchangeFut = null;
lastTimeoutObj.set(null);
@@ -466,65 +439,47 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage d = e.getValue();
- //Check remote node rebalancing API version.
- if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
- U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
- ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
- ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
-
- int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
+ U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
+ ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+ ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
- List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+ int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
- for (int cnt = 0; cnt < lsnrCnt; cnt++)
- sParts.add(new HashSet<Integer>());
+ List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
- Iterator<Integer> it = parts.iterator();
+ for (int cnt = 0; cnt < lsnrCnt; cnt++)
+ sParts.add(new HashSet<Integer>());
- int cnt = 0;
+ Iterator<Integer> it = parts.iterator();
- while (it.hasNext())
- sParts.get(cnt++ % lsnrCnt).add(it.next());
+ int cnt = 0;
- for (cnt = 0; cnt < lsnrCnt; cnt++) {
- if (!sParts.get(cnt).isEmpty()) {
- // Create copy.
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+ while (it.hasNext())
+ sParts.get(cnt++ % lsnrCnt).add(it.next());
- initD.topic(rebalanceTopics.get(cnt));
- initD.updateSequence(fut.updateSeq);
- initD.timeout(cctx.config().getRebalanceTimeout());
-
- synchronized (fut) {
- if (!fut.isDone()) {
- // Future can be already cancelled at this moment and all failovers happened.
- // New requests will not be covered by failovers.
- cctx.io().sendOrderedMessage(node,
- rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
- }
+ for (cnt = 0; cnt < lsnrCnt; cnt++) {
+ if (!sParts.get(cnt).isEmpty()) {
+ // Create copy.
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+ initD.topic(rebalanceTopics.get(cnt));
+ initD.updateSequence(fut.updateSeq);
+ initD.timeout(cctx.config().getRebalanceTimeout());
+
+ synchronized (fut) {
+ if (!fut.isDone()) {
+ // Future can be already cancelled at this moment and all failovers happened.
+ // New requests will not be covered by failovers.
+ cctx.io().sendOrderedMessage(node,
+ rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
}
-
- if (log.isDebugEnabled())
- log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
- cnt + ", partitions count=" + sParts.get(cnt).size() +
- " (" + partitionsList(sParts.get(cnt)) + ")]");
}
- }
- }
- else {
- U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
- ", mode=" + cfg.getRebalanceMode() +
- ", fromNode=" + node.id() +
- ", partitionsCount=" + parts.size() +
- ", topology=" + fut.topologyVersion() +
- ", updateSeq=" + fut.updateSeq + "]");
- d.timeout(cctx.config().getRebalanceTimeout());
- d.workerId(0);//old api support.
-
- worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
-
- worker.run(node, d);
+ if (log.isDebugEnabled())
+ log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+ cnt + ", partitions count=" + sParts.get(cnt).size() +
+ " (" + partitionsList(sParts.get(cnt)) + ")]");
+ }
}
}
}
@@ -997,26 +952,23 @@ public class GridDhtPartitionDemander {
if (node == null)
return;
- //Check remote node rebalancing API version.
- if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
- GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
- -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
+ GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+ -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
- d.timeout(cctx.config().getRebalanceTimeout());
+ d.timeout(cctx.config().getRebalanceTimeout());
- try {
- for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
- d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+ try {
+ for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+ d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
- d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
- }
- }
- catch (IgniteCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to send failover context cleanup request to node");
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
}
+ catch (IgniteCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send failover context cleanup request to node");
+ }
}
/**
@@ -1147,373 +1099,4 @@ public class GridDhtPartitionDemander {
return S.toString(RebalanceFuture.class, this);
}
}
-
- /**
- * Supply message wrapper.
- */
- @Deprecated//Backward compatibility. To be removed in future.
- private static class SupplyMessage {
- /** Sender ID. */
- private UUID sndId;
-
- /** Supply message. */
- private GridDhtPartitionSupplyMessage supply;
-
- /**
- * Dummy constructor.
- */
- private SupplyMessage() {
- // No-op.
- }
-
- /**
- * @param sndId Sender ID.
- * @param supply Supply message.
- */
- SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
- this.sndId = sndId;
- this.supply = supply;
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return sndId;
- }
-
- /**
- * @return Message.
- */
- GridDhtPartitionSupplyMessage supply() {
- return supply;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SupplyMessage.class, this);
- }
- }
-
- /**
- *
- */
- @Deprecated//Backward compatibility. To be removed in future.
- private class DemandWorker {
- /** Worker ID. */
- private int id;
-
- /** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
- /** Message queue. */
- private final LinkedBlockingDeque<SupplyMessage> msgQ =
- new LinkedBlockingDeque<>();
-
- /** Counter. */
- private long cntr;
-
- /** Hide worker logger and use cache logger instead. */
- private IgniteLogger log = GridDhtPartitionDemander.this.log;
-
- /** */
- private volatile RebalanceFuture fut;
-
- /**
- * @param id Worker ID.
- * @param fut Rebalance future.
- */
- private DemandWorker(int id, RebalanceFuture fut) {
- assert id >= 0;
-
- this.id = id;
- this.fut = fut;
- }
-
- /**
- * @param msg Message.
- */
- private void addMessage(SupplyMessage msg) {
- msgQ.offer(msg);
- }
-
- /**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
- * @param idx Unique index for this topic.
- * @return Topic for partition.
- */
- public Object topic(long idx) {
- return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
- }
-
- /** */
- public void cancel() {
- msgQ.clear();
-
- msgQ.offer(new SupplyMessage(null, null));
- }
-
- /**
- * @param node Node to demand from.
- * @param topVer Topology version.
- * @param d Demand message.
- * @param exchFut Exchange future.
- * @throws InterruptedException If interrupted.
- * @throws ClusterTopologyCheckedException If node left.
- * @throws IgniteCheckedException If failed to send message.
- */
- private void demandFromNode(
- ClusterNode node,
- final AffinityTopologyVersion topVer,
- GridDhtPartitionDemandMessage d,
- GridDhtPartitionsExchangeFuture exchFut
- ) throws InterruptedException, IgniteCheckedException {
- GridDhtPartitionTopology top = cctx.dht().topology();
-
- cntr++;
-
- d.topic(topic(cntr));
- d.workerId(id);
-
- if (fut.isDone() || topologyChanged(fut))
- return;
-
- cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
-
- try {
- boolean retry;
-
- // DoWhile.
- // =======
- do {
- retry = false;
-
- // Create copy.
- d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
-
- long timeout = cctx.config().getRebalanceTimeout();
-
- d.timeout(timeout);
-
- if (log.isDebugEnabled())
- log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
- // Send demand message.
- cctx.io().send(node, d, cctx.ioPolicy());
-
- // While.
- // =====
- while (!fut.isDone() && !topologyChanged(fut)) {
- SupplyMessage s = poll(msgQ, timeout);
-
- // If timed out.
- if (s == null) {
- if (msgQ.isEmpty()) { // Safety check.
- U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
- " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
- " configuration properties).");
-
- // Ordered listener was removed if timeout expired.
- cctx.io().removeOrderedHandler(d.topic());
-
- // Must create copy to be able to work with IO manager thread local caches.
- d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
-
- // Create new topic.
- d.topic(topic(++cntr));
-
- // Create new ordered listener.
- cctx.io().addOrderedHandler(d.topic(),
- new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId,
- GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
-
- // Resend message with larger timeout.
- retry = true;
-
- break; // While.
- }
- else
- continue; // While.
- }
-
- if (s.senderId() == null)
- return; // Stopping now.
-
- // Check that message was received from expected node.
- if (!s.senderId().equals(node.id())) {
- U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
- ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
- continue; // While.
- }
-
- if (log.isDebugEnabled())
- log.debug("Received supply message: " + s);
-
- GridDhtPartitionSupplyMessage supply = s.supply();
-
- // Check whether there were class loading errors on unmarshal
- if (supply.classError() != null) {
- if (log.isDebugEnabled())
- log.debug("Class got undeployed during preloading: " + supply.classError());
-
- retry = true;
-
- // Quit preloading.
- break;
- }
-
- // Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
- int p = e.getKey();
-
- if (cctx.affinity().partitionLocalNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
-
- if (part.state() == MOVING) {
- boolean reserved = part.reserve();
-
- assert reserved : "Failed to reserve partition [igniteInstanceName=" +
- cctx.igniteInstanceName() + ", cacheName=" + cctx.namex() +
- ", part=" + part + ']';
-
- part.lock();
-
- try {
- Collection<Integer> invalidParts = new GridLeanSet<>();
-
- // Loop through all received entries and try to preload them.
- for (GridCacheEntryInfo entry : e.getValue().infos()) {
- if (!invalidParts.contains(p)) {
- if (!part.preloadingPermitted(entry.key(), entry.version())) {
- if (log.isDebugEnabled())
- log.debug("Preloading is not permitted for entry due to " +
- "evictions [key=" + entry.key() +
- ", ver=" + entry.version() + ']');
-
- continue;
- }
-
- if (!preloadEntry(node, p, entry, topVer)) {
- invalidParts.add(p);
-
- if (log.isDebugEnabled())
- log.debug("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
- }
- }
- }
-
- boolean last = supply.last().contains(p);
-
- // If message was last for this partition,
- // then we take ownership.
- if (last) {
- fut.partitionDone(node.id(), p);
-
- top.own(part);
-
- if (log.isDebugEnabled())
- log.debug("Finished rebalancing partition: " + part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
- }
- }
- finally {
- part.unlock();
- part.release();
- }
- }
- else {
- fut.partitionDone(node.id(), p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
- }
- }
- else {
- fut.partitionDone(node.id(), p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
- }
- }
-
- // Only request partitions based on latest topology version.
- for (Integer miss : s.supply().missed()) {
- if (cctx.affinity().partitionLocalNode(miss, topVer))
- fut.partitionMissed(node.id(), miss);
- }
-
- for (Integer miss : s.supply().missed())
- fut.partitionDone(node.id(), miss);
-
- if (fut.remaining.get(node.id()) == null)
- break; // While.
-
- if (s.supply().ack()) {
- retry = true;
-
- break;
- }
- }
- }
- while (retry && !fut.isDone() && !topologyChanged(fut));
- }
- finally {
- cctx.io().removeOrderedHandler(d.topic());
- }
- }
-
- /**
- * @param node Node.
- * @param d D.
- * @throws IgniteCheckedException If failed.
- */
- public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException {
- demandLock.readLock().lock();
-
- try {
- GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
-
- AffinityTopologyVersion topVer = fut.topVer;
-
- try {
- demandFromNode(node, topVer, d, exchFut);
- }
- catch (InterruptedException e) {
- throw new IgniteCheckedException(e);
- }
- }
- finally {
- demandLock.readLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index e8860f2..27e6777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Full partition map.
*/
-public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
+public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap>
implements Comparable<GridDhtPartitionFullMap>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -65,32 +65,9 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
* @param nodeOrder Node order.
* @param updateSeq Update sequence number.
* @param m Map to copy.
- */
- @Deprecated // Backward compatibility.
- public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap2> m) {
- assert nodeId != null;
- assert updateSeq > 0;
- assert nodeOrder > 0;
-
- this.nodeId = nodeId;
- this.nodeOrder = nodeOrder;
- this.updateSeq = updateSeq;
-
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) {
- GridDhtPartitionMap2 part = e.getValue();
-
- put(e.getKey(), new GridDhtPartitionMap(part.nodeId(), part.updateSequence(), part.map()));
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param nodeOrder Node order.
- * @param updateSeq Update sequence number.
- * @param m Map to copy.
* @param onlyActive If {@code true}, then only active partitions will be included.
*/
- public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap2> m,
+ public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap> m,
boolean onlyActive) {
assert nodeId != null;
assert updateSeq > 0;
@@ -100,10 +77,10 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
this.nodeOrder = nodeOrder;
this.updateSeq = updateSeq;
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) {
- GridDhtPartitionMap2 part = e.getValue();
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : m.entrySet()) {
+ GridDhtPartitionMap part = e.getValue();
- GridDhtPartitionMap2 cpy = new GridDhtPartitionMap2(part.nodeId(),
+ GridDhtPartitionMap cpy = new GridDhtPartitionMap(part.nodeId(),
part.updateSequence(),
part.topologyVersion(),
part.map(),
@@ -168,8 +145,8 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
if (size() != fullMap.size())
return false;
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) {
- GridDhtPartitionMap2 m = fullMap.get(e.getKey());
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : entrySet()) {
+ GridDhtPartitionMap m = fullMap.get(e.getKey());
if (m == null || !m.map().equals(e.getValue().map()))
return false;
@@ -238,7 +215,7 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
nodeOrder = in.readLong();
updateSeq = in.readLong();
- putAll(U.<UUID, GridDhtPartitionMap2>readMap(in));
+ putAll(U.<UUID, GridDhtPartitionMap>readMap(in));
}
/** {@inheritDoc} */
@@ -260,7 +237,7 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
* @return Map string representation.
*/
public String map2string() {
- Iterator<Map.Entry<UUID, GridDhtPartitionMap2>> it = entrySet().iterator();
+ Iterator<Map.Entry<UUID, GridDhtPartitionMap>> it = entrySet().iterator();
if (!it.hasNext())
return "{}";
@@ -270,11 +247,11 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
buf.append('{');
while(true) {
- Map.Entry<UUID, GridDhtPartitionMap2> e = it.next();
+ Map.Entry<UUID, GridDhtPartitionMap> e = it.next();
UUID nodeId = e.getKey();
- GridDhtPartitionMap2 partMap = e.getValue();
+ GridDhtPartitionMap partMap = e.getValue();
buf.append(nodeId).append('=').append(partMap.toFullString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index 3096d63..43087ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -22,46 +22,215 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+
/**
* Partition map.
*/
-@Deprecated // Backward compatibility, use GridDhtPartitionMap2 instead.
-public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
+public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
+ /** Node ID. */
+ protected UUID nodeId;
+
+ /** Update sequence number. */
+ protected long updateSeq;
+
+ /** Topology version. */
+ protected AffinityTopologyVersion top;
+
+ /** */
+ protected Map<Integer, GridDhtPartitionState> map;
+
+ /** */
+ private volatile int moving;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridDhtPartitionMap() {
+ // No-op.
+ }
+
/**
* @param nodeId Node ID.
* @param updateSeq Update sequence number.
+ * @param top Topology version.
* @param m Map to copy.
+ * @param onlyActive If {@code true}, then only active states will be included.
*/
- public GridDhtPartitionMap(UUID nodeId, long updateSeq,
- Map<Integer, GridDhtPartitionState> m) {
+ public GridDhtPartitionMap(UUID nodeId,
+ long updateSeq,
+ AffinityTopologyVersion top,
+ Map<Integer, GridDhtPartitionState> m,
+ boolean onlyActive) {
assert nodeId != null;
assert updateSeq > 0;
this.nodeId = nodeId;
this.updateSeq = updateSeq;
+ this.top = top;
map = U.newHashMap(m.size());
for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) {
GridDhtPartitionState state = e.getValue();
- put(e.getKey(), state);
+ if (!onlyActive || state.active())
+ put(e.getKey(), state);
}
}
/**
- * Empty constructor required for {@link Externalizable}.
+ * @param nodeId Node ID.
+ * @param updateSeq Update sequence number.
+ * @param top Topology version.
+ * @param map Map.
+ * @param moving Number of moving partitions.
*/
- public GridDhtPartitionMap() {
- // No-op.
+ private GridDhtPartitionMap(UUID nodeId,
+ long updateSeq,
+ AffinityTopologyVersion top,
+ Map<Integer, GridDhtPartitionState> map,
+ int moving) {
+ this.nodeId = nodeId;
+ this.updateSeq = updateSeq;
+ this.top = top;
+ this.map = map;
+ this.moving = moving;
+ }
+
+ /**
+ * @return Copy with empty partition state map.
+ */
+ public GridDhtPartitionMap emptyCopy() {
+ return new GridDhtPartitionMap(nodeId,
+ updateSeq,
+ top,
+ U.<Integer, GridDhtPartitionState>newHashMap(0),
+ 0);
+ }
+
+ /**
+ * @param part Partition.
+ * @param state Partition state.
+ */
+ public void put(Integer part, GridDhtPartitionState state) {
+ GridDhtPartitionState old = map.put(part, state);
+
+ if (old == MOVING)
+ moving--;
+
+ if (state == MOVING)
+ moving++;
+ }
+
+ /**
+ * @return {@code true} If partition map contains moving partitions.
+ */
+ public boolean hasMovingPartitions() {
+ assert moving >= 0 : moving;
+
+ return moving != 0;
+ }
+
+ /**
+ * @param part Partition.
+ * @return Partition state.
+ */
+ public GridDhtPartitionState get(Integer part) {
+ return map.get(part);
+ }
+
+ /**
+ * @param part Partition.
+ * @return {@code True} if contains given partition.
+ */
+ public boolean containsKey(Integer part) {
+ return map.containsKey(part);
+ }
+
+ /**
+ * @return Entries.
+ */
+ public Set<Map.Entry<Integer, GridDhtPartitionState>> entrySet() {
+ return map.entrySet();
+ }
+
+ /**
+ * @return Map size.
+ */
+ public int size() {
+ return map.size();
+ }
+
+ /**
+ * @return Partitions.
+ */
+ public Set<Integer> keySet() {
+ return map.keySet();
+ }
+
+ /**
+ * @return Underlying map.
+ */
+ public Map<Integer, GridDhtPartitionState> map() {
+ return map;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Update sequence.
+ */
+ public long updateSequence() {
+ return updateSeq;
+ }
+
+ /**
+ * @param updateSeq New update sequence value.
+ * @param topVer Current topology version.
+ * @return Old update sequence value.
+ */
+ public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) {
+ long old = this.updateSeq;
+
+ assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
+
+ this.updateSeq = updateSeq;
+
+ top = topVer;
+
+ return old;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return top;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(GridDhtPartitionMap o) {
+ assert nodeId.equals(o.nodeId);
+
+ return Long.compare(updateSeq, o.updateSeq);
}
/** {@inheritDoc} */
@@ -80,7 +249,7 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
int ordinal = entry.getValue().ordinal();
assert ordinal == (ordinal & 0x3);
- assert entry.getKey() == (entry.getKey() & 0x3FFF);
+ assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey();
int coded = (ordinal << 14) | entry.getKey();
@@ -90,6 +259,15 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
}
assert i == size;
+
+ if (top != null) {
+ out.writeLong(topologyVersion().topologyVersion());
+ out.writeInt(topologyVersion().minorTopologyVersion());
+ }
+ else {
+ out.writeLong(0);
+ out.writeInt(0);
+ }
}
/** {@inheritDoc} */
@@ -110,6 +288,12 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
put(part, GridDhtPartitionState.fromOrdinal(ordinal));
}
+
+ long ver = in.readLong();
+ int minorVer = in.readInt();
+
+ if (ver != 0)
+ top = new AffinityTopologyVersion(ver, minorVer);
}
/** {@inheritDoc} */
@@ -117,7 +301,7 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
if (this == o)
return true;
- GridDhtPartitionMap2 other = (GridDhtPartitionMap2)o;
+ GridDhtPartitionMap other = (GridDhtPartitionMap)o;
return other.nodeId.equals(nodeId) && other.updateSeq == updateSeq;
}
@@ -131,11 +315,11 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
* @return Full string representation.
*/
public String toFullString() {
- return S.toString(GridDhtPartitionMap2.class, this, "size", size(), "map", map.toString());
+ return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString(), "top", top);
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtPartitionMap2.class, this, "size", size());
+ return S.toString(GridDhtPartitionMap.class, this, "size", size());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
deleted file mode 100644
index 7d6f272..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
-
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
-
-/**
- * Partition map.
- */
-public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Used since. */
- public static final IgniteProductVersion SINCE = IgniteProductVersion.fromString("1.5.0");
-
- /** Node ID. */
- protected UUID nodeId;
-
- /** Update sequence number. */
- protected long updateSeq;
-
- /** Topology version. */
- protected AffinityTopologyVersion top;
-
- /** */
- protected Map<Integer, GridDhtPartitionState> map;
-
- /** */
- private volatile int moving;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public GridDhtPartitionMap2() {
- // No-op.
- }
-
- /**
- * @param nodeId Node ID.
- * @param updateSeq Update sequence number.
- * @param top Topology version.
- * @param m Map to copy.
- * @param onlyActive If {@code true}, then only active states will be included.
- */
- public GridDhtPartitionMap2(UUID nodeId,
- long updateSeq,
- AffinityTopologyVersion top,
- Map<Integer, GridDhtPartitionState> m,
- boolean onlyActive) {
- assert nodeId != null;
- assert updateSeq > 0;
-
- this.nodeId = nodeId;
- this.updateSeq = updateSeq;
- this.top = top;
-
- map = U.newHashMap(m.size());
-
- for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) {
- GridDhtPartitionState state = e.getValue();
-
- if (!onlyActive || state.active())
- put(e.getKey(), state);
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param updateSeq Update sequence number.
- * @param top Topology version.
- * @param map Map.
- * @param moving Number of moving partitions.
- */
- private GridDhtPartitionMap2(UUID nodeId,
- long updateSeq,
- AffinityTopologyVersion top,
- Map<Integer, GridDhtPartitionState> map,
- int moving) {
- this.nodeId = nodeId;
- this.updateSeq = updateSeq;
- this.top = top;
- this.map = map;
- this.moving = moving;
- }
-
- /**
- * @return Copy with empty partition state map.
- */
- public GridDhtPartitionMap2 emptyCopy() {
- return new GridDhtPartitionMap2(nodeId,
- updateSeq,
- top,
- U.<Integer, GridDhtPartitionState>newHashMap(0),
- 0);
- }
-
- /**
- * @param part Partition.
- * @param state Partition state.
- */
- public void put(Integer part, GridDhtPartitionState state) {
- GridDhtPartitionState old = map.put(part, state);
-
- if (old == MOVING)
- moving--;
-
- if (state == MOVING)
- moving++;
- }
-
- /**
- * @return {@code true} If partition map contains moving partitions.
- */
- public boolean hasMovingPartitions() {
- assert moving >= 0 : moving;
-
- return moving != 0;
- }
-
- /**
- * @param part Partition.
- * @return Partition state.
- */
- public GridDhtPartitionState get(Integer part) {
- return map.get(part);
- }
-
- /**
- * @param part Partition.
- * @return {@code True} if contains given partition.
- */
- public boolean containsKey(Integer part) {
- return map.containsKey(part);
- }
-
- /**
- * @return Entries.
- */
- public Set<Map.Entry<Integer, GridDhtPartitionState>> entrySet() {
- return map.entrySet();
- }
-
- /**
- * @return Map size.
- */
- public int size() {
- return map.size();
- }
-
- /**
- * @return Partitions.
- */
- public Set<Integer> keySet() {
- return map.keySet();
- }
-
- /**
- * @return Underlying map.
- */
- public Map<Integer, GridDhtPartitionState> map() {
- return map;
- }
-
- /**
- * @return Node ID.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Update sequence.
- */
- public long updateSequence() {
- return updateSeq;
- }
-
- /**
- * @param updateSeq New update sequence value.
- * @param topVer Current topology version.
- * @return Old update sequence value.
- */
- public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) {
- long old = this.updateSeq;
-
- assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
-
- this.updateSeq = updateSeq;
-
- top = topVer;
-
- return old;
- }
-
- /**
- * @return Topology version.
- */
- public AffinityTopologyVersion topologyVersion() {
- return top;
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(GridDhtPartitionMap2 o) {
- assert nodeId.equals(o.nodeId);
-
- return Long.compare(updateSeq, o.updateSeq);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeUuid(out, nodeId);
-
- out.writeLong(updateSeq);
-
- int size = map.size();
-
- out.writeInt(size);
-
- int i = 0;
-
- for (Map.Entry<Integer, GridDhtPartitionState> entry : map.entrySet()) {
- int ordinal = entry.getValue().ordinal();
-
- assert ordinal == (ordinal & 0x3);
- assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey();
-
- int coded = (ordinal << 14) | entry.getKey();
-
- out.writeShort((short)coded);
-
- i++;
- }
-
- assert i == size;
-
- if (top != null) {
- out.writeLong(topologyVersion().topologyVersion());
- out.writeInt(topologyVersion().minorTopologyVersion());
- }
- else {
- out.writeLong(0);
- out.writeInt(0);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- nodeId = U.readUuid(in);
-
- updateSeq = in.readLong();
-
- int size = in.readInt();
-
- map = U.newHashMap(size);
-
- for (int i = 0; i < size; i++) {
- int entry = in.readShort() & 0xFFFF;
-
- int part = entry & 0x3FFF;
- int ordinal = entry >> 14;
-
- put(part, GridDhtPartitionState.fromOrdinal(ordinal));
- }
-
- long ver = in.readLong();
- int minorVer = in.readInt();
-
- if (ver != 0)
- top = new AffinityTopologyVersion(ver, minorVer);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- GridDhtPartitionMap2 other = (GridDhtPartitionMap2)o;
-
- return other.nodeId.equals(nodeId) && other.updateSeq == updateSeq;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return 31 * nodeId.hashCode() + (int)(updateSeq ^ (updateSeq >>> 32));
- }
-
- /**
- * @return Full string representation.
- */
- public String toFullString() {
- return S.toString(GridDhtPartitionMap2.class, this, "size", size(), "map", map.toString(), "top", top);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtPartitionMap2.class, this, "size", size());
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 6e69161..1f3dee7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -32,9 +31,6 @@ import org.jetbrains.annotations.Nullable;
*/
public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
/** */
- public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11");
-
- /** */
protected static final byte COMPRESSED_FLAG_MASK = 1;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5eacc36..f41da2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1476,7 +1476,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param msg Partitions single message.
*/
private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) {
- for (Map.Entry<Integer, GridDhtPartitionMap2> entry : msg.partitions().entrySet()) {
+ for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
Integer cacheId = entry.getKey();
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8a7adfc..33c23e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -241,13 +241,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
assert map2 != null : e.getValue();
assert map1.size() == map2.size();
- for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) {
- GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey());
+ for (Map.Entry<UUID, GridDhtPartitionMap> e0 : map2.entrySet()) {
+ GridDhtPartitionMap partMap1 = map1.get(e0.getKey());
assert partMap1 != null && partMap1.map().isEmpty() : partMap1;
assert !partMap1.hasMovingPartitions() : partMap1;
- GridDhtPartitionMap2 partMap2 = e0.getValue();
+ GridDhtPartitionMap partMap2 = e0.getValue();
assert partMap2 != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index d65e405..da7403e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -47,7 +47,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Local partitions. */
@GridToStringInclude
@GridDirectTransient
- private Map<Integer, GridDhtPartitionMap2> parts;
+ private Map<Integer, GridDhtPartitionMap> parts;
/** */
@GridDirectMap(keyType = Integer.class, valueType = Integer.class)
@@ -106,7 +106,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
* @param locMap Local partition map.
* @param dupDataCache Optional ID of cache with the same partition state map.
*/
- public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) {
+ public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap locMap, @Nullable Integer dupDataCache) {
if (parts == null)
parts = new HashMap<>();
@@ -152,7 +152,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/**
* @return Local partitions.
*/
- public Map<Integer, GridDhtPartitionMap2> partitions() {
+ public Map<Integer, GridDhtPartitionMap> partitions() {
return parts;
}
@@ -217,13 +217,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
assert parts != null;
for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
- GridDhtPartitionMap2 map1 = parts.get(e.getKey());
+ GridDhtPartitionMap map1 = parts.get(e.getKey());
assert map1 != null : e.getKey();
assert F.isEmpty(map1.map());
assert !map1.hasMovingPartitions();
- GridDhtPartitionMap2 map2 = parts.get(e.getValue());
+ GridDhtPartitionMap map2 = parts.get(e.getValue());
assert map2 != null : e.getValue();
assert map2.map() != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 1d88742..dc988bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -62,7 +61,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -80,13 +78,6 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
* DHT cache preloader.
*/
public class GridDhtPreloader extends GridCachePreloaderAdapter {
- /**
- * Rebalancing was refactored at version 1.5.0, but backward compatibility to previous implementation was saved.
- * Node automatically chose communication protocol depends on remote node's version.
- * Backward compatibility may be removed at Ignite 2.x.
- */
- public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
-
/** Default preload resend timeout. */
public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
@@ -194,7 +185,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
cctx.shared().affinity().onCacheCreated(cctx);
supplier = new GridDhtPartitionSupplier(cctx);
- demander = new GridDhtPartitionDemander(cctx, demandLock);
+ demander = new GridDhtPartitionDemander(cctx);
supplier.start();
demander.start();
@@ -619,14 +610,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
AffinityAssignment assignment = cctx.affinity().assignment(topVer);
- boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
-
GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
topVer,
- assignment.assignment(),
- newAffMode);
+ assignment.assignment());
- if (newAffMode && cctx.affinity().affinityCache().centralizedAffinityFunction()) {
+ if (cctx.affinity().affinityCache().centralizedAffinityFunction()) {
assert assignment.idealAssignment() != null;
res.idealAffinityAssignment(assignment.idealAssignment());
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
index 4dd7978..76147ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -34,9 +33,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
*/
public class CacheContinuousQueryBatchAck extends GridCacheMessage {
/** */
- public static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.0");
-
- /** */
private static final long serialVersionUID = 0L;
/** Routine ID. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 485059f..6c8df14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1324,7 +1324,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
for (AffinityTopologyVersion topVer : t.get2()) {
for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
- if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
+ if (!node.isLocal()) {
try {
cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6887a50..745bbde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -58,14 +58,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
-import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
@@ -78,7 +77,6 @@ import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
-import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE;
/**
* Continuous queries manager.
@@ -424,49 +422,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
final boolean keepBinary,
final boolean includeExpired) throws IgniteCheckedException
{
- IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
+ IgniteOutClosure<CacheContinuousQueryHandler> clsr;
if (rmtFilterFactory != null)
- clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
- @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+ clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply() {
CacheContinuousQueryHandler hnd;
- if (v2)
- hnd = new CacheContinuousQueryHandlerV2(
- cctx.name(),
- TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
- locLsnr,
- rmtFilterFactory,
- true,
- false,
- !includeExpired,
- false,
- null);
- else {
- CacheEntryEventFilter fltr = rmtFilterFactory.create();
-
- if (!(fltr instanceof CacheEntryEventSerializableFilter))
- throw new IgniteException("Topology has nodes of the old versions. In this case " +
- "EntryEventFilter should implement " +
- "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr);
-
- hnd = new CacheContinuousQueryHandler(
- cctx.name(),
- TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
- locLsnr,
- (CacheEntryEventSerializableFilter)fltr,
- true,
- false,
- !includeExpired,
- false);
- }
+ hnd = new CacheContinuousQueryHandlerV2(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilterFactory,
+ true,
+ false,
+ !includeExpired,
+ false,
+ null);
return hnd;
}
};
else
- clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
- @Override public CacheContinuousQueryHandler apply(Boolean ignore) {
+ clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply() {
return new CacheContinuousQueryHandler(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -509,8 +488,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
{
return executeQuery0(
locLsnr,
- new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
- @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+ new IgniteOutClosure<CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply() {
return new CacheContinuousQueryHandler(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -603,18 +582,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/**
* @param locLsnr Local listener.
+ * @param clsr Closure to create CacheContinuousQueryHandler.
* @param bufSize Buffer size.
* @param timeInterval Time interval.
* @param autoUnsubscribe Auto unsubscribe flag.
* @param internal Internal flag.
* @param notifyExisting Notify existing flag.
* @param loc Local flag.
+ * @param keepBinary Keep binary flag.
* @param onStart Waiting topology exchange.
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
- IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr,
+ IgniteOutClosure<CacheContinuousQueryHandler> clsr,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
@@ -631,9 +612,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
- boolean v2 = useV2Protocol(cctx.discovery().allNodes());
-
- final CacheContinuousQueryHandler hnd = clsr.apply(v2);
+ final CacheContinuousQueryHandler hnd = clsr.apply();
hnd.taskNameHash(taskNameHash);
hnd.skipPrimaryCheck(skipPrimaryCheck);
@@ -799,20 +778,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
- * @param nodes Nodes.
- * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
- * otherwise {@code false}.
- */
- private boolean useV2Protocol(Collection<ClusterNode> nodes) {
- for (ClusterNode node : nodes) {
- if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0)
- return false;
- }
-
- return true;
- }
-
- /**
* @param lsnrId Listener ID.
* @param lsnr Listener.
* @param internal Internal flag.
@@ -922,14 +887,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
routineId = executeQuery0(
locLsnr,
- new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
- @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+ new IgniteOutClosure<CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply() {
CacheContinuousQueryHandler hnd;
Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
- v2 = rmtFilterFactory != null && v2;
-
- if (v2)
+ if (rmtFilterFactory != null)
hnd = new CacheContinuousQueryHandlerV2(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index ca4edb6..3814731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -67,7 +66,6 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.transactions.Transaction;
@@ -87,13 +85,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
/** */
private static final int SES_ATTR = GridMetadataAwareAdapter.EntryKey.CACHE_STORE_MANAGER_KEY.key();
- /**
- * Behavior can be changed by setting {@link IgniteSystemProperties#IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY} property
- * to {@code True}.
- */
- private static final IgniteProductVersion LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE =
- IgniteProductVersion.fromString("1.5.22");
-
/** */
protected CacheStore<Object, Object> store;
@@ -237,22 +228,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
globalSesLsnrs = true;
}
-
- if (isLocal()) {
- for (ClusterNode node : cctx.kernalContext().cluster().get().forRemotes().nodes()) {
- if (LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE.compareTo(node.version()) > 0 &&
- !IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY)) {
- IgniteProductVersion v = LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE;
-
- log.warning("Since Ignite " + v.major() + "." + v.minor() + "." + v.maintenance() +
- " Local Store keeps primary and backup partitions. " +
- "To keep primary partitions only please set system property " +
- IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY + " to 'true'.");
-
- break;
- }
- }
- }
}
/** {@inheritDoc} */