Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/11/18 17:30:21 UTC

[hadoop] branch branch-3.3.5 updated: HDFS-16844: RBF: Adds resiliency when StateStore gets exceptions. (#5138)

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new 6aacb3557f2 HDFS-16844: RBF: Adds resiliency when StateStore gets exceptions. (#5138)
6aacb3557f2 is described below

commit 6aacb3557f2ae1d3f34447e60a5e19bbd95d7abb
Author: Owen O'Malley <oo...@linkedin.com>
AuthorDate: Fri Nov 18 17:24:08 2022 +0000

    HDFS-16844: RBF: Adds resiliency when StateStore gets exceptions. (#5138)
    
    Allows the StateStore to stay up when there are errors reading the data.
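
For readers skimming the diff below, here is a minimal, self-contained sketch of the caller-side pattern the patch introduces. It does not use the actual Hadoop classes; RecordStoreSketch and ResolverSketch are illustrative names. The idea, taken from the resolver hunks below, is that loadCache(force) now signals a failed read through its boolean return, and callers return early so the previously cached records keep serving lookups instead of being invalidated.

import java.io.IOException;

/** Sketch of the store-side contract: report a failed refresh via the return value. */
interface RecordStoreSketch {
  /** @return true if the cache was refreshed, false if the store could not be read. */
  boolean loadCache(boolean force) throws IOException;
}

/** Sketch of the caller-side pattern used by the resolvers in this patch. */
class ResolverSketch {
  private final RecordStoreSketch membership;

  ResolverSketch(RecordStoreSketch membership) {
    this.membership = membership;
  }

  /** Refresh the local cache; on failure, keep serving the previously cached records. */
  boolean loadCache(boolean force) {
    try {
      if (!membership.loadCache(force)) {
        // The store reported a failed read: bail out, leaving the stale
        // cache in place until a later refresh succeeds.
        return false;
      }
    } catch (IOException e) {
      // Same idea for hard failures: log and keep the old cache rather
      // than invalidating it.
      return false;
    }
    return true;
  }
}

The same principle shows up in the CachedRecordStore hunk below, which no longer marks the cache uninitialized when a refresh fails.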
---
 .../resolver/MembershipNamenodeResolver.java       |   8 +-
 .../federation/resolver/MountTableResolver.java    |   4 +-
 .../server/federation/store/CachedRecordStore.java |   3 +-
 .../federation/store/impl/MembershipStoreImpl.java |   4 +-
 .../store/records/MockStateStoreDriver.java        | 139 +++++++++++++++++++++
 .../federation/store/records/TestRouterState.java  |  53 +++++++-
 6 files changed, 203 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
index 66290039782..9d2dd1651a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -123,9 +123,13 @@ public class MembershipNamenodeResolver
     // Our cache depends on the store, update it first
     try {
       MembershipStore membership = getMembershipStore();
-      membership.loadCache(force);
+      if (!membership.loadCache(force)) {
+        return false;
+      }
       DisabledNameserviceStore disabled = getDisabledNameserviceStore();
-      disabled.loadCache(force);
+      if (!disabled.loadCache(force)) {
+        return false;
+      }
     } catch (IOException e) {
       LOG.error("Cannot update membership from the State Store", e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
index 797006ab1de..9d4804fb928 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
@@ -346,7 +346,9 @@ public class MountTableResolver
     try {
       // Our cache depends on the store, update it first
       MountTableStore mountTable = this.getMountTableStore();
-      mountTable.loadCache(force);
+      if (!mountTable.loadCache(force)) {
+        return false;
+      }
 
       GetMountTableEntriesRequest request =
           GetMountTableEntriesRequest.newInstance("/");
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 7b28c03a529..613d8a78038 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -100,7 +100,7 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    * @throws StateStoreUnavailableException If the cache is not initialized.
    */
   private void checkCacheAvailable() throws StateStoreUnavailableException {
-    if (!this.initialized) {
+    if (!getDriver().isDriverReady() || !this.initialized) {
       throw new StateStoreUnavailableException(
           "Cached State Store not initialized, " +
           getRecordClass().getSimpleName() + " records not valid");
@@ -125,7 +125,6 @@ public abstract class CachedRecordStore<R extends BaseRecord>
       } catch (IOException e) {
         LOG.error("Cannot get \"{}\" records from the State Store",
             getRecordClass().getSimpleName());
-        this.initialized = false;
         return false;
       }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
index a63a0f3b3ab..5d22b77afe2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
@@ -185,7 +185,9 @@ public class MembershipStoreImpl
 
   @Override
   public boolean loadCache(boolean force) throws IOException {
-    super.loadCache(force);
+    if (!super.loadCache(force)) {
+      return false;
+    }
 
     // Update local cache atomically
     cacheWriteLock.lock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
new file mode 100644
index 00000000000..57185a0a600
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An in-memory mock StateStoreDriver that can be made to throw IOExceptions
+ * on demand.
+ */
+public class MockStateStoreDriver extends StateStoreBaseImpl {
+  private boolean giveErrors = false;
+  private boolean initialized = false;
+  private final Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>();
+
+  @Override
+  public boolean initDriver() {
+    initialized = true;
+    return true;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(String className,
+                                                          Class<T> clazz) {
+    return true;
+  }
+
+  @Override
+  public boolean isDriverReady() {
+    return initialized;
+  }
+
+  @Override
+  public void close() throws Exception {
+    valueMap.clear();
+    initialized = false;
+  }
+
+  /**
+   * Should this object throw an IOException on each following call?
+   * @param value should we throw errors?
+   */
+  public void setGiveErrors(boolean value) {
+    giveErrors = value;
+  }
+
+  /**
+   * Check to see if this StateStore should throw IOException on each call.
+   * @throws IOException thrown if giveErrors has been set
+   */
+  private void checkErrors() throws IOException {
+    if (giveErrors) {
+      throw new IOException("Induced errors");
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
+    checkErrors();
+    Map<String, BaseRecord> map = valueMap.get(StateStoreUtils.getRecordName(clazz));
+    List<T> results =
+        map != null ? new ArrayList<>((Collection<T>) map.values()) : new ArrayList<>();
+    return new QueryResult<>(results, System.currentTimeMillis());
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean putAll(List<T> records,
+                                               boolean allowUpdate,
+                                               boolean errorIfExists)
+      throws IOException {
+    checkErrors();
+    for (T record : records) {
+      Map<String, BaseRecord> map =
+          valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()),
+              k -> new HashMap<>());
+      String key = record.getPrimaryKey();
+      BaseRecord oldRecord = map.get(key);
+      if (oldRecord == null || allowUpdate) {
+        map.put(key, record);
+      } else if (errorIfExists) {
+        throw new IOException("Record already exists for " + record.getClass()
+            + ": " + key);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
+    checkErrors();
+    return valueMap.remove(StateStoreUtils.getRecordName(clazz)) != null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T extends BaseRecord> int remove(Class<T> clazz,
+                                           Query<T> query)
+      throws IOException {
+    checkErrors();
+    int result = 0;
+    Map<String, BaseRecord> map =
+        valueMap.get(StateStoreUtils.getRecordName(clazz));
+    if (map != null) {
+      for (Iterator<BaseRecord> itr = map.values().iterator(); itr.hasNext();) {
+        BaseRecord record = itr.next();
+        if (query.matches((T) record)) {
+          itr.remove();
+          result += 1;
+        }
+      }
+    }
+    return result;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java
index dfe2bc98bf4..4289999429b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,16 @@ package org.apache.hadoop.hdfs.server.federation.store.records;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
 import org.junit.Test;
 
@@ -40,7 +48,7 @@ public class TestRouterState {
   private static final RouterServiceState STATE = RouterServiceState.RUNNING;
 
 
-  private RouterState generateRecord() throws IOException {
+  private RouterState generateRecord() {
     RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
     record.setVersion(VERSION);
     record.setCompileInfo(COMPILE_INFO);
@@ -82,4 +90,45 @@ public class TestRouterState {
 
     validateRecord(newRecord);
   }
+
+  @Test
+  public void testStateStoreResilience() throws Exception {
+    StateStoreService service = new StateStoreService();
+    Configuration conf = new Configuration();
+    conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+        MockStateStoreDriver.class,
+        StateStoreDriver.class);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
+    service.init(conf);
+    MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
+    // Add two records for block1
+    driver.put(MembershipState.newInstance("routerId", "ns1",
+        "ns1-ha1", "cluster1", "block1", "rpc1",
+        "service1", "lifeline1", "https", "nn01",
+        FederationNamenodeServiceState.ACTIVE, false), false, false);
+    driver.put(MembershipState.newInstance("routerId", "ns1",
+        "ns1-ha2", "cluster1", "block1", "rpc2",
+        "service2", "lifeline2", "https", "nn02",
+        FederationNamenodeServiceState.STANDBY, false), false, false);
+    // load the cache
+    service.loadDriver();
+    MembershipNamenodeResolver resolver = new MembershipNamenodeResolver(conf, service);
+    service.refreshCaches(true);
+
+    // look up block1
+    List<? extends FederationNamenodeContext> result =
+        resolver.getNamenodesForBlockPoolId("block1");
+    assertEquals(2, result.size());
+
+    // cause io errors and then reload the cache
+    driver.setGiveErrors(true);
+    long previousUpdate = service.getCacheUpdateTime();
+    service.refreshCaches(true);
+    assertEquals(previousUpdate, service.getCacheUpdateTime());
+
+    // make sure the old cache is still there
+    result = resolver.getNamenodesForBlockPoolId("block1");
+    assertEquals(2, result.size());
+    service.stop();
+  }
 }

