You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/11/29 18:20:26 UTC
[hadoop] branch branch-3.3.5 updated: HDFS-16847: RBF: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. (#5145)
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
new 19468c1232c HDFS-16847: RBF: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. (#5145)
19468c1232c is described below
commit 19468c1232c2bcf86f93480f065c055b0cfd0475
Author: Simbarashe Dzinamarira <sd...@linkedin.com>
AuthorDate: Mon Nov 28 16:47:01 2022 -0800
HDFS-16847: RBF: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. (#5145)
---
.../store/driver/impl/StateStoreFileBaseImpl.java | 18 ++++++----------
.../store/driver/impl/StateStoreFileImpl.java | 4 +++-
.../driver/impl/StateStoreFileSystemImpl.java | 19 ++++++-----------
.../store/driver/TestStateStoreDriverBase.java | 19 +++++++++++++++++
.../store/driver/TestStateStoreFileSystem.java | 24 ++++++++++++++++++++++
5 files changed, 58 insertions(+), 26 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
index 8352bca12e9..1ed9f38474a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -85,7 +85,8 @@ public abstract class StateStoreFileBaseImpl
* @param path Path of the record to write.
* @return Writer for the record.
*/
- protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+ @VisibleForTesting
+ public abstract <T extends BaseRecord> BufferedWriter getWriter(
String path);
/**
@@ -348,25 +349,18 @@ public abstract class StateStoreFileBaseImpl
for (Entry<String, T> entry : toWrite.entrySet()) {
String recordPath = entry.getKey();
String recordPathTemp = recordPath + "." + now() + TMP_MARK;
- BufferedWriter writer = getWriter(recordPathTemp);
- try {
+ boolean recordWrittenSuccessfully = true;
+ try (BufferedWriter writer = getWriter(recordPathTemp)) {
T record = entry.getValue();
String line = serializeString(record);
writer.write(line);
} catch (IOException e) {
LOG.error("Cannot write {}", recordPathTemp, e);
+ recordWrittenSuccessfully = false;
success = false;
- } finally {
- if (writer != null) {
- try {
- writer.close();
- } catch (IOException e) {
- LOG.error("Cannot close the writer for {}", recordPathTemp, e);
- }
- }
}
// Commit
- if (!rename(recordPathTemp, recordPath)) {
+ if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
LOG.error("Failed committing record into {}", recordPath);
success = false;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
index 9d2b1ab2fb7..6ca26637161 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
@@ -31,6 +31,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.slf4j.Logger;
@@ -125,7 +126,8 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
}
@Override
- protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
+ @VisibleForTesting
+ public <T extends BaseRecord> BufferedWriter getWriter(String filename) {
BufferedWriter writer = null;
try {
LOG.debug("Writing file: {}", filename);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
index e6bf159e2f5..ee34d8a4cab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
@@ -28,13 +28,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@@ -82,17 +83,8 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
@Override
protected boolean rename(String src, String dst) {
try {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
- return true;
- } else {
- // Replace should be atomic but not available
- if (fs.exists(new Path(dst))) {
- fs.delete(new Path(dst), true);
- }
- return fs.rename(new Path(src), new Path(dst));
- }
+ FileUtil.rename(fs, new Path(src), new Path(dst), Options.Rename.OVERWRITE);
+ return true;
} catch (Exception e) {
LOG.error("Cannot rename {} to {}", src, dst, e);
return false;
@@ -148,7 +140,8 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
}
@Override
- protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
+ @VisibleForTesting
+ public <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
BufferedWriter writer = null;
Path path = new Path(pathName);
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index fe1b9a5bfa0..06b05f45bbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -234,6 +234,25 @@ public class TestStateStoreDriverBase {
assertEquals(11, records2.size());
}
+ public <T extends BaseRecord> void testInsertWithErrorDuringWrite(
+ StateStoreDriver driver, Class<T> recordClass)
+ throws IllegalArgumentException, IllegalAccessException, IOException {
+
+ assertTrue(driver.removeAll(recordClass));
+ QueryResult<T> queryResult0 = driver.get(recordClass);
+ List<T> records0 = queryResult0.getRecords();
+ assertTrue(records0.isEmpty());
+
+ // Insert single
+ BaseRecord record = generateFakeRecord(recordClass);
+ driver.put(record, true, false);
+
+ // Verify that no record was inserted.
+ QueryResult<T> queryResult1 = driver.get(recordClass);
+ List<T> records1 = queryResult1.getRecords();
+ assertEquals(0, records1.size());
+ }
+
public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver,
Class<T> clazz) throws IllegalAccessException, IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
index 8c4b188cc47..dbd4b9bdae2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
@@ -17,16 +17,26 @@
*/
package org.apache.hadoop.hdfs.server.federation.store.driver;
+import java.io.BufferedWriter;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
/**
* Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
@@ -91,4 +101,18 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
throws IllegalArgumentException, IllegalAccessException, IOException {
testMetrics(getStateStoreDriver());
}
+
+ @Test
+ public void testInsertWithErrorDuringWrite()
+ throws IllegalArgumentException, IllegalAccessException, IOException {
+ StateStoreFileBaseImpl driver = spy((StateStoreFileBaseImpl)getStateStoreDriver());
+ doAnswer((Answer<BufferedWriter>) a -> {
+ BufferedWriter writer = (BufferedWriter) a.callRealMethod();
+ BufferedWriter spyWriter = spy(writer);
+ doThrow(IOException.class).when(spyWriter).write(any(String.class));
+ return spyWriter;
+ }).when(driver).getWriter(any());
+
+ testInsertWithErrorDuringWrite(driver, MembershipState.class);
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org