You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/12/04 10:19:32 UTC

[hadoop] branch trunk updated: HDFS-13811. RBF: Race condition between router admin quota update and periodic quota update service. Contributed by Jinglun.

This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47fdae7  HDFS-13811. RBF: Race condition between router admin quota update and periodic quota update service. Contributed by Jinglun.
47fdae7 is described below

commit 47fdae79041ba2bb036ef7723a93ade5b1ac3619
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Wed Dec 4 18:19:11 2019 +0800

    HDFS-13811. RBF: Race condition between router admin quota update and periodic quota update service. Contributed by Jinglun.
---
 .../hdfs/server/federation/router/Router.java      |   8 ++
 .../federation/router/RouterQuotaManager.java      |  21 ++++
 .../router/RouterQuotaUpdateService.java           |  40 +------
 .../server/federation/store/MountTableStore.java   |  12 ++-
 .../federation/store/impl/MountTableStoreImpl.java |  15 +++
 .../server/federation/router/TestRouterQuota.java  | 115 +++++++++++++++++++--
 6 files changed, 166 insertions(+), 45 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index a03d8d4..64fdabe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
 import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
 import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -292,6 +293,13 @@ public class Router extends CompositeService implements
     }
 
     super.serviceInit(conf);
+
+    // Set quota manager in mount store to update quota usage in mount table.
+    if (stateStore != null) {
+      MountTableStore mountstore =
+          this.stateStore.getRegisteredRecordStore(MountTableStore.class);
+      mountstore.setQuotaManager(this.quotaManager);
+    }
   }
 
   private String getDisabledDependentServices() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
