You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/31 16:26:52 UTC
[2/6] ignite git commit: Ignite-1093 Improved rebalancing
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..01056ac
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Partition supply message.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Worker ID. */
+ private int workerId = -1;
+
+ /** Update sequence. */
+ private long updateSeq;
+
+ /** Acknowledgement flag. */
+ private boolean ack;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Partitions that have been fully sent. */
+ @GridDirectCollection(int.class)
+ private Collection<Integer> last;
+
+ /** Partitions which were not found. */
+ @GridToStringInclude
+ @GridDirectCollection(int.class)
+ private Collection<Integer> missed;
+
+ /** Entries. */
+ @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+ private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+
+ /** Message size. */
+ @GridDirectTransient
+ private int msgSize;
+
+ /**
+ * @param workerId Worker ID.
+ * @param updateSeq Update sequence for this node.
+ * @param cacheId Cache ID.
+ */
+ GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
+ assert workerId >= 0;
+ assert updateSeq > 0;
+
+ this.cacheId = cacheId;
+ this.updateSeq = updateSeq;
+ this.workerId = workerId;
+ this.topVer = topVer;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridDhtPartitionSupplyMessageV2() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowForStartup() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ignoreClassErrors() {
+ return true;
+ }
+
+ /**
+ * @return Worker ID.
+ */
+ int workerId() {
+ return workerId;
+ }
+
+ /**
+ * @return Update sequence.
+ */
+ long updateSequence() {
+ return updateSeq;
+ }
+
+ /**
+ * Marks this message for acknowledgment.
+ */
+ void markAck() {
+ ack = true;
+ }
+
+ /**
+ * @return Acknowledgement flag.
+ */
+ boolean ack() {
+ return ack;
+ }
+
+ /**
+ * @return Topology version for which demand message is sent.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Flag to indicate last message for partition.
+ */
+ Collection<Integer> last() {
+ return last == null ? Collections.<Integer>emptySet() : last;
+ }
+
+ /**
+ * @param p Partition which was fully sent.
+ */
+ void last(int p) {
+ if (last == null)
+ last = new HashSet<>();
+
+ if (last.add(p)) {
+ msgSize += 4;
+
+ // If partition is empty, we need to add it.
+ if (!infos.containsKey(p)) {
+ CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
+
+ infoCol.init();
+
+ infos.put(p, infoCol);
+ }
+ }
+ }
+
+ /**
+ * @param p Missed partition.
+ */
+ void missed(int p) {
+ if (missed == null)
+ missed = new HashSet<>();
+
+ if (missed.add(p))
+ msgSize += 4;
+ }
+
+ /**
+ * @return Missed partitions.
+ */
+ Collection<Integer> missed() {
+ return missed == null ? Collections.<Integer>emptySet() : missed;
+ }
+
+ /**
+ * @return Entries.
+ */
+ Map<Integer, CacheEntryInfoCollection> infos() {
+ return infos;
+ }
+
+ /**
+ * @return Message size.
+ */
+ int messageSize() {
+ return msgSize;
+ }
+
+ /**
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+
+ marshalInfo(info, ctx);
+
+ msgSize += info.marshalledSize(ctx);
+
+ CacheEntryInfoCollection infoCol = infos.get(p);
+
+ if (infoCol == null) {
+ msgSize += 4;
+
+ infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+ infoCol.init();
+ }
+
+ infoCol.add(info);
+ }
+
+ /**
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+ assert (info.key() != null || info.keyBytes() != null);
+ assert info.value() != null;
+
+ // Need to call this method to initialize info properly.
+ marshalInfo(info, ctx);
+
+ msgSize += info.marshalledSize(ctx);
+
+ CacheEntryInfoCollection infoCol = infos.get(p);
+
+ if (infoCol == null) {
+ msgSize += 4;
+
+ infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+ infoCol.init();
+ }
+
+ infoCol.add(info);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+ for (CacheEntryInfoCollection col : infos().values()) {
+ List<GridCacheEntryInfo> entries = col.infos();
+
+ for (int i = 0; i < entries.size(); i++)
+ entries.get(i).unmarshal(cacheCtx, ldr);
+ }
+ }
+
+ /**
+ * @return Number of entries in message.
+ */
+ public int size() {
+ return infos.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeBoolean("ack", ack))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeLong("updateSeq", updateSeq))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeInt("workerId", workerId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ ack = reader.readBoolean("ack");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ updateSeq = reader.readLong("updateSeq");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ workerId = reader.readInt("workerId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 114;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+ "size", size(),
+ "parts", infos.keySet(),
+ "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index 13cfef3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final ReadWriteLock busyLock;
-
- /** */
- private GridDhtPartitionTopology top;
-
- /** */
- private final Collection<SupplyWorker> workers = new LinkedList<>();
-
- /** */
- private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
- /** */
- private final boolean depEnabled;
-
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
- /**
- * @param cctx Cache context.
- * @param busyLock Shutdown lock.
- */
- GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
- assert cctx != null;
- assert busyLock != null;
-
- this.cctx = cctx;
- this.busyLock = busyLock;
-
- log = cctx.logger(getClass());
-
- top = cctx.dht().topology();
-
- if (!cctx.kernalContext().clientNode()) {
- int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
- for (int i = 0; i < poolSize; i++)
- workers.add(new SupplyWorker());
-
- cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processDemandMessage(id, m);
- }
- });
- }
-
- depEnabled = cctx.gridDeploy().enabled();
- }
-
- /**
- *
- */
- void start() {
- for (SupplyWorker w : workers)
- new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
- }
-
- /**
- *
- */
- void stop() {
- U.cancel(workers);
- U.join(workers, log);
-
- top = null;
- }
-
- /**
- * Sets preload predicate for supply pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
- * @return Size of this thread pool.
- */
- int poolSize() {
- return cctx.config().getRebalanceThreadPoolSize();
- }
-
- /**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- * @param nodeId Sender node ID.
- * @param d Message.
- */
- private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
- if (!enterBusy())
- return;
-
- try {
- if (cctx.rebalanceEnabled()) {
- if (log.isDebugEnabled())
- log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
-
- queue.offer(new DemandMessage(nodeId, d));
- }
- else
- U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
- * @param deque Deque to poll from.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(2000, MILLISECONDS);
- }
-
- /**
- * Supply work.
- */
- private class SupplyWorker extends GridWorker {
- /** Hide worker logger and use cache logger. */
- private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
-
- /**
- * Default constructor.
- */
- private SupplyWorker() {
- super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- DemandMessage msg = poll(queue, this);
-
- if (msg == null)
- continue;
-
- ClusterNode node = cctx.discovery().node(msg.senderId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Received message from non-existing node (will ignore): " + msg);
-
- continue;
- }
-
- processMessage(msg, node);
- }
- }
-
- /**
- * @param msg Message.
- * @param node Demander.
- */
- private void processMessage(DemandMessage msg, ClusterNode node) {
- assert msg != null;
- assert node != null;
-
- GridDhtPartitionDemandMessage d = msg.message();
-
- GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
-
- long preloadThrottle = cctx.config().getRebalanceThrottle();
-
- boolean ack = false;
-
- try {
- for (int part : d.partitions()) {
- GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
- if (loc == null || loc.state() != OWNING || !loc.reserve()) {
- // Reply with partition of "-1" to let sender know that
- // this node is no longer an owner.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Requested partition is not owned by local node [part=" + part +
- ", demander=" + msg.senderId() + ']');
-
- continue;
- }
-
- GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
- try {
- if (cctx.isSwapOrOffheapEnabled()) {
- swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
- cctx.swap().addOffHeapListener(part, swapLsnr);
- cctx.swap().addSwapListener(part, swapLsnr);
- }
-
- boolean partMissing = false;
-
- for (GridCacheEntryEx e : loc.entries()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition, so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition [part=" + part +
- ", nodeId=" + msg.senderId() + ']');
-
- partMissing = true;
-
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
- }
-
- GridCacheEntryInfo info = e.info();
-
- if (info != null && !info.isNew()) {
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- if (partMissing)
- continue;
-
- if (cctx.isSwapOrOffheapEnabled()) {
- GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
- cctx.swap().iterator(part);
-
- // Iterator may be null if space does not exist.
- if (iter != null) {
- try {
- boolean prepared = false;
-
- for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
- partMissing = true;
-
- break; // For.
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
- }
-
- GridCacheSwapEntry swapEntry = e.getValue();
-
- GridCacheEntryInfo info = new GridCacheEntryInfo();
-
- info.keyBytes(e.getKey());
- info.ttl(swapEntry.ttl());
- info.expireTime(swapEntry.expireTime());
- info.version(swapEntry.version());
- info.value(swapEntry.value());
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry0(part, info, cctx);
- else {
- if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not send " +
- "cache entry): " + info);
-
- continue;
- }
-
- // Need to manually prepare cache message.
- if (depEnabled && !prepared) {
- ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
- swapEntry.valueClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
- null;
-
- if (ldr == null)
- continue;
-
- if (ldr instanceof GridDeploymentInfo) {
- s.prepare((GridDeploymentInfo)ldr);
-
- prepared = true;
- }
- }
- }
-
- if (partMissing)
- continue;
- }
- finally {
- iter.close();
- }
- }
- }
-
- // Stop receiving promote notifications.
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
-
- if (swapLsnr != null) {
- Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
- swapLsnr = null;
-
- for (GridCacheEntryInfo info : entries) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
- // No need to continue iteration over swap entries.
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(),
- cctx.cacheId());
- }
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- // Mark as last supply message.
- s.last(part);
-
- if (ack) {
- s.markAck();
-
- break; // Partition for loop.
- }
- }
- finally {
- loc.release();
-
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
- }
- }
-
- reply(node, d, s);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
- }
- }
-
- /**
- * @param n Node.
- * @param d Demand message.
- * @param s Supply message.
- * @return {@code True} if message was sent, {@code false} if recipient left grid.
- * @throws IgniteCheckedException If failed.
- */
- private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
- throws IgniteCheckedException {
- try {
- if (log.isDebugEnabled())
- log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
- return true;
- }
- catch (ClusterTopologyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
- return false;
- }
- }
- }
-
- /**
- * Demand message wrapper.
- */
- private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param sndId Sender ID.
- * @param msg Message.
- */
- DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
- super(sndId, msg);
- }
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public DemandMessage() {
- // No-op.
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return get1();
- }
-
- /**
- * @return Message.
- */
- public GridDhtPartitionDemandMessage message() {
- return get2();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a43ebe2..7a9deba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -42,6 +41,7 @@ import java.util.concurrent.locks.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
/**
@@ -61,10 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
/** Partition suppliers. */
- private GridDhtPartitionSupplyPool supplyPool;
+ private GridDhtPartitionSupplier supplier;
/** Partition demanders. */
- private GridDhtPartitionDemandPool demandPool;
+ private GridDhtPartitionDemander demander;
/** Start future. */
private GridFutureAdapter<Object> startFut;
@@ -76,6 +76,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
new ConcurrentHashMap8<>();
+ /** Demand lock. */
+ private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -159,8 +162,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
});
- supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
- demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+ supplier = new GridDhtPartitionSupplier(cctx);
+ demander = new GridDhtPartitionDemander(cctx, demandLock);
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -180,18 +183,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
topVer.setIfGreater(startTopVer);
- supplyPool.start();
- demandPool.start();
+ supplier.start();
+ demander.start();
}
/** {@inheritDoc} */
@Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
super.preloadPredicate(preloadPred);
- assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+ assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
- supplyPool.preloadPredicate(preloadPred);
- demandPool.preloadPredicate(preloadPred);
+ supplier.preloadPredicate(preloadPred);
+ demander.preloadPredicate(preloadPred);
}
/** {@inheritDoc} */
@@ -205,62 +208,165 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
// Acquire write busy lock.
busyLock.writeLock().lock();
- if (supplyPool != null)
- supplyPool.stop();
+ if (supplier != null)
+ supplier.stop();
- if (demandPool != null)
- demandPool.stop();
+ if (demander != null)
+ demander.stop();
top = null;
}
/** {@inheritDoc} */
@Override public void onInitialExchangeComplete(@Nullable Throwable err) {
- if (err == null) {
+ if (err == null)
startFut.onDone();
+ else
+ startFut.onDone(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() {
+ startFut = new GridFutureAdapter<>();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+ demander.updateLastExchangeFuture(lastFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+ // No assignments for disabled preloader.
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ if (!cctx.rebalanceEnabled())
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
- final long start = U.currentTimeMillis();
+ int partCnt = cctx.affinity().partitions();
- final CacheConfiguration cfg = cctx.config();
+ assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+ exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+ "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+ ", topVer=" + top.topologyVersion() + ']';
- if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
- U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
- demandPool.syncFuture().listen(new CI1<Object>() {
- @Override public void apply(Object t) {
- U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
- "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+ AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+ for (int p = 0; p < partCnt; p++) {
+ if (cctx.shared().exchange().hasPendingExchange()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+ exchFut.exchangeId());
+
+ break;
+ }
+
+ // If partition belongs to local node.
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+ assert part.id() == p;
+
+ if (part.state() != MOVING) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+ continue; // For.
+ }
+
+ Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+ if (picked.isEmpty()) {
+ top.own(part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+ DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+ cctx.events().addPreloadEvent(p,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+ discoEvt.type(), discoEvt.timestamp());
}
- });
+
+ if (log.isDebugEnabled())
+ log.debug("Owning partition as there are no other owners: " + part);
+ }
+ else {
+ ClusterNode n = F.first(picked);
+
+ GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+ if (msg == null) {
+ assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+ top.updateSequence(),
+ exchFut.exchangeId().topologyVersion(),
+ cctx.cacheId()));
+ }
+
+ msg.addPartition(p);
+ }
}
}
- else
- startFut.onDone(err);
+
+ return assigns;
}
- /** {@inheritDoc} */
- @Override public void onReconnected() {
- startFut = new GridFutureAdapter<>();
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Picked owners.
+ */
+ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+ int affCnt = affNodes.size();
+
+ Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+ int rmtCnt = rmts.size();
+
+ if (rmtCnt <= affCnt)
+ return rmts;
+
+ List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+ // Sort in descending order, so nodes with higher order will be first.
+ Collections.sort(sorted, CU.nodeComparator(false));
+
+ // Pick newest nodes.
+ return sorted.subList(0, affCnt);
}
- /** {@inheritDoc} */
- @Override public void onExchangeFutureAdded() {
- demandPool.onExchangeFutureAdded();
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Nodes owning this partition.
+ */
+ private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+ return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
}
/** {@inheritDoc} */
- @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- demandPool.updateLastExchangeFuture(lastFut);
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+ demandLock.readLock().lock();
+ try {
+ demander.handleSupplyMessage(idx, id, s);
+ }
+ finally {
+ demandLock.readLock().unlock();
+ }
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- return demandPool.assign(exchFut);
+ public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d){
+ supplier.handleDemandMessage(id, d);
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
- demandPool.addAssignments(assignments, forcePreload);
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
+ demander.addAssignments(assignments, forcePreload);
}
/**
@@ -272,7 +378,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
}
/**
@@ -531,12 +637,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void forcePreload() {
- demandPool.forcePreload();
+ demander.forcePreload();
}
/** {@inheritDoc} */
@Override public void unwindUndeploys() {
- demandPool.unwindUndeploys();
+ demandLock.writeLock().lock();
+
+ try {
+ cctx.deploy().unwind(cctx);
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e683dad..77b55a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1906,7 +1906,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* <p>
* This method is intended for test purposes only.
*/
- void simulateNodeFailure() {
+ protected void simulateNodeFailure() {
impl.simulateNodeFailure();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
index a37f585..88540db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
@@ -53,7 +53,7 @@ public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2
readCnt.set(Integer.MAX_VALUE);
- for (int i = 0; i <= 1000; i++)
+ for (int i = 0; i <= 100000; i++)
jcache(0).put(new TestKey(String.valueOf(++key)), "");
startGrid(3);
@@ -68,6 +68,7 @@ public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2
try {
jcache(3).get(new TestKey(String.valueOf(key)));
+ //Can fail in case rebalancing finished before get
assert false : "p2p marshalling failed, but error response was not sent";
}
catch (CacheException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..a17fc7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+
+/**
+ *
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0];
+
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+ cacheCfg = iCfg.getCacheConfiguration()[1];
+
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+ iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ if (getTestGridName(20).equals(gridName))
+ spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
+
+ return iCfg;
+ }
+
+ public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi {
+ public void fail() {
+ simulateNodeFailure();
+ }
+ }
+
+ private volatile FailableTcpDiscoverySpi spi;
+
+ /**
+ * @throws Exception
+ */
+ public void testNodeFailedAtRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ startGrid(1);
+
+ waitForRebalancing(1, 2);
+
+ startGrid(20);
+
+ waitForRebalancing(20, 3);
+
+ spi.fail();
+
+ waitForRebalancing(0, 4);
+ waitForRebalancing(1, 4);
+
+ stopAllGrids();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..bd1bf28
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ private static int TEST_SIZE = 1_000_000;
+
+ /** cache name. */
+ protected static String CACHE_NAME_DHT = "cache";
+
+ /** cache 2 name. */
+ protected static String CACHE_2_NAME_DHT = "cache2";
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ if (getTestGridName(10).equals(gridName))
+ iCfg.setClientMode(true);
+
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+ cacheCfg.setName(CACHE_NAME_DHT);
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ //cacheCfg.setRebalanceBatchSize(1024);
+ //cacheCfg.setRebalanceBatchesCount(1);
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg.setBackups(1);
+
+ CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
+
+ cacheCfg2.setName(CACHE_2_NAME_DHT);
+ cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
+ //cacheCfg2.setRebalanceBatchSize(1024);
+ //cacheCfg2.setRebalanceBatchesCount(1);
+ cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg2.setBackups(1);
+
+ iCfg.setRebalanceThreadPoolSize(4);
+ iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
+ return iCfg;
+ }
+
+ /**
+ * @param ignite Ignite.
+ */
+ protected void generateData(Ignite ignite) {
+ try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+ stmr.addData(i, i);
+ }
+
+ stmr.flush();
+ }
+ try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+ stmr.addData(i, i + 3);
+ }
+
+ stmr.flush();
+ }
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @throws IgniteCheckedException
+ */
+ protected void checkData(Ignite ignite) throws IgniteCheckedException {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
+ "key " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")";
+ }
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
+ "key " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testSimpleRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ startGrid(1);
+
+ waitForRebalancing(1, 2);
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ stopGrid(0);
+
+ checkData(grid(1));
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+
+ stopAllGrids();
+ }
+
+ /**
+ * @param id Id.
+ * @param top Topology.
+ */
+ protected void waitForRebalancing(int id, int top) throws IgniteCheckedException {
+ boolean finished = false;
+
+ while (!finished) {
+ finished = true;
+
+ for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+ GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
+ if (fut.topologyVersion().topologyVersion() != top) {
+ finished = false;
+
+ break;
+ }
+ else
+ fut.get();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testComplexRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ //will be started simultaneously in case of ASYNC mode
+ startGrid(1);
+ startGrid(2);
+ startGrid(3);
+ startGrid(4);
+
+ //wait until cache rebalanced in async mode
+ waitForRebalancing(1, 5);
+ waitForRebalancing(2, 5);
+ waitForRebalancing(3, 5);
+ waitForRebalancing(4, 5);
+
+ //cache rebalanced in async node
+
+ stopGrid(0);
+
+ //wait until cache rebalanced
+ waitForRebalancing(1, 6);
+ waitForRebalancing(2, 6);
+ waitForRebalancing(3, 6);
+ waitForRebalancing(4, 6);
+
+ //cache rebalanced
+
+ stopGrid(1);
+
+ //wait until cache rebalanced
+ waitForRebalancing(2, 7);
+ waitForRebalancing(3, 7);
+ waitForRebalancing(4, 7);
+
+ //cache rebalanced
+
+ stopGrid(2);
+
+ //wait until cache rebalanced
+ waitForRebalancing(3, 8);
+ waitForRebalancing(4, 8);
+
+ //cache rebalanced
+
+ stopGrid(3);
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ checkData(grid(4));
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testBackwardCompatibility() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ Map<String, Object> map = new HashMap<>(ignite.cluster().localNode().attributes());
+
+ map.put(IgniteNodeAttributes.REBALANCING_VERSION, 0);
+
+ ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map);
+
+ generateData(ignite);
+
+ startGrid(1);
+
+ waitForRebalancing(1, 2);
+
+ stopGrid(0);
+
+ checkData(grid(1));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index 0f4d24d..641fc5d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -126,26 +126,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception If failed.
- */
- public void testSingleZeroPoolSize() throws Exception {
- preloadMode = SYNC;
- poolSize = 0;
-
- try {
- startGrid(1);
-
- assert false : "Grid should have been failed to start.";
- }
- catch (IgniteCheckedException e) {
- info("Caught expected exception: " + e);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
* @throws Exception If test failed.
*/
public void testIntegrity() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index c20e901..c787f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.*;
import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
import org.apache.ignite.internal.processors.cache.distributed.replicated.preloader.*;
import org.apache.ignite.internal.processors.cache.local.*;
@@ -79,6 +80,9 @@ public class IgniteCacheTestSuite3 extends TestSuite {
suite.addTestSuite(GridCacheReplicatedPreloadStartStopEventsSelfTest.class);
suite.addTestSuite(GridReplicatedTxPreloadTest.class);
+ suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+ suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
+
suite.addTestSuite(IgniteTxReentryNearSelfTest.class);
suite.addTestSuite(IgniteTxReentryColocatedSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/benchmark-rebalancing-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-rebalancing-win.properties b/modules/yardstick/config/benchmark-rebalancing-win.properties
new file mode 100644
index 0000000..978e388
--- /dev/null
+++ b/modules/yardstick/config/benchmark-rebalancing-win.properties
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+::
+:: Contains benchmarks for ATOMIC cache.
+::
+
+:: JVM options.
+set JVM_OPTS=%JVM_OPTS% -DIGNITE_QUIET=false
+
+:: Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
+:: set JVM_OPTS=%JVM_OPTS%^
+:: -XX:+UseParNewGC^
+:: -XX:+UseConcMarkSweepGC^
+:: -XX:+UseTLAB^
+:: -XX:NewSize=128m^
+:: -XX:MaxNewSize=128m^
+:: -XX:MaxTenuringThreshold=0^
+:: -XX:SurvivorRatio=1024^
+:: -XX:+UseCMSInitiatingOccupancyOnly^
+:: -XX:CMSInitiatingOccupancyFraction=60
+
+:: List of default probes.
+BENCHMARK_DEFAULT_PROBES=ThroughputLatencyProbe,PercentileProbe
+
+:: Packages where the specified benchmark is searched by reflection mechanism.
+BENCHMARK_PACKAGES=org.yardstickframework,org.apache.ignite.yardstick
+
+:: Probe point writer class name.
+:: BENCHMARK_WRITER=
+
+:: Comma-separated list of the hosts to run BenchmarkServers on. 2 nodes on local host are enabled by default.
+set SERVER_HOSTS=localhost
+
+:: Comma-separated list of the hosts to run BenchmarkDrivers on. 1 node on local host is enabled by default.
+set DRIVER_HOSTS=localhost
+
+:: Remote username.
+:: set REMOTE_USER=
+:: set RESTART_SERVERS=localhost:10:100
+
+:: Run configuration which contains all benchmarks.
+:: Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
+
+set CONFIGS=^
+-cfg %SCRIPT_DIR%\..\config\ignite-rebalancing-multicast-win-config.xml -b 1 -w 0 -d 200 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 5000000 -cn rebalance2
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/benchmark-rebalancing.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-rebalancing.properties b/modules/yardstick/config/benchmark-rebalancing.properties
new file mode 100644
index 0000000..f6d6967
--- /dev/null
+++ b/modules/yardstick/config/benchmark-rebalancing.properties
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Contains benchmarks for ATOMIC cache.
+#
+
+now0=`date +'%H%M%S'`
+
+# JVM options.
+JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false -Xms15g -Xmx15g"
+
+# Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
+JVM_OPTS=${JVM_OPTS}" \
+ -Xloggc:./gc${now0}.log \
+ -XX:+PrintGCDetails \
+ -verbose:gc \
+ -XX:+UseParNewGC \
+ -XX:+UseConcMarkSweepGC \
+ -XX:+UseTLAB \
+ -XX:NewSize=128m \
+ -XX:MaxNewSize=128m \
+ -XX:MaxPermSize=512m \
+ -XX:MaxTenuringThreshold=0 \
+ -XX:SurvivorRatio=1024 \
+ -XX:+UseCMSInitiatingOccupancyOnly \
+ -XX:CMSInitiatingOccupancyFraction=60 \
+ -XX:+DisableExplicitGC \
+"
+
+# List of default probes.
+# Add DStatProbe or VmStatProbe if your OS supports it (e.g. if running on Linux).
+BENCHMARK_DEFAULT_PROBES=ThroughputLatencyProbe,PercentileProbe,VmStatProbe,DStatProbe
+
+# Packages where the specified benchmark is searched by reflection mechanism.
+BENCHMARK_PACKAGES=org.yardstickframework,org.apache.ignite.yardstick
+
+# Probe point writer class name.
+# BENCHMARK_WRITER=
+
+# Comma-separated list of the hosts to run BenchmarkServers on. 2 nodes on local host are enabled by default.
+SERVER_HOSTS=10.20.0.221,10.20.0.222,10.20.0.223
+
+# Comma-separated list of the hosts to run BenchmarkDrivers on. 1 node on local host is enabled by default.
+DRIVER_HOSTS=localhost
+
+RESTART_SERVERS=10.20.0.221:145:0:1000000
+
+# Remote username.
+REMOTE_USER=gridgain
+
+# Number of nodes, used to wait for the specified number of nodes to start.
+nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS} | tr ',' '\n' | wc -l`))
+
+# Run configuration.
+CONFIGS="\
+-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet1 -cl -r 20000000 -cn rebalance1,\
+"
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 20000000 -cn rebalance2,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3 -cl -r 20000000 -cn rebalance3,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet5 -cl -r 20000000 -cn rebalance5,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3-1024 -cl -r 20000000 -cn rebalance3-1024,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3-10024 -cl -r 20000000 -cn rebalance3-10024,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3-100024 -cl -r 20000000 -cn rebalance3-100024,\
+#"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/ignite-log4j.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-log4j.xml b/modules/yardstick/config/ignite-log4j.xml
new file mode 100644
index 0000000..ab3c781
--- /dev/null
+++ b/modules/yardstick/config/ignite-log4j.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+
+<!--
+ Default log4j configuration for Ignite.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+ <!--
+ Logs System.out messages to console.
+
+ Note, this appender is disabled by default.
+ To enable, uncomment the section below and also CONSOLE appender in the <root> element.
+ -->
+ <!--<!–-->
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+
+ <param name="Threshold" value="DEBUG"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+
+ <filter class="org.apache.log4j.varia.LevelRangeFilter">
+ <param name="levelMin" value="DEBUG"/>
+ <param name="levelMax" value="WARN"/>
+ </filter>
+ </appender>
+ <appender name="ASYNC_CONSOLE" class="org.apache.log4j.AsyncAppender">
+ <param name="BufferSize" value="500"/>
+ <appender-ref ref="CONSOLE"/>
+ </appender>
+ <!--–>-->
+
+ <!--
+ Logs all ERROR messages to console.
+ -->
+ <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+ <!-- Log to STDERR. -->
+ <param name="Target" value="System.err"/>
+
+ <!-- Log from ERROR and higher (change to WARN if needed). -->
+ <param name="Threshold" value="ERROR"/>
+
+ <!-- The default pattern: Date Priority [Category] Message\n -->
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+ <appender name="ASYNC_CONSOLE_ERR" class="org.apache.log4j.AsyncAppender">
+ <param name="BufferSize" value="500"/>
+ <appender-ref ref="CONSOLE_ERR"/>
+ </appender>
+
+ <!--
+ Logs all output to specified file.
+ By default, the logging goes to IGNITE_HOME/work/log folder
+ -->
+ <appender name="FILE" class="org.apache.ignite.logger.log4j.Log4jRollingFileAppender">
+ <param name="Threshold" value="DEBUG"/>
+ <param name="File" value="${IGNITE_HOME}/work/log/ignite.log"/>
+ <param name="Append" value="true"/>
+ <param name="MaxFileSize" value="10MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+ <appender name="ASYNC_FILE" class="org.apache.log4j.AsyncAppender">
+ <param name="BufferSize" value="500"/>
+ <appender-ref ref="FILE"/>
+ </appender>
+
+ <!--
+ <category name="org.apache.ignite">
+ <level value="DEBUG"/>
+ </category>
+ -->
+
+ <!--
+ Uncomment to disable courtesy notices, such as SPI configuration
+ consistency warnings.
+ -->
+ <!--
+ <category name="org.apache.ignite.CourtesyConfigNotice">
+ <level value="OFF"/>
+ </category>
+ -->
+
+ <category name="org.springframework">
+ <level value="WARN"/>
+ </category>
+
+ <category name="org.eclipse.jetty">
+ <level value="WARN"/>
+ </category>
+
+ <!--
+ Avoid warnings about failed bind attempt when multiple nodes running on the same host.
+ -->
+ <category name="org.eclipse.jetty.util.log">
+ <level value="ERROR"/>
+ </category>
+
+ <category name="org.eclipse.jetty.util.component">
+ <level value="ERROR"/>
+ </category>
+
+ <category name="com.amazonaws">
+ <level value="WARN"/>
+ </category>
+
+ <!-- Default settings. -->
+ <root>
+ <!-- Print out all info by default. -->
+ <level value="INFO"/>
+
+ <!-- Uncomment to enable logging to console. -->
+ <!--<appender-ref ref="ASYNC_CONSOLE"/>-->
+
+ <appender-ref ref="ASYNC_CONSOLE_ERR"/>
+ <appender-ref ref="ASYNC_FILE"/>
+ </root>
+</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/ignite-rebalancing-multicast-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-rebalancing-multicast-config.xml b/modules/yardstick/config/ignite-rebalancing-multicast-config.xml
new file mode 100644
index 0000000..e16c351
--- /dev/null
+++ b/modules/yardstick/config/ignite-rebalancing-multicast-config.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!--
+ Ignite Spring configuration file to startup grid.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="gridLogger">
+ <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+ <constructor-arg type="java.lang.String" value="config/ignite-log4j.xml"/>
+ </bean>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+ <property name="addresses">
+ <list>
+ <value>10.20.0.219:47500..47509</value>
+ <value>10.20.0.221:47500..47509</value>
+ <value>10.20.0.222:47500..47509</value>
+ <value>10.20.0.223:47500..47509</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance1"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="rebalanceBatchesCount" value="1"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance2"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="rebalanceBatchesCount" value="2"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance3"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="rebalanceBatchesCount" value="3"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance5"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="rebalanceBatchesCount" value="5"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance3-1024"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="rebalanceBatchesCount" value="3"/>
+
+ <property name="rebalanceBatchSize" value="1024"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance3-10024"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="rebalanceBatchesCount" value="3"/>
+
+ <property name="rebalanceBatchSize" value="10024"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance3-100024"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+
+ <property name="rebalanceBatchesCount" value="3"/>
+
+ <property name="rebalanceBatchSize" value="100024"/>
+ </bean>
+
+ </list>
+ </property>
+
+ <property name="failureDetectionTimeout" value="1000"/>
+
+ <property name="metricsLogFrequency" value="200"/>
+
+ <property name="warmupClosure" ref="warmupClosure"/>
+
+ <property name="rebalanceThreadPoolSize" value="0"/>
+ </bean>
+
+ <bean id="warmupClosure" class="org.apache.ignite.startup.BasicWarmupClosure">
+
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml b/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml
new file mode 100644
index 0000000..0e55019
--- /dev/null
+++ b/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!--
+ Ignite Spring configuration file to startup grid.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="gridLogger">
+ <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+ <constructor-arg type="java.lang.String" value="config/ignite-log4j.xml"/>
+ </bean>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:47500..47509</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean id="rebalance2" class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="rebalance2"/>
+
+ <property name="cacheMode" value="PARTITIONED"/>
+
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="swapEnabled" value="false"/>
+
+ <property name="backups" value="1"/>
+ </bean>
+ </list>
+ </property>
+
+ <property name="failureDetectionTimeout" value="1000"/>
+
+ <property name="metricsLogFrequency" value="200"/>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/pom.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/pom.xml b/modules/yardstick/pom.xml
index dc4a033..553158c 100644
--- a/modules/yardstick/pom.xml
+++ b/modules/yardstick/pom.xml
@@ -34,7 +34,7 @@
<version>1.4.1-SNAPSHOT</version>
<properties>
- <yardstick.version>0.7.0</yardstick.version>
+ <yardstick.version>0.7.1</yardstick.version>
</properties>
@@ -98,6 +98,12 @@
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>