You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/06 11:28:04 UTC

[5/9] ignite git commit: IGNITE-4901 Decrease logging level for DataStreamer retry

IGNITE-4901 Decrease logging level for DataStreamer retry


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3401cbd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3401cbd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3401cbd

Branch: refs/heads/ignite-5075-1
Commit: c3401cbd009d7b9cfc8aed0cc9c3f34fb5f433db
Parents: 740b0b2
Author: Alexey Kukushkin <Al...@yahoo.com>
Authored: Thu Jul 6 12:44:27 2017 +0300
Committer: Alexey Kukushkin <Al...@yahoo.com>
Committed: Thu Jul 6 12:44:27 2017 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamProcessor.java       |   3 +-
 .../datastreamer/DataStreamerImpl.java          |  14 ++-
 .../datastreamer/DataStreamerImplSelfTest.java  | 123 ++++++++++++++++++-
 3 files changed, 132 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 789f0d2..84d536f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -340,7 +341,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                 AffinityTopologyVersion topVer = fut.topologyVersion();
 
                 if (!allowOverwrite && !topVer.equals(req.topologyVersion())) {
-                    Exception err = new IgniteCheckedException(
+                    Exception err = new ClusterTopologyCheckedException(
                         "DataStreamer will retry data transfer at stable topology " +
                             "[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index ae441de..df51fac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1782,10 +1782,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 try {
                     GridPeerDeployAware jobPda0 = jobPda;
 
-                    err = new IgniteCheckedException("DataStreamer request failed [node=" + nodeId + "]",
-                        (Throwable)U.unmarshal(ctx,
-                            errBytes,
-                            U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())));
+                    final Throwable cause = U.unmarshal(
+                        ctx,
+                        errBytes,
+                        U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
+
+                    final String msg = "DataStreamer request failed [node=" + nodeId + "]";
+
+                    err = cause instanceof ClusterTopologyCheckedException ?
+                        new ClusterTopologyCheckedException(msg, cause) :
+                        new IgniteCheckedException(msg, cause);
                 }
                 catch (IgniteCheckedException e) {
                     f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 6d10312..e72a9b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.datastreamer;
 
+import java.io.StringWriter;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -25,17 +26,29 @@ import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheServerNotFoundException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -50,12 +63,18 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
     /** Number of keys to load via data streamer. */
     private static final int KEYS_COUNT = 1000;
 
+    /** Nodes started after the first MAX_CACHE_COUNT ones are configured without a cache. */
+    private static final int MAX_CACHE_COUNT = 4;
+
     /** Started grid counter. */
     private static int cnt;

     /** No nodes filter. */
     private static volatile boolean noNodesFilter;

+    /** If {@code true}, the communication SPI sends the next DataStreamerRequest with a stale topology version. */
+    private static volatile boolean needStaleTop;
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -72,8 +91,9 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(discoSpi);
 
-        // Forth node goes without cache.
-        if (cnt < 4)
+        cfg.setCommunicationSpi(new StaleTopologyCommunicationSpi());
+
+        if (cnt < MAX_CACHE_COUNT)
             cfg.setCacheConfiguration(cacheConfiguration());
 
         cnt++;
@@ -232,6 +252,44 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that a cluster topology version mismatch makes DataStreamer retry the cache update on the latest
+     * topology without logging the "DataStreamer will retry data transfer at stable topology" message.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRetryWhenTopologyMismatch() throws Exception {
+        final int KEY = 1;
+        final String VAL = "1";
+
+        cnt = 0;
+
+        StringWriter logWriter = new StringWriter();
+        Appender logAppender = new WriterAppender(new SimpleLayout(), logWriter);
+
+        Logger.getRootLogger().addAppender(logAppender);
+
+        startGrids(MAX_CACHE_COUNT - 1); // Nodes with the cache configured (cnt was reset above).
+
+        try (Ignite ignite = startGrid(MAX_CACHE_COUNT);
+             IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+
+            needStaleTop = true; // Make the SPI rewrite the next DataStreamerRequest with a stale topology.
+
+            streamer.addData(KEY, VAL);
+        } finally {
+            needStaleTop = false; // Reset in case no DataStreamerRequest was intercepted.
+
+            logWriter.flush();
+
+            Logger.getRootLogger().removeAppender(logAppender);
+
+            logAppender.close();
+        }
+
+        assertFalse(logWriter.toString().contains("DataStreamer will retry data transfer at stable topology"));
+    }
+
+    /**
      * Gets cache configuration.
      *
      * @return Cache configuration.
@@ -248,4 +306,63 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
 
         return cacheCfg;
     }
-}
+
+    /**
+     * Simulates a stale topology: rewrites the next DataStreamerRequest with a decremented topology version.
+     */
+    private static class StaleTopologyCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+            // Only the first DataStreamerRequest is rewritten (flag is reset below), so the retry can succeed.
+            if (needStaleTop) {
+                if (msg instanceof GridIoMessage) {
+                    GridIoMessage ioMsg = (GridIoMessage)msg;
+
+                    Message appMsg = ioMsg.message();
+
+                    if (appMsg instanceof DataStreamerRequest) {
+                        DataStreamerRequest req = (DataStreamerRequest)appMsg;
+
+                        AffinityTopologyVersion validTop = req.topologyVersion();
+
+                        // Simulate a node that missed the latest "node joined" topology update: the request carries a
+                        // topology version one major version behind, so the receiver detects a topology mismatch.
+                        AffinityTopologyVersion staleTop = new AffinityTopologyVersion(
+                            validTop.topologyVersion() - 1,
+                            validTop.minorTopologyVersion());
+
+                        appMsg = new DataStreamerRequest(
+                            req.requestId(),
+                            req.responseTopicBytes(),
+                            req.cacheName(),
+                            req.updaterBytes(),
+                            req.entries(),
+                            req.ignoreDeploymentOwnership(),
+                            req.skipStore(),
+                            req.keepBinary(),
+                            req.deploymentMode(),
+                            req.sampleClassName(),
+                            req.userVersion(),
+                            req.participants(),
+                            req.classLoaderId(),
+                            req.forceLocalDeployment(),
+                            staleTop);
+
+                        msg = new GridIoMessage(
+                            GridTestUtils.<Byte>getFieldValue(ioMsg, "plc"),
+                            GridTestUtils.getFieldValue(ioMsg, "topic"),
+                            GridTestUtils.<Integer>getFieldValue(ioMsg, "topicOrd"),
+                            appMsg,
+                            GridTestUtils.<Boolean>getFieldValue(ioMsg, "ordered"),
+                            ioMsg.timeout(),
+                            ioMsg.skipOnTimeout());
+
+                        needStaleTop = false; // NOTE(review): check-then-reset is not atomic across concurrent sends.
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}
\ No newline at end of file