You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by "Pavel Pereslegin (Jira)" <ji...@apache.org> on 2019/11/01 11:44:00 UTC

[jira] [Created] (IGNITE-12349) File transmission can cause the cluster to freeze.

Pavel Pereslegin created IGNITE-12349:
-----------------------------------------

             Summary: File transmission can cause the cluster to freeze.
                 Key: IGNITE-12349
                 URL: https://issues.apache.org/jira/browse/IGNITE-12349
             Project: Ignite
          Issue Type: Bug
    Affects Versions: 2.8
            Reporter: Pavel Pereslegin
            Assignee: Maxim Muzafarov


When we initiate a file transmission, a timeout object with a mutable endTime is added to the timeout processor "queue" (see TcpCommunicationSpi#openChannel).
Since endTime is mutable, a timeout for this object will never occur. Moreover, at some point this object will become the first in the "queue", and TimeoutProcessor will stop working altogether.

Reproducer
{code:java}
/**
 * Reproducer for IGNITE-12349: after a file transmission has been initiated, timeout objects that are
 * registered later in the timeout processor are expected to fire, but (per the issue description) they
 * never do because the processor gets stuck on the channel timeout object.
 */
public class FileTransmissionTimeoutProcessorTest extends GridCommonAbstractTest {
    /**
     * Stops all started nodes and then removes the persistence files.
     * Nodes are stopped first so that the persistence directory is not deleted while it is still in use.
     *
     * @throws Exception If failed.
     */
    @After
    public void after() throws Exception {
        stopAllGrids();
        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        // Persistence is enabled so that the cache has partition files on disk which can be transmitted.
        return super.getConfiguration(igniteInstanceName)
            .setDataStorageConfiguration(new DataStorageConfiguration()
                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                    .setPersistenceEnabled(true)
                    .setMaxSize(500L * 1024 * 1024)))
            .setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME));
    }

    /**
     * Performs a file transfer between two nodes and then checks that a timeout object registered
     * afterwards on the sender node is eventually processed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testChannelTimeoutObject() throws Exception {
        IgniteEx snd = startGrid(0);
        IgniteEx rcv = startGrid(1);

        // Do some transfer between nodes.
        initiateFileTransfer(snd, rcv);

        GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

        // Add new timeout object after file transmission timeout object.
        // NOTE(review): DFLT_CONN_TIMEOUT is presumably the communication SPI connection timeout - confirm.
        snd.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(DFLT_CONN_TIMEOUT + 1_000) {
            @Override public void onTimeout() {
                fut.onDone(true);
            }
        });

        // The timeout processor will hang on the file transfer timeout object and will never complete the remaining tasks.
        boolean success = fut.get(DFLT_CONN_TIMEOUT + 30_000);

        assertTrue(success);
    }

    /**
     * Activates the cluster, fills the default cache with data and sends every cache partition file
     * from the sender node to the receiver node through the file transmission API.
     *
     * @param snd Node that sends the files.
     * @param rcv Node that receives the files.
     * @throws IOException If failed.
     * @throws IgniteCheckedException If failed.
     * @throws InterruptedException If interrupted.
     */
    private void initiateFileTransfer(IgniteEx snd, IgniteEx rcv) throws IOException, IgniteCheckedException, InterruptedException {
        snd.cluster().active(true);

        awaitPartitionMapExchange();

        try (IgniteDataStreamer<Integer, Integer> dataStreamer = snd.dataStreamer(DEFAULT_CACHE_NAME)) {
            dataStreamer.allowOverwrite(true);

            for (int i = 0; i < 10_000; i++)
                dataStreamer.addData(i, i + DEFAULT_CACHE_NAME.hashCode());
        }

        Map<String, Long> fileSizes = new HashMap<>();
        // CRCs are collected for each file but are not verified anywhere in this reproducer.
        Map<String, Integer> fileCrcs = new HashMap<>();
        Map<String, Serializable> fileParams = new HashMap<>();

        assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));

        File tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ctmp", true);

        // The same topic must be used by the receiver handler and by the sender.
        Object topic = GridTopic.TOPIC_CACHE.topic("test", 0);

        rcv.context().io().addTransmissionHandler(topic, new TransmissionHandler() {
            @Override public void onException(UUID nodeId, Throwable err) {
                // No-op.
            }

            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
                // Received files are stored under the temporary directory with their original names.
                return new File(tempStore, fileMeta.name()).getAbsolutePath();
            }

            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
                // Only TransmissionPolicy.FILE is used by the sender below, so no chunk handler is provided.
                return null;
            }

            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
                return new Consumer<File>() {
                    @Override public void accept(File file) {
                        assertTrue(fileSizes.containsKey(file.getName()));
                        // Save all params.
                        fileParams.putAll(initMeta.params());
                    }
                };
            }
        });

        IgniteInternalCache<Object, Object> defCache = snd.cachex(DEFAULT_CACHE_NAME);

        File cacheDirIg0 = ((FilePageStoreManager)(defCache).context()
            .shared()
            .pageStore()).cacheWorkDir(defCache.configuration());

        File[] cacheParts = cacheDirIg0.listFiles(new FilenameFilter() {
            @Override public boolean accept(File dir, String name) {
                return name.endsWith(FILE_SUFFIX);
            }
        });

        // File#listFiles returns null if the directory does not exist or an I/O error occurs;
        // fail explicitly instead of hitting a NullPointerException in the loops below.
        assertTrue(cacheParts != null);

        for (File file : cacheParts) {
            fileSizes.put(file.getName(), file.length());
            fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
        }

        try (GridIoManager.TransmissionSender sender = snd.context()
            .io()
            .openTransmissionSender(rcv.localNode().id(), topic)) {
            // Iterate over cache partition cacheParts.
            for (File file : cacheParts) {
                Map<String, Serializable> params = new HashMap<>();

                params.put(file.getName(), file.hashCode());

                sender.send(file, params, TransmissionPolicy.FILE);
            }
        }
    }
}
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)