You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by "Pavel Pereslegin (Jira)" <ji...@apache.org> on 2019/11/01 11:44:00 UTC
[jira] [Created] (IGNITE-12349) File transmission can cause the
cluster to freeze.
Pavel Pereslegin created IGNITE-12349:
-----------------------------------------
Summary: File transmission can cause the cluster to freeze.
Key: IGNITE-12349
URL: https://issues.apache.org/jira/browse/IGNITE-12349
Project: Ignite
Issue Type: Bug
Affects Versions: 2.8
Reporter: Pavel Pereslegin
Assignee: Maxim Muzafarov
When we initiate a file transmission, a timeout object with a mutable endTime is added to the timeout processor "queue" (see TcpCommunicationSpi#openChannel).
Because endTime is mutable, the timeout for this object will never occur. Moreover, at some point this object will become the first in the "queue", and the TimeoutProcessor will stop working altogether.
Reproducer
{code:java}
public class FileTransmissionTimeoutProcessorTest extends GridCommonAbstractTest {
@After
public void after() throws Exception {
cleanPersistenceDir();
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setMaxSize(500L * 1024 * 1024)))
.setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME));
}
@Test
public void testChannelTimeoutObject() throws Exception {
IgniteEx snd = startGrid(0);
IgniteEx rcv = startGrid(1);
// Do some transfer between nodes.
initiateFileTransfer(snd, rcv);
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
// Add new timeout object after file transmission timeout object.
snd.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(DFLT_CONN_TIMEOUT + 1_000) {
@Override public void onTimeout() {
fut.onDone(true);
}
});
// The timeout processor will hang on the file transfer timeout object and will never complete the remaining tasks.
boolean success = fut.get(DFLT_CONN_TIMEOUT + 30_000);
assertTrue(success);
}
/** */
private void initiateFileTransfer(IgniteEx snd, IgniteEx rcv) throws IOException, IgniteCheckedException, InterruptedException {
snd.cluster().active(true);
awaitPartitionMapExchange();
try (IgniteDataStreamer<Integer, Integer> dataStreamer = snd.dataStreamer(DEFAULT_CACHE_NAME)) {
dataStreamer.allowOverwrite(true);
for (int i = 0; i < 10_000; i++)
dataStreamer.addData(i, i + DEFAULT_CACHE_NAME.hashCode());
}
Map<String, Long> fileSizes = new HashMap<>();
Map<String, Integer> fileCrcs = new HashMap<>();
Map<String, Serializable> fileParams = new HashMap<>();
assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));
File tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ctmp", true);
rcv.context().io().addTransmissionHandler(GridTopic.TOPIC_CACHE.topic("test", 0), new TransmissionHandler() {
@Override public void onException(UUID nodeId, Throwable err) {
// No-op.
}
@Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
return new File(tempStore, fileMeta.name()).getAbsolutePath();
}
@Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
return null;
}
@Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
return new Consumer<File>() {
@Override public void accept(File file) {
assertTrue(fileSizes.containsKey(file.getName()));
// Save all params.
fileParams.putAll(initMeta.params());
}
};
}
});
IgniteInternalCache<Object, Object> defCache = snd.cachex(DEFAULT_CACHE_NAME);
File cacheDirIg0 = ((FilePageStoreManager)(defCache).context()
.shared()
.pageStore()).cacheWorkDir(defCache.configuration());
File[] cacheParts = cacheDirIg0.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.endsWith(FILE_SUFFIX);
}
});
for (File file : cacheParts) {
fileSizes.put(file.getName(), file.length());
fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
}
try (GridIoManager.TransmissionSender sender = snd.context()
.io()
.openTransmissionSender(rcv.localNode().id(), GridTopic.TOPIC_CACHE.topic("test", 0))) {
// Iterate over cache partition cacheParts.
for (File file : cacheParts) {
Map<String, Serializable> params = new HashMap<>();
params.put(file.getName(), file.hashCode());
sender.send(file, params, TransmissionPolicy.FILE);
}
}
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)