You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/11/29 01:09:33 UTC

[hadoop] branch branch-3.3 updated: HDFS-16847: RBF: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. (#5145)

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9d37ee082cf HDFS-16847: RBF: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. (#5145)
9d37ee082cf is described below

commit 9d37ee082cf3157abb5ae08d4120cf6fbe3ad3a6
Author: Simbarashe Dzinamarira <sd...@linkedin.com>
AuthorDate: Mon Nov 28 16:47:01 2022 -0800

    HDFS-16847: RBF: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. (#5145)
---
 .../store/driver/impl/StateStoreFileBaseImpl.java  | 18 ++++++----------
 .../store/driver/impl/StateStoreFileImpl.java      |  4 +++-
 .../driver/impl/StateStoreFileSystemImpl.java      | 19 ++++++-----------
 .../store/driver/TestStateStoreDriverBase.java     | 19 +++++++++++++++++
 .../store/driver/TestStateStoreFileSystem.java     | 24 ++++++++++++++++++++++
 5 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
index 8352bca12e9..1ed9f38474a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -85,7 +85,8 @@ public abstract class StateStoreFileBaseImpl
    * @param path Path of the record to write.
    * @return Writer for the record.
    */
-  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+  @VisibleForTesting
+  public abstract <T extends BaseRecord> BufferedWriter getWriter(
       String path);
 
   /**
@@ -348,25 +349,18 @@ public abstract class StateStoreFileBaseImpl
     for (Entry<String, T> entry : toWrite.entrySet()) {
       String recordPath = entry.getKey();
       String recordPathTemp = recordPath + "." + now() + TMP_MARK;
-      BufferedWriter writer = getWriter(recordPathTemp);
-      try {
+      boolean recordWrittenSuccessfully = true;
+      try (BufferedWriter writer = getWriter(recordPathTemp)) {
         T record = entry.getValue();
         String line = serializeString(record);
         writer.write(line);
       } catch (IOException e) {
         LOG.error("Cannot write {}", recordPathTemp, e);
+        recordWrittenSuccessfully = false;
         success = false;
-      } finally {
-        if (writer != null) {
-          try {
-            writer.close();
-          } catch (IOException e) {
-            LOG.error("Cannot close the writer for {}", recordPathTemp, e);
-          }
-        }
       }
       // Commit
-      if (!rename(recordPathTemp, recordPath)) {
+      if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
         LOG.error("Failed committing record into {}", recordPath);
         success = false;
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
index 9d2b1ab2fb7..6ca26637161 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.slf4j.Logger;
@@ -125,7 +126,8 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
   }
 
   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
+  @VisibleForTesting
+  public <T extends BaseRecord> BufferedWriter getWriter(String filename) {
     BufferedWriter writer = null;
     try {
       LOG.debug("Writing file: {}", filename);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
index e6bf159e2f5..ee34d8a4cab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
@@ -28,13 +28,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@@ -82,17 +83,8 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
   @Override
   protected boolean rename(String src, String dst) {
     try {
-      if (fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
-        return true;
-      } else {
-        // Replace should be atomic but not available
-        if (fs.exists(new Path(dst))) {
-          fs.delete(new Path(dst), true);
-        }
-        return fs.rename(new Path(src), new Path(dst));
-      }
+      FileUtil.rename(fs, new Path(src), new Path(dst), Options.Rename.OVERWRITE);
+      return true;
     } catch (Exception e) {
       LOG.error("Cannot rename {} to {}", src, dst, e);
       return false;
@@ -148,7 +140,8 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
   }
 
   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
+  @VisibleForTesting
+  public <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
     BufferedWriter writer = null;
     Path path = new Path(pathName);
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index fe1b9a5bfa0..06b05f45bbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -234,6 +234,25 @@ public class TestStateStoreDriverBase {
     assertEquals(11, records2.size());
   }
 
+  public <T extends BaseRecord> void testInsertWithErrorDuringWrite(
+      StateStoreDriver driver, Class<T> recordClass)
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+
+    assertTrue(driver.removeAll(recordClass));
+    QueryResult<T> queryResult0 = driver.get(recordClass);
+    List<T> records0 = queryResult0.getRecords();
+    assertTrue(records0.isEmpty());
+
+    // Insert single
+    BaseRecord record = generateFakeRecord(recordClass);
+    driver.put(record, true, false);
+
+    // Verify that no record was inserted.
+    QueryResult<T> queryResult1 = driver.get(recordClass);
+    List<T> records1 = queryResult1.getRecords();
+    assertEquals(0, records1.size());
+  }
+
   public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver,
       Class<T> clazz) throws IllegalAccessException, IOException {
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
index 8c4b188cc47..dbd4b9bdae2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
@@ -17,16 +17,26 @@
  */
 package org.apache.hadoop.hdfs.server.federation.store.driver;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
 
 /**
  * Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
@@ -91,4 +101,18 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testMetrics(getStateStoreDriver());
   }
+
+  @Test
+  public void testInsertWithErrorDuringWrite()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    StateStoreFileBaseImpl driver = spy((StateStoreFileBaseImpl)getStateStoreDriver());
+    doAnswer((Answer<BufferedWriter>) a -> {
+      BufferedWriter writer = (BufferedWriter) a.callRealMethod();
+      BufferedWriter spyWriter = spy(writer);
+      doThrow(IOException.class).when(spyWriter).write(any(String.class));
+      return spyWriter;
+    }).when(driver).getWriter(any());
+
+    testInsertWithErrorDuringWrite(driver, MembershipState.class);
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org