You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/03/28 17:30:20 UTC
[ignite] branch master updated: IGNITE-16730 Eliminate usage of FileLock in MarshallerMappingFileStore (#9912)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1ef8248 IGNITE-16730 Eliminate usage of FileLock in MarshallerMappingFileStore (#9912)
1ef8248 is described below
commit 1ef8248e28d004a0dbcae82756adaa1ec3a39b8b
Author: Nikolay <ni...@apache.org>
AuthorDate: Mon Mar 28 20:29:19 2022 +0300
IGNITE-16730 Eliminate usage of FileLock in MarshallerMappingFileStore (#9912)
Co-authored-by: Pavel Pereslegin <xx...@gmail.com>
---
.../internal/MarshallerMappingFileStore.java | 117 +++++++--------------
.../ConcurrentMappingFileReadWriteTest.java | 112 ++++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite2.java | 4 +-
3 files changed, 151 insertions(+), 82 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index 605ca31..960de35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -25,9 +25,6 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.ThreadLocalRandom;
@@ -39,6 +36,9 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.MarshallerContext;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
/**
* File-based persistence provider for {@link MarshallerContextImpl}.
*
@@ -51,9 +51,6 @@ final class MarshallerMappingFileStore {
/** */
private static final String FILE_EXTENSION = ".classname";
- /** File lock timeout in milliseconds. */
- private static final int FILE_LOCK_TIMEOUT_MS = 5000;
-
/** */
private static final GridStripedLock fileLock = new GridStripedLock(32);
@@ -74,8 +71,7 @@ final class MarshallerMappingFileStore {
* @param kctx Grid kernal context.
* @param workDir custom marshaller work directory.
*/
- MarshallerMappingFileStore(final GridKernalContext kctx,
- final File workDir) throws IgniteCheckedException {
+ MarshallerMappingFileStore(final GridKernalContext kctx, final File workDir) throws IgniteCheckedException {
this.ctx = kctx;
this.mappingDir = workDir;
log = kctx.log(MarshallerMappingFileStore.class);
@@ -88,36 +84,30 @@ final class MarshallerMappingFileStore {
* @param typeId Type id.
* @param typeName Type name.
*/
- void writeMapping(byte platformId, int typeId, String typeName) {
+ public void writeMapping(byte platformId, int typeId, String typeName) {
String fileName = getFileName(platformId, typeId);
+ File tmpFile = new File(mappingDir, fileName + ThreadLocalRandom.current().nextInt() + ".tmp");
+ File file = new File(mappingDir, fileName);
+
Lock lock = fileLock(fileName);
lock.lock();
try {
- File file = new File(mappingDir, fileName);
-
- try (FileOutputStream out = new FileOutputStream(file)) {
+ try (FileOutputStream out = new FileOutputStream(tmpFile)) {
try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
- try (FileLock ignored = fileLock(out.getChannel(), false)) {
- writer.write(typeName);
+ writer.write(typeName);
- writer.flush();
- }
+ writer.flush();
}
}
- catch (IOException e) {
- U.error(log, "Failed to write class name to file [platformId=" + platformId + "id=" + typeId +
- ", clsName=" + typeName + ", file=" + file.getAbsolutePath() + ']', e);
- }
- catch (OverlappingFileLockException ignored) {
- if (log.isDebugEnabled())
- log.debug("File already locked (will ignore): " + file.getAbsolutePath());
- }
- catch (IgniteInterruptedCheckedException e) {
- U.error(log, "Interrupted while waiting for acquiring file lock: " + file, e);
- }
+
+ Files.move(tmpFile.toPath(), file.toPath(), REPLACE_EXISTING, ATOMIC_MOVE);
+ }
+ catch (IOException e) {
+ U.error(log, "Failed to write class name to file [platformId=" + platformId + ", id=" + typeId +
+ ", clsName=" + typeName + ", file=" + file.getAbsolutePath() + ']', e);
}
finally {
lock.unlock();
@@ -125,11 +115,17 @@ final class MarshallerMappingFileStore {
}
/**
- * @param fileName File name.
+ * @param platformId Platform id.
+ * @param typeId Type id.
*/
- private String readMapping(String fileName) throws IgniteCheckedException {
- ThreadLocalRandom rnd = null;
+ public String readMapping(byte platformId, int typeId) {
+ return readMapping(getFileName(platformId, typeId));
+ }
+ /**
+ * @param fileName File name.
+ */
+ private String readMapping(String fileName) {
Lock lock = fileLock(fileName);
lock.lock();
@@ -137,30 +133,17 @@ final class MarshallerMappingFileStore {
try {
File file = new File(mappingDir, fileName);
- long time = 0;
-
- while (true) {
- try (FileInputStream in = new FileInputStream(file)) {
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
- try (FileLock ignored = fileLock(in.getChannel(), true)) {
- if (file.length() > 0)
- return reader.readLine();
+ try (FileInputStream in = new FileInputStream(file)) {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+ if (file.length() == 0)
+ return null;
- if (rnd == null)
- rnd = ThreadLocalRandom.current();
-
- if (time == 0)
- time = System.nanoTime();
- else if (U.millisSinceNanos(time) >= FILE_LOCK_TIMEOUT_MS)
- return null;
-
- U.sleep(rnd.nextLong(50));
- }
- }
- }
- catch (IOException ignored) {
- return null;
+ return reader.readLine();
}
+
+ }
+ catch (IOException ignored) {
+ return null;
}
}
finally {
@@ -169,14 +152,6 @@ final class MarshallerMappingFileStore {
}
/**
- * @param platformId Platform id.
- * @param typeId Type id.
- */
- String readMapping(byte platformId, int typeId) throws IgniteCheckedException {
- return readMapping(getFileName(platformId, typeId));
- }
-
- /**
* Restores all mappings available in file system to marshaller context.
* This method should be used only on node startup.
*
@@ -310,7 +285,7 @@ final class MarshallerMappingFileStore {
* @param platformId Platform id.
* @param typeId Type id.
*/
- private String getFileName(byte platformId, int typeId) {
+ String getFileName(byte platformId, int typeId) {
return typeId + FILE_EXTENSION + platformId;
}
@@ -321,24 +296,4 @@ final class MarshallerMappingFileStore {
private static Lock fileLock(String fileName) {
return fileLock.getLock(fileName.hashCode());
}
-
- /**
- * @param ch File channel.
- * @param shared Shared.
- */
- private static FileLock fileLock(
- FileChannel ch,
- boolean shared
- ) throws IOException, IgniteInterruptedCheckedException {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- while (true) {
- FileLock fileLock = ch.tryLock(0L, Long.MAX_VALUE, shared);
-
- if (fileLock != null)
- return fileLock;
-
- U.sleep(rnd.nextLong(50));
- }
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ConcurrentMappingFileReadWriteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ConcurrentMappingFileReadWriteTest.java
new file mode 100644
index 0000000..f34b569
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ConcurrentMappingFileReadWriteTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.binary.BinaryBasicIdMapper;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
+
+/**
+ * Tests concurrent read/write operations for {@code org.apache.ignite.internal.MarshallerMappingFileStore}.
+ */
+public class ConcurrentMappingFileReadWriteTest extends GridCommonAbstractTest {
+ /** */
+ private static final byte PLATFORM_ID = (byte)0;
+
+ /** */
+ private static final int TYPE_ID = new BinaryBasicIdMapper().typeId(String.class.getName());
+
+ /** */
+ private File mappingDir;
+
+ /** */
+ private MarshallerMappingFileStore mappingFileStore;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ mappingDir = new File(U.workDirectory(null, null) + DFLT_MARSHALLER_PATH);
+ mappingDir.mkdirs();
+
+ mappingFileStore = new MarshallerMappingFileStore(
+ new StandaloneGridKernalContext(log, null, null),
+ mappingDir
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ U.delete(mappingDir);
+ }
+
+ /** */
+ @Test
+ public void testConcurrentMappingReadWrite() throws Exception {
+ int thCnt = 3;
+
+ CyclicBarrier barrier = new CyclicBarrier(thCnt);
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+ try {
+ barrier.await(getTestTimeout(), MILLISECONDS);
+ }
+ catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+
+ mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(String.class.getName(), mappingFileStore.readMapping(PLATFORM_ID, TYPE_ID));
+
+ mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+ }
+ }, thCnt);
+
+ fut.get(getTestTimeout(), MILLISECONDS);
+ }
+
+ /** */
+ @Test
+ public void testRewriteOpenedFile() throws Exception {
+ mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+
+ File mappingFile = new File(mappingDir, mappingFileStore.getFileName(PLATFORM_ID, TYPE_ID));
+
+ try (FileInputStream in = new FileInputStream(mappingFile)) {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+ // Rewrite the file while it is open. This can happen if a CDC application reads a newly created file.
+ mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+
+ assertEquals(String.class.getName(), reader.readLine());
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
index f6a2eef..027d64f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
@@ -26,6 +26,7 @@ import org.apache.ignite.failure.OomFailureHandlerTest;
import org.apache.ignite.failure.StopNodeFailureHandlerTest;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandlerTest;
import org.apache.ignite.internal.ClassSetTest;
+import org.apache.ignite.internal.ConcurrentMappingFileReadWriteTest;
import org.apache.ignite.internal.ConsistentIdImplicitlyExplicitlyTest;
import org.apache.ignite.internal.DiagnosticLogForPartitionStatesTest;
import org.apache.ignite.internal.GridPeerDeploymentRetryModifiedTest;
@@ -216,8 +217,9 @@ import org.junit.runners.Suite;
NoopCheckpointSpiLoggingTest.class,
JvmConfigurationSuggestionsTest.class,
ExponentialBackoffTest.class,
- ProgressSpeedCalculationTest.class
+ ProgressSpeedCalculationTest.class,
+ ConcurrentMappingFileReadWriteTest.class
})
public class IgniteBasicTestSuite2 {
}