You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/12/04 10:19:32 UTC
[hadoop] branch trunk updated: HDFS-13811. RBF: Race condition
between router admin quota update and periodic quota update service.
Contributed by Jinglun.
This is an automated email from the ASF dual-hosted git repository.
yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47fdae7 HDFS-13811. RBF: Race condition between router admin quota update and periodic quota update service. Contributed by Jinglun.
47fdae7 is described below
commit 47fdae79041ba2bb036ef7723a93ade5b1ac3619
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Wed Dec 4 18:19:11 2019 +0800
HDFS-13811. RBF: Race condition between router admin quota update and periodic quota update service. Contributed by Jinglun.
---
.../hdfs/server/federation/router/Router.java | 8 ++
.../federation/router/RouterQuotaManager.java | 21 ++++
.../router/RouterQuotaUpdateService.java | 40 +------
.../server/federation/store/MountTableStore.java | 12 ++-
.../federation/store/impl/MountTableStoreImpl.java | 15 +++
.../server/federation/router/TestRouterQuota.java | 115 +++++++++++++++++++--
6 files changed, 166 insertions(+), 45 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index a03d8d4..64fdabe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -292,6 +293,13 @@ public class Router extends CompositeService implements
}
super.serviceInit(conf);
+
+ // Set quota manager in mount store to update quota usage in mount table.
+ if (stateStore != null) {
+ MountTableStore mountstore =
+ this.stateStore.getRegisteredRecordStore(MountTableStore.class);
+ mountstore.setQuotaManager(this.quotaManager);
+ }
}
private String getDisabledDependentServices() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
index c1a5146..ceb758e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
@@ -170,6 +170,27 @@ public class RouterQuotaManager {
}
/**
+ * Update the quota limits of a path in cache. Cached usage is preserved.
+ * @param path Mount table path.
+ * @param quota Corresponding quota value.
+ */
+ public void updateQuota(String path, RouterQuotaUsage quota) {
+ writeLock.lock();
+ try {
+ RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
+ .quota(quota.getQuota()).spaceQuota(quota.getSpaceQuota());
+ RouterQuotaUsage current = this.cache.get(path);
+ if (current != null) {
+ builder.fileAndDirectoryCount(current.getFileAndDirectoryCount())
+ .spaceConsumed(current.getSpaceConsumed());
+ }
+ this.cache.put(path, builder.build());
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
* Remove the entity from cache.
* @param path Mount table path.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
index e5d4472..f1a86bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -35,15 +35,13 @@ import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Service to periodically update the {@link RouterQuotaUsage}
- * cached information in the {@link Router} and update corresponding
- * mount table in State Store.
+ * cached information in the {@link Router}.
*/
public class RouterQuotaUpdateService extends PeriodicService {
private static final Logger LOG =
@@ -81,7 +79,6 @@ public class RouterQuotaUpdateService extends PeriodicService {
protected void periodicInvoke() {
LOG.debug("Start to update quota cache.");
try {
- List<MountTable> updateMountTables = new LinkedList<>();
List<MountTable> mountTables = getQuotaSetMountTables();
Map<RemoteLocation, QuotaUsage> remoteQuotaUsage = new HashMap<>();
for (MountTable entry : mountTables) {
@@ -122,18 +119,6 @@ public class RouterQuotaUpdateService extends PeriodicService {
currentQuotaUsage);
this.quotaManager.put(src, newQuota);
entry.setQuota(newQuota);
-
- // only update mount tables which quota was changed
- if (!oldQuota.equals(newQuota)) {
- updateMountTables.add(entry);
-
- LOG.debug(
- "Update quota usage entity of path: {}, nsCount: {},"
- + " nsQuota: {}, ssCount: {}, ssQuota: {}.",
- src, newQuota.getFileAndDirectoryCount(),
- newQuota.getQuota(), newQuota.getSpaceConsumed(),
- newQuota.getSpaceQuota());
- }
}
// Fix inconsistent quota.
@@ -143,8 +128,6 @@ public class RouterQuotaUpdateService extends PeriodicService {
QuotaUsage currentQuota = en.getValue();
fixGlobalQuota(remoteLocation, currentQuota);
}
-
- updateMountTableEntries(updateMountTables);
} catch (IOException e) {
LOG.error("Quota cache updated error.", e);
}
@@ -219,7 +202,7 @@ public class RouterQuotaUpdateService extends PeriodicService {
// update mount table entries info in quota cache
String src = entry.getSourcePath();
- this.quotaManager.put(src, entry.getQuota());
+ this.quotaManager.updateQuota(src, entry.getQuota());
stalePaths.remove(src);
}
@@ -258,23 +241,4 @@ public class RouterQuotaUpdateService extends PeriodicService {
.spaceQuota(oldQuota.getSpaceQuota()).build();
return newQuota;
}
-
- /**
- * Write out updated mount table entries into State Store.
- * @param updateMountTables Mount tables to be updated.
- * @throws IOException
- */
- private void updateMountTableEntries(List<MountTable> updateMountTables)
- throws IOException {
- for (MountTable entry : updateMountTables) {
- UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
- .newInstance(entry);
- try {
- getMountTableStore().updateMountTableEntry(updateRequest);
- } catch (IOException e) {
- LOG.error("Quota update error for mount entry "
- + entry.getSourcePath(), e);
- }
- }
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
index 9d4b64b..4b668c41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaManager;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
@@ -48,7 +49,8 @@ public abstract class MountTableStore extends CachedRecordStore<MountTable>
private static final Logger LOG =
LoggerFactory.getLogger(MountTableStore.class);
private MountTableRefresherService refreshService;
-
+ /** Router quota manager to update quota usage in mount table. */
+ private RouterQuotaManager quotaManager;
public MountTableStore(StateStoreDriver driver) {
super(MountTable.class, driver);
}
@@ -57,6 +59,14 @@ public abstract class MountTableStore extends CachedRecordStore<MountTable>
this.refreshService = refreshService;
}
+ public void setQuotaManager(RouterQuotaManager quotaManager) {
+ this.quotaManager = quotaManager;
+ }
+
+ public RouterQuotaManager getQuotaManager() {
+ return quotaManager;
+ }
+
/**
* Update mount table cache of this router as well as all other routers.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index fb9f6a3..ea54f8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
@@ -153,6 +154,20 @@ public class MountTableStoreImpl extends MountTableStore {
it.remove();
}
}
+ // If a quota manager is set, replace the usage with the cached values.
+ if (this.getQuotaManager() != null) {
+ RouterQuotaUsage quota =
+ this.getQuotaManager().getQuotaUsage(record.getSourcePath());
+ if (quota != null) {
+ RouterQuotaUsage oldquota = record.getQuota();
+ RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
+ .fileAndDirectoryCount(quota.getFileAndDirectoryCount())
+ .quota(oldquota.getQuota())
+ .spaceConsumed(quota.getSpaceConsumed())
+ .spaceQuota(oldquota.getSpaceQuota()).build();
+ record.setQuota(newQuota);
+ }
+ }
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
index 19f7ad6..e01cea4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableE
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
@@ -482,6 +484,10 @@ public class TestRouterQuota {
return removeResponse.getStatus();
}
+ /**
+ * Test {@link RouterQuotaUpdateService#periodicInvoke()} updates quota usage
+ * in RouterQuotaManager.
+ */
@Test
public void testQuotaUpdating() throws Exception {
long nsQuota = 30;
@@ -498,8 +504,7 @@ public class TestRouterQuota {
.spaceQuota(ssQuota).build());
addMountTable(mountTable);
- // Call periodicInvoke to ensure quota updated in quota manager
- // and state store.
+ // Call periodicInvoke to ensure the quota is updated in quota manager.
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
@@ -523,7 +528,7 @@ public class TestRouterQuota {
updatedMountTable = getMountTable(path);
quota = updatedMountTable.getQuota();
- // verify if quota has been updated in state store
+ // Verify that quota usage has been updated in the quota manager.
assertEquals(nsQuota, quota.getQuota());
assertEquals(ssQuota, quota.getSpaceQuota());
assertEquals(3, quota.getFileAndDirectoryCount());
@@ -697,10 +702,10 @@ public class TestRouterQuota {
}
/**
- * Verify whether mount table and quota usage cache is updated properly.
+ * Verify whether quota usage cache in RouterQuotaManager is updated properly.
* {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
- * the cache and the mount table even if the destination directory for some
- * mount entry is not present in the filesystem.
+ * the cache even if the destination directory for some mount entry is not
+ * present in the filesystem.
*/
@Test
public void testQuotaRefreshWhenDestinationNotPresent() throws Exception {
@@ -1007,6 +1012,104 @@ public class TestRouterQuota {
}
/**
+ * RouterQuotaUpdateService.periodicInvoke() should only update usage in
+ * cache. The mount table in state store shouldn't be updated.
+ */
+ @Test
+ public void testRouterQuotaUpdateService() throws Exception {
+ Router router = routerContext.getRouter();
+ StateStoreDriver driver = router.getStateStore().getDriver();
+ RouterQuotaUpdateService updateService =
+ router.getQuotaCacheUpdateService();
+ RouterQuotaManager quotaManager = router.getQuotaManager();
+ long nsQuota = 5;
+ long ssQuota = 3 * BLOCK_SIZE;
+ final FileSystem nnFs = nnContext1.getFileSystem();
+ nnFs.mkdirs(new Path("/dir-1"));
+
+ // init mount table.
+ MountTable mountTable = MountTable.newInstance("/dir-1",
+ Collections.singletonMap("ns0", "/dir-1"));
+ mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+ .spaceQuota(ssQuota).build());
+ addMountTable(mountTable);
+ // Verify the state store entry has the quota set and zero usage.
+ QueryResult<MountTable> result = driver.get(MountTable.class);
+ RouterQuotaUsage quotaOnStorage = result.getRecords().get(0).getQuota();
+ assertEquals(nsQuota, quotaOnStorage.getQuota());
+ assertEquals(ssQuota, quotaOnStorage.getSpaceQuota());
+ assertEquals(0, quotaOnStorage.getFileAndDirectoryCount());
+ assertEquals(0, quotaOnStorage.getSpaceConsumed());
+
+ // test RouterQuotaUpdateService.periodicInvoke().
+ updateService.periodicInvoke();
+
+ // RouterQuotaUpdateService should update usage in local cache.
+ RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage("/dir-1");
+ assertEquals(nsQuota, quotaUsage.getQuota());
+ assertEquals(ssQuota, quotaUsage.getSpaceQuota());
+ assertEquals(1, quotaUsage.getFileAndDirectoryCount());
+ assertEquals(0, quotaUsage.getSpaceConsumed());
+ // RouterQuotaUpdateService shouldn't update mount entry in state store.
+ result = driver.get(MountTable.class);
+ quotaOnStorage = result.getRecords().get(0).getQuota();
+ assertEquals(nsQuota, quotaOnStorage.getQuota());
+ assertEquals(ssQuota, quotaOnStorage.getSpaceQuota());
+ assertEquals(0, quotaOnStorage.getFileAndDirectoryCount());
+ assertEquals(0, quotaOnStorage.getSpaceConsumed());
+ }
+
+ /**
+ * Verify whether quota is updated properly.
+ * {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
+ * the quota even if the destination directory for some mount entry is not
+ * present in the filesystem.
+ */
+ @Test
+ public void testQuotaUpdateWhenDestinationNotPresent() throws Exception {
+ long nsQuota = 5;
+ long ssQuota = 3 * BLOCK_SIZE;
+ String path = "/dst-not-present";
+ final FileSystem nnFs = nnContext1.getFileSystem();
+
+ // Add one mount table:
+ // /dst-not-present --> ns0---/dst-not-present.
+ nnFs.mkdirs(new Path(path));
+ MountTable mountTable =
+ MountTable.newInstance(path, Collections.singletonMap("ns0", path));
+ mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+ .spaceQuota(ssQuota).build());
+ addMountTable(mountTable);
+
+ Router router = routerContext.getRouter();
+ RouterQuotaManager quotaManager = router.getQuotaManager();
+ RouterQuotaUpdateService updateService =
+ router.getQuotaCacheUpdateService();
+ // verify quota usage is updated in RouterQuotaManager.
+ updateService.periodicInvoke();
+ RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage(path);
+ assertEquals(nsQuota, quotaUsage.getQuota());
+ assertEquals(ssQuota, quotaUsage.getSpaceQuota());
+ assertEquals(1, quotaUsage.getFileAndDirectoryCount());
+ assertEquals(0, quotaUsage.getSpaceConsumed());
+
+ // Update quota to [nsQuota*2, ssQuota*2].
+ mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota * 2)
+ .spaceQuota(ssQuota * 2).build());
+ updateMountTable(mountTable);
+ // Remove /dst-not-present.
+ nnFs.delete(new Path(path), true);
+
+ // verify quota is updated in RouterQuotaManager.
+ updateService.periodicInvoke();
+ quotaUsage = quotaManager.getQuotaUsage(path);
+ assertEquals(nsQuota * 2, quotaUsage.getQuota());
+ assertEquals(ssQuota * 2, quotaUsage.getSpaceQuota());
+ assertEquals(0, quotaUsage.getFileAndDirectoryCount());
+ assertEquals(0, quotaUsage.getSpaceConsumed());
+ }
+
+ /**
* Add three mount tables.
* /dir-1 --> ns0---/dir-1 [nsQuota, ssQuota]
* /dir-1/dir-2 --> ns0---/dir-2 [QUOTA_UNSET, ssQuota * 2]
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org