You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/07 09:34:33 UTC
[30/46] ignite git commit: IGNITE-4901 Decrease logging level for
DataStremer retry
IGNITE-4901 Decrease logging level for DataStremer retry
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3401cbd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3401cbd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3401cbd
Branch: refs/heads/ignite-2.1
Commit: c3401cbd009d7b9cfc8aed0cc9c3f34fb5f433db
Parents: 740b0b2
Author: Alexey Kukushkin <Al...@yahoo.com>
Authored: Thu Jul 6 12:44:27 2017 +0300
Committer: Alexey Kukushkin <Al...@yahoo.com>
Committed: Thu Jul 6 12:44:27 2017 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamProcessor.java | 3 +-
.../datastreamer/DataStreamerImpl.java | 14 ++-
.../datastreamer/DataStreamerImplSelfTest.java | 123 ++++++++++++++++++-
3 files changed, 132 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 789f0d2..84d536f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -340,7 +341,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
AffinityTopologyVersion topVer = fut.topologyVersion();
if (!allowOverwrite && !topVer.equals(req.topologyVersion())) {
- Exception err = new IgniteCheckedException(
+ Exception err = new ClusterTopologyCheckedException(
"DataStreamer will retry data transfer at stable topology " +
"[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]");
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index ae441de..df51fac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1782,10 +1782,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
GridPeerDeployAware jobPda0 = jobPda;
- err = new IgniteCheckedException("DataStreamer request failed [node=" + nodeId + "]",
- (Throwable)U.unmarshal(ctx,
- errBytes,
- U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())));
+ final Throwable cause = U.unmarshal(
+ ctx,
+ errBytes,
+ U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
+
+ final String msg = "DataStreamer request failed [node=" + nodeId + "]";
+
+ err = cause instanceof ClusterTopologyCheckedException ?
+ new ClusterTopologyCheckedException(msg, cause) :
+ new IgniteCheckedException(msg, cause);
}
catch (IgniteCheckedException e) {
f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 6d10312..e72a9b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.datastreamer;
+import java.io.StringWriter;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -25,17 +26,29 @@ import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheServerNotFoundException;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -50,12 +63,18 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
/** Number of keys to load via data streamer. */
private static final int KEYS_COUNT = 1000;
+ /** Next nodes after MAX_CACHE_COUNT start without cache */
+ private static final int MAX_CACHE_COUNT = 4;
+
/** Started grid counter. */
private static int cnt;
/** No nodes filter. */
private static volatile boolean noNodesFilter;
+ /** Indicates whether we need to make the topology stale */
+ private static boolean needStaleTop = false;
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
@@ -72,8 +91,9 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(discoSpi);
- // Forth node goes without cache.
- if (cnt < 4)
+ cfg.setCommunicationSpi(new StaleTopologyCommunicationSpi());
+
+ if (cnt < MAX_CACHE_COUNT)
cfg.setCacheConfiguration(cacheConfiguration());
cnt++;
@@ -232,6 +252,44 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
}
/**
+ * Cluster topology mismatch shall result in DataStreamer retrying cache update with the latest topology and
+ * no error logged to the console.
+ *
+ * @throws Exception if failed
+ */
+ public void testRetryWhenTopologyMismatch() throws Exception {
+ final int KEY = 1;
+ final String VAL = "1";
+
+ cnt = 0;
+
+ StringWriter logWriter = new StringWriter();
+ Appender logAppender = new WriterAppender(new SimpleLayout(), logWriter);
+
+ Logger.getRootLogger().addAppender(logAppender);
+
+ startGrids(MAX_CACHE_COUNT - 1); // cache-enabled nodes
+
+ try (Ignite ignite = startGrid(MAX_CACHE_COUNT);
+ IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+
+ needStaleTop = true; // simulate stale topology for the next action
+
+ streamer.addData(KEY, VAL);
+ } finally {
+ needStaleTop = false;
+
+ logWriter.flush();
+
+ Logger.getRootLogger().removeAppender(logAppender);
+
+ logAppender.close();
+ }
+
+ assertFalse(logWriter.toString().contains("DataStreamer will retry data transfer at stable topology"));
+ }
+
+ /**
* Gets cache configuration.
*
* @return Cache configuration.
@@ -248,4 +306,63 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
return cacheCfg;
}
-}
+
+ /**
+ * Simulate stale (not up-to-date) topology
+ */
+ private static class StaleTopologyCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+ // Send stale topology only in the first request to avoid indefinitely getting failures.
+ if (needStaleTop) {
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage ioMsg = (GridIoMessage)msg;
+
+ Message appMsg = ioMsg.message();
+
+ if (appMsg != null && appMsg instanceof DataStreamerRequest) {
+ DataStreamerRequest req = (DataStreamerRequest)appMsg;
+
+ AffinityTopologyVersion validTop = req.topologyVersion();
+
+ // Simulate situation when a node did not receive the latest "node joined" topology update causing
+ // topology mismatch
+ AffinityTopologyVersion staleTop = new AffinityTopologyVersion(
+ validTop.topologyVersion() - 1,
+ validTop.minorTopologyVersion());
+
+ appMsg = new DataStreamerRequest(
+ req.requestId(),
+ req.responseTopicBytes(),
+ req.cacheName(),
+ req.updaterBytes(),
+ req.entries(),
+ req.ignoreDeploymentOwnership(),
+ req.skipStore(),
+ req.keepBinary(),
+ req.deploymentMode(),
+ req.sampleClassName(),
+ req.userVersion(),
+ req.participants(),
+ req.classLoaderId(),
+ req.forceLocalDeployment(),
+ staleTop);
+
+ msg = new GridIoMessage(
+ GridTestUtils.<Byte>getFieldValue(ioMsg, "plc"),
+ GridTestUtils.getFieldValue(ioMsg, "topic"),
+ GridTestUtils.<Integer>getFieldValue(ioMsg, "topicOrd"),
+ appMsg,
+ GridTestUtils.<Boolean>getFieldValue(ioMsg, "ordered"),
+ ioMsg.timeout(),
+ ioMsg.skipOnTimeout());
+
+ needStaleTop = false;
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+}
\ No newline at end of file