index c1a5146..ceb758e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
@@ -170,6 +170,27 @@ public class RouterQuotaManager {
   }
 
   /**
+   * Update quota in cache. The usage will be preserved.
+   * @param path Mount table path.
+   * @param quota Corresponding quota value.
+   */
+  public void updateQuota(String path, RouterQuotaUsage quota) {
+    writeLock.lock();
+    try {
+      RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
+          .quota(quota.getQuota()).spaceQuota(quota.getSpaceQuota());
+      RouterQuotaUsage current = this.cache.get(path);
+      if (current != null) {
+        builder.fileAndDirectoryCount(current.getFileAndDirectoryCount())
+            .spaceConsumed(current.getSpaceConsumed());
+      }
+      this.cache.put(path, builder.build());
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
    * Remove the entity from cache.
    * @param path Mount table path.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
index e5d4472..f1a86bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -35,15 +35,13 @@ import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Service to periodically update the {@link RouterQuotaUsage}
- * cached information in the {@link Router} and update corresponding
- * mount table in State Store.
+ * cached information in the {@link Router}.
  */
 public class RouterQuotaUpdateService extends PeriodicService {
   private static final Logger LOG =
@@ -81,7 +79,6 @@ public class RouterQuotaUpdateService extends PeriodicService {
   protected void periodicInvoke() {
     LOG.debug("Start to update quota cache.");
     try {
-      List<MountTable> updateMountTables = new LinkedList<>();
       List<MountTable> mountTables = getQuotaSetMountTables();
       Map<RemoteLocation, QuotaUsage> remoteQuotaUsage = new HashMap<>();
       for (MountTable entry : mountTables) {
@@ -122,18 +119,6 @@ public class RouterQuotaUpdateService extends PeriodicService {
             currentQuotaUsage);
         this.quotaManager.put(src, newQuota);
         entry.setQuota(newQuota);
-
-        // only update mount tables which quota was changed
-        if (!oldQuota.equals(newQuota)) {
-          updateMountTables.add(entry);
-
-          LOG.debug(
-              "Update quota usage entity of path: {}, nsCount: {},"
-                  + " nsQuota: {}, ssCount: {}, ssQuota: {}.",
-              src, newQuota.getFileAndDirectoryCount(),
-              newQuota.getQuota(), newQuota.getSpaceConsumed(),
-              newQuota.getSpaceQuota());
-        }
       }
 
       // Fix inconsistent quota.
@@ -143,8 +128,6 @@ public class RouterQuotaUpdateService extends PeriodicService {
         QuotaUsage currentQuota = en.getValue();
         fixGlobalQuota(remoteLocation, currentQuota);
       }
-
-      updateMountTableEntries(updateMountTables);
     } catch (IOException e) {
       LOG.error("Quota cache updated error.", e);
     }
@@ -219,7 +202,7 @@ public class RouterQuotaUpdateService extends PeriodicService {
 
       // update mount table entries info in quota cache
       String src = entry.getSourcePath();
-      this.quotaManager.put(src, entry.getQuota());
+      this.quotaManager.updateQuota(src, entry.getQuota());
       stalePaths.remove(src);
     }
 
@@ -258,23 +241,4 @@ public class RouterQuotaUpdateService extends PeriodicService {
         .spaceQuota(oldQuota.getSpaceQuota()).build();
     return newQuota;
   }
-
-  /**
-   * Write out updated mount table entries into State Store.
-   * @param updateMountTables Mount tables to be updated.
-   * @throws IOException
-   */
-  private void updateMountTableEntries(List<MountTable> updateMountTables)
-      throws IOException {
-    for (MountTable entry : updateMountTables) {
-      UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
-          .newInstance(entry);
-      try {
-        getMountTableStore().updateMountTableEntry(updateRequest);
-      } catch (IOException e) {
-        LOG.error("Quota update error for mount entry "
-            + entry.getSourcePath(), e);
-      }
-    }
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
index 9d4b64b..4b668c41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaManager;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.slf4j.Logger;
@@ -48,7 +49,8 @@ public abstract class MountTableStore extends CachedRecordStore<MountTable>
   private static final Logger LOG =
       LoggerFactory.getLogger(MountTableStore.class);
   private MountTableRefresherService refreshService;
-
+  /** Router quota manager to update quota usage in mount table. */
+  private RouterQuotaManager quotaManager;
   public MountTableStore(StateStoreDriver driver) {
     super(MountTable.class, driver);
   }
@@ -57,6 +59,14 @@ public abstract class MountTableStore extends CachedRecordStore<MountTable>
     this.refreshService = refreshService;
   }
 
+  public void setQuotaManager(RouterQuotaManager quotaManager) {
+    this.quotaManager = quotaManager;
+  }
+
+  public RouterQuotaManager getQuotaManager() {
+    return quotaManager;
+  }
+
   /**
    * Update mount table cache of this router as well as all other routers.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index fb9f6a3..ea54f8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
 import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
@@ -153,6 +154,20 @@ public class MountTableStoreImpl extends MountTableStore {
             it.remove();
           }
         }
+        // If quota manager is not null, update quota usage from quota cache.
+        if (this.getQuotaManager() != null) {
+          RouterQuotaUsage quota =
+              this.getQuotaManager().getQuotaUsage(record.getSourcePath());
+          if (quota != null) {
+            RouterQuotaUsage oldquota = record.getQuota();
+            RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
+                .fileAndDirectoryCount(quota.getFileAndDirectoryCount())
+                .quota(oldquota.getQuota())
+                .spaceConsumed(quota.getSpaceConsumed())
+                .spaceQuota(oldquota.getSpaceQuota()).build();
+            record.setQuota(newQuota);
+          }
+        }
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
index 19f7ad6..e01cea4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableE
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
@@ -482,6 +484,10 @@ public class TestRouterQuota {
     return removeResponse.getStatus();
   }
 
+  /**
+   * Test {@link RouterQuotaUpdateService#periodicInvoke()} updates quota usage
+   * in RouterQuotaManager.
+   */
   @Test
   public void testQuotaUpdating() throws Exception {
     long nsQuota = 30;
@@ -498,8 +504,7 @@ public class TestRouterQuota {
         .spaceQuota(ssQuota).build());
     addMountTable(mountTable);
 
-    // Call periodicInvoke to ensure quota  updated in quota manager
-    // and state store.
+    // Call periodicInvoke to ensure quota is updated in quota manager.
     RouterQuotaUpdateService updateService = routerContext.getRouter()
         .getQuotaCacheUpdateService();
     updateService.periodicInvoke();
@@ -523,7 +528,7 @@ public class TestRouterQuota {
     updatedMountTable = getMountTable(path);
     quota = updatedMountTable.getQuota();
 
-    // verify if quota has been updated in state store
+    // verify if quota usage has been updated in quota manager.
     assertEquals(nsQuota, quota.getQuota());
     assertEquals(ssQuota, quota.getSpaceQuota());
     assertEquals(3, quota.getFileAndDirectoryCount());
@@ -697,10 +702,10 @@ public class TestRouterQuota {
   }
 
   /**
-   * Verify whether mount table and quota usage cache is updated properly.
+   * Verify whether quota usage cache in RouterQuotaManager is updated properly.
    * {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
-   * the cache and the mount table even if the destination directory for some
-   * mount entry is not present in the filesystem.
+   * the cache even if the destination directory for some mount entry is not
+   * present in the filesystem.
    */
   @Test
   public void testQuotaRefreshWhenDestinationNotPresent() throws Exception {
@@ -1007,6 +1012,104 @@ public class TestRouterQuota {
   }
 
   /**
+   * RouterQuotaUpdateService.periodicInvoke() should only update usage in
+   * cache. The mount table in state store shouldn't be updated.
+   */
+  @Test
+  public void testRouterQuotaUpdateService() throws Exception {
+    Router router = routerContext.getRouter();
+    StateStoreDriver driver = router.getStateStore().getDriver();
+    RouterQuotaUpdateService updateService =
+        router.getQuotaCacheUpdateService();
+    RouterQuotaManager quotaManager = router.getQuotaManager();
+    long nsQuota = 5;
+    long ssQuota = 3 * BLOCK_SIZE;
+    final FileSystem nnFs = nnContext1.getFileSystem();
+    nnFs.mkdirs(new Path("/dir-1"));
+
+    // init mount table.
+    MountTable mountTable = MountTable.newInstance("/dir-1",
+        Collections.singletonMap("ns0", "/dir-1"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+    // verify the mount table in state store is updated.
+    QueryResult<MountTable> result = driver.get(MountTable.class);
+    RouterQuotaUsage quotaOnStorage = result.getRecords().get(0).getQuota();
+    assertEquals(nsQuota, quotaOnStorage.getQuota());
+    assertEquals(ssQuota, quotaOnStorage.getSpaceQuota());
+    assertEquals(0, quotaOnStorage.getFileAndDirectoryCount());
+    assertEquals(0, quotaOnStorage.getSpaceConsumed());
+
+    // test RouterQuotaUpdateService.periodicInvoke().
+    updateService.periodicInvoke();
+
+    // RouterQuotaUpdateService should update usage in local cache.
+    RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage("/dir-1");
+    assertEquals(nsQuota, quotaUsage.getQuota());
+    assertEquals(ssQuota, quotaUsage.getSpaceQuota());
+    assertEquals(1, quotaUsage.getFileAndDirectoryCount());
+    assertEquals(0, quotaUsage.getSpaceConsumed());
+    // RouterQuotaUpdateService shouldn't update mount entry in state store.
+    result = driver.get(MountTable.class);
+    quotaOnStorage = result.getRecords().get(0).getQuota();
+    assertEquals(nsQuota, quotaOnStorage.getQuota());
+    assertEquals(ssQuota, quotaOnStorage.getSpaceQuota());
+    assertEquals(0, quotaOnStorage.getFileAndDirectoryCount());
+    assertEquals(0, quotaOnStorage.getSpaceConsumed());
+  }
+
+  /**
+   * Verify whether quota is updated properly.
+   * {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
+   * the quota even if the destination directory for some mount entry is not
+   * present in the filesystem.
+   */
+  @Test
+  public void testQuotaUpdateWhenDestinationNotPresent() throws Exception {
+    long nsQuota = 5;
+    long ssQuota = 3 * BLOCK_SIZE;
+    String path = "/dst-not-present";
+    final FileSystem nnFs = nnContext1.getFileSystem();
+
+    // Add one mount table:
+    // /dst-not-present --> ns0---/dst-not-present.
+    nnFs.mkdirs(new Path(path));
+    MountTable mountTable =
+        MountTable.newInstance(path, Collections.singletonMap("ns0", path));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+
+    Router router = routerContext.getRouter();
+    RouterQuotaManager quotaManager = router.getQuotaManager();
+    RouterQuotaUpdateService updateService =
+        router.getQuotaCacheUpdateService();
+    // verify quota usage is updated in RouterQuotaManager.
+    updateService.periodicInvoke();
+    RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage(path);
+    assertEquals(nsQuota, quotaUsage.getQuota());
+    assertEquals(ssQuota, quotaUsage.getSpaceQuota());
+    assertEquals(1, quotaUsage.getFileAndDirectoryCount());
+    assertEquals(0, quotaUsage.getSpaceConsumed());
+
+    // Update quota to [nsQuota*2, ssQuota*2].
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota * 2)
+        .spaceQuota(ssQuota * 2).build());
+    updateMountTable(mountTable);
+    // Remove /dst-not-present.
+    nnFs.delete(new Path(path), true);
+
+    // verify quota is updated in RouterQuotaManager.
+    updateService.periodicInvoke();
+    quotaUsage = quotaManager.getQuotaUsage(path);
+    assertEquals(nsQuota * 2, quotaUsage.getQuota());
+    assertEquals(ssQuota * 2, quotaUsage.getSpaceQuota());
+    assertEquals(0, quotaUsage.getFileAndDirectoryCount());
+    assertEquals(0, quotaUsage.getSpaceConsumed());
+  }
+
+  /**
    * Add three mount tables.
    * /dir-1              --> ns0---/dir-1 [nsQuota, ssQuota]
    * /dir-1/dir-2        --> ns0---/dir-2 [QUOTA_UNSET, ssQuota * 2]


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org