You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/03/28 17:30:20 UTC

[ignite] branch master updated: IGNITE-16730 Eliminate usage of FileLock in MarshallerMappingFileStore (#9912)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ef8248  IGNITE-16730 Eliminate usage of FileLock in MarshallerMappingFileStore (#9912)
1ef8248 is described below

commit 1ef8248e28d004a0dbcae82756adaa1ec3a39b8b
Author: Nikolay <ni...@apache.org>
AuthorDate: Mon Mar 28 20:29:19 2022 +0300

    IGNITE-16730 Eliminate usage of FileLock in MarshallerMappingFileStore (#9912)
    
    Co-authored-by: Pavel Pereslegin <xx...@gmail.com>
---
 .../internal/MarshallerMappingFileStore.java       | 117 +++++++--------------
 .../ConcurrentMappingFileReadWriteTest.java        | 112 ++++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite2.java   |   4 +-
 3 files changed, 151 insertions(+), 82 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index 605ca31..960de35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -25,9 +25,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.concurrent.ThreadLocalRandom;
@@ -39,6 +36,9 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.MarshallerContext;
 
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
 /**
  * File-based persistence provider for {@link MarshallerContextImpl}.
  *
@@ -51,9 +51,6 @@ final class MarshallerMappingFileStore {
     /** */
     private static final String FILE_EXTENSION = ".classname";
 
-    /** File lock timeout in milliseconds. */
-    private static final int FILE_LOCK_TIMEOUT_MS = 5000;
-
     /** */
     private static final GridStripedLock fileLock = new GridStripedLock(32);
 
@@ -74,8 +71,7 @@ final class MarshallerMappingFileStore {
      * @param kctx Grid kernal context.
      * @param workDir custom marshaller work directory.
      */
-    MarshallerMappingFileStore(final GridKernalContext kctx,
-        final File workDir) throws IgniteCheckedException {
+    MarshallerMappingFileStore(final GridKernalContext kctx, final File workDir) throws IgniteCheckedException {
         this.ctx = kctx;
         this.mappingDir = workDir;
         log = kctx.log(MarshallerMappingFileStore.class);
@@ -88,36 +84,30 @@ final class MarshallerMappingFileStore {
      * @param typeId Type id.
      * @param typeName Type name.
      */
-    void writeMapping(byte platformId, int typeId, String typeName) {
+    public void writeMapping(byte platformId, int typeId, String typeName) {
         String fileName = getFileName(platformId, typeId);
 
+        File tmpFile = new File(mappingDir, fileName + ThreadLocalRandom.current().nextInt() + ".tmp");
+        File file = new File(mappingDir, fileName);
+
         Lock lock = fileLock(fileName);
 
         lock.lock();
 
         try {
-            File file = new File(mappingDir, fileName);
-
-            try (FileOutputStream out = new FileOutputStream(file)) {
+            try (FileOutputStream out = new FileOutputStream(tmpFile)) {
                 try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
-                    try (FileLock ignored = fileLock(out.getChannel(), false)) {
-                        writer.write(typeName);
+                    writer.write(typeName);
 
-                        writer.flush();
-                    }
+                    writer.flush();
                 }
             }
-            catch (IOException e) {
-                U.error(log, "Failed to write class name to file [platformId=" + platformId + "id=" + typeId +
-                        ", clsName=" + typeName + ", file=" + file.getAbsolutePath() + ']', e);
-            }
-            catch (OverlappingFileLockException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("File already locked (will ignore): " + file.getAbsolutePath());
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                U.error(log, "Interrupted while waiting for acquiring file lock: " + file, e);
-            }
+
+            Files.move(tmpFile.toPath(), file.toPath(), REPLACE_EXISTING, ATOMIC_MOVE);
+        }
+        catch (IOException e) {
+            U.error(log, "Failed to write class name to file [platformId=" + platformId + ", id=" + typeId +
+                ", clsName=" + typeName + ", file=" + file.getAbsolutePath() + ']', e);
         }
         finally {
             lock.unlock();
@@ -125,11 +115,17 @@ final class MarshallerMappingFileStore {
     }
 
     /**
-     * @param fileName File name.
+     * @param platformId Platform id.
+     * @param typeId Type id.
      */
-    private String readMapping(String fileName) throws IgniteCheckedException {
-        ThreadLocalRandom rnd = null;
+    public String readMapping(byte platformId, int typeId) {
+        return readMapping(getFileName(platformId, typeId));
+    }
 
+    /**
+     * @param fileName File name.
+     */
+    private String readMapping(String fileName) {
         Lock lock = fileLock(fileName);
 
         lock.lock();
@@ -137,30 +133,17 @@ final class MarshallerMappingFileStore {
         try {
             File file = new File(mappingDir, fileName);
 
-            long time = 0;
-
-            while (true) {
-                try (FileInputStream in = new FileInputStream(file)) {
-                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
-                        try (FileLock ignored = fileLock(in.getChannel(), true)) {
-                            if (file.length() > 0)
-                                return reader.readLine();
+            try (FileInputStream in = new FileInputStream(file)) {
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+                    if (file.length() == 0)
+                        return null;
 
-                            if (rnd == null)
-                                rnd = ThreadLocalRandom.current();
-
-                            if (time == 0)
-                                time = System.nanoTime();
-                            else if (U.millisSinceNanos(time) >= FILE_LOCK_TIMEOUT_MS)
-                                return null;
-
-                            U.sleep(rnd.nextLong(50));
-                        }
-                    }
-                }
-                catch (IOException ignored) {
-                    return null;
+                    return reader.readLine();
                 }
+
+            }
+            catch (IOException ignored) {
+                return null;
             }
         }
         finally {
@@ -169,14 +152,6 @@ final class MarshallerMappingFileStore {
     }
 
     /**
-     * @param platformId Platform id.
-     * @param typeId Type id.
-     */
-    String readMapping(byte platformId, int typeId) throws IgniteCheckedException {
-        return readMapping(getFileName(platformId, typeId));
-    }
-
-    /**
      * Restores all mappings available in file system to marshaller context.
      * This method should be used only on node startup.
      *
@@ -310,7 +285,7 @@ final class MarshallerMappingFileStore {
      * @param platformId Platform id.
      * @param typeId Type id.
      */
-    private String getFileName(byte platformId, int typeId) {
+    String getFileName(byte platformId, int typeId) {
         return typeId + FILE_EXTENSION + platformId;
     }
 
@@ -321,24 +296,4 @@ final class MarshallerMappingFileStore {
     private static Lock fileLock(String fileName) {
         return fileLock.getLock(fileName.hashCode());
     }
-
-    /**
-     * @param ch File channel.
-     * @param shared Shared.
-     */
-    private static FileLock fileLock(
-            FileChannel ch,
-            boolean shared
-    ) throws IOException, IgniteInterruptedCheckedException {
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-        while (true) {
-            FileLock fileLock = ch.tryLock(0L, Long.MAX_VALUE, shared);
-
-            if (fileLock != null)
-                return fileLock;
-
-            U.sleep(rnd.nextLong(50));
-        }
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ConcurrentMappingFileReadWriteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ConcurrentMappingFileReadWriteTest.java
new file mode 100644
index 0000000..f34b569
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ConcurrentMappingFileReadWriteTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.binary.BinaryBasicIdMapper;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
+
+/**
+ * Tests concurrent read/write operations for {@code org.apache.ignite.internal.MarshallerMappingFileStore}.
+ */
+public class ConcurrentMappingFileReadWriteTest extends GridCommonAbstractTest {
+    /** */
+    private static final byte PLATFORM_ID = (byte)0;
+
+    /** */
+    private static final int TYPE_ID = new BinaryBasicIdMapper().typeId(String.class.getName());
+
+    /** */
+    private File mappingDir;
+
+    /** */
+    private MarshallerMappingFileStore mappingFileStore;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        mappingDir = new File(U.workDirectory(null, null) + DFLT_MARSHALLER_PATH);
+        mappingDir.mkdirs();
+
+        mappingFileStore = new MarshallerMappingFileStore(
+            new StandaloneGridKernalContext(log, null, null),
+            mappingDir
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.delete(mappingDir);
+    }
+
+    /** */
+    @Test
+    public void testConcurrentMappingReadWrite() throws Exception {
+        int thCnt = 3;
+
+        CyclicBarrier barrier = new CyclicBarrier(thCnt);
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+            try {
+                barrier.await(getTestTimeout(), MILLISECONDS);
+            }
+            catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+                throw new RuntimeException(e);
+            }
+
+            mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+
+            for (int i = 0; i < 10; i++) {
+                assertEquals(String.class.getName(), mappingFileStore.readMapping(PLATFORM_ID, TYPE_ID));
+
+                mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+            }
+        }, thCnt);
+
+        fut.get(getTestTimeout(), MILLISECONDS);
+    }
+
+    /** */
+    @Test
+    public void testRewriteOpenedFile() throws Exception {
+        mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+
+        File mappingFile = new File(mappingDir, mappingFileStore.getFileName(PLATFORM_ID, TYPE_ID));
+
+        try (FileInputStream in = new FileInputStream(mappingFile)) {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+                // Rewrite the file while it is open. This can happen if a CDC application reads a newly created file.
+                mappingFileStore.writeMapping(PLATFORM_ID, TYPE_ID, String.class.getName());
+
+                assertEquals(String.class.getName(), reader.readLine());
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
index f6a2eef..027d64f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
@@ -26,6 +26,7 @@ import org.apache.ignite.failure.OomFailureHandlerTest;
 import org.apache.ignite.failure.StopNodeFailureHandlerTest;
 import org.apache.ignite.failure.StopNodeOrHaltFailureHandlerTest;
 import org.apache.ignite.internal.ClassSetTest;
+import org.apache.ignite.internal.ConcurrentMappingFileReadWriteTest;
 import org.apache.ignite.internal.ConsistentIdImplicitlyExplicitlyTest;
 import org.apache.ignite.internal.DiagnosticLogForPartitionStatesTest;
 import org.apache.ignite.internal.GridPeerDeploymentRetryModifiedTest;
@@ -216,8 +217,9 @@ import org.junit.runners.Suite;
     NoopCheckpointSpiLoggingTest.class,
     JvmConfigurationSuggestionsTest.class,
     ExponentialBackoffTest.class,
-    ProgressSpeedCalculationTest.class
+    ProgressSpeedCalculationTest.class,
 
+    ConcurrentMappingFileReadWriteTest.class
 })
 public class IgniteBasicTestSuite2 {
 }