You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2022/05/05 10:58:14 UTC

[ozone] branch master updated: HDDS-6551. Introduce StatefulService in scm (#3307)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f315500c73 HDDS-6551. Introduce StatefulService in scm (#3307)
f315500c73 is described below

commit f315500c73b86ee571118fdb535d00ed07a9b3c1
Author: Siddhant Sangwan <si...@gmail.com>
AuthorDate: Thu May 5 16:28:09 2022 +0530

    HDDS-6551. Introduce StatefulService in scm (#3307)
---
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |   3 +
 .../src/main/proto/SCMRatisProtocol.proto          |   1 +
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |   2 +
 .../hadoop/hdds/scm/ha/SCMHAManagerStub.java       |  17 ++-
 .../apache/hadoop/hdds/scm/ha/StatefulService.java |  80 +++++++++++++
 .../hdds/scm/ha/StatefulServiceStateManager.java   |  61 ++++++++++
 .../scm/ha/StatefulServiceStateManagerImpl.java    | 124 +++++++++++++++++++++
 .../hadoop/hdds/scm/metadata/ByteStringCodec.java  |  69 ++++++++++++
 .../hadoop/hdds/scm/metadata/SCMDBDefinition.java  |  13 ++-
 .../hdds/scm/metadata/SCMMetadataStoreImpl.java    |  13 +++
 .../hdds/scm/server/StorageContainerManager.java   |  14 +++
 .../ha/TestStatefulServiceStateManagerImpl.java    |  89 +++++++++++++++
 12 files changed, 479 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index a63d90d0f4..9eb92f1da1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.security.cert.X509Certificate;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -153,4 +154,6 @@ public interface SCMMetadataStore extends DBStoreHAManager {
    * Table that maintains move information.
    */
   Table<ContainerID, MoveDataNodePair> getMoveTable();
+
+  Table<String, ByteString> getStatefulServiceConfigTable();
 }
diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
index 8e4237ee59..af6232041e 100644
--- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
@@ -27,6 +27,7 @@ enum RequestType {
     SEQUENCE_ID = 4;
     CERT_STORE = 5;
     MOVE = 6;
+    STATEFUL_SERVICE_CONFIG = 7;
 }
 
 message Method {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 0fcc1ec93f..16b32735aa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -375,6 +375,8 @@ public class SCMHAManagerImpl implements SCMHAManager {
         metadataStore.getDeletedBlocksTXTable());
     scm.getReplicationManager().getMoveScheduler()
         .reinitialize(metadataStore.getMoveTable());
+    scm.getStatefulServiceStateManager().reinitialize(
+        metadataStore.getStatefulServiceConfigTable());
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
       if (scm.getRootCertificateServer() != null) {
         scm.getRootCertificateServer().reinitialize(metadataStore);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index 6dbbea32cf..28366e6a3e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -24,11 +24,13 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
@@ -60,6 +62,11 @@ public final class SCMHAManagerStub implements SCMHAManager {
     return new SCMHAManagerStub(isLeader, buffer);
   }
 
+  public static SCMHAManager getInstance(boolean isLeader, DBStore dbStore) {
+    return new SCMHAManagerStub(isLeader,
+        new SCMHADBTransactionBufferStub(dbStore));
+  }
+
   /**
    * Creates SCMHAManagerStub instance.
    */
@@ -109,7 +116,9 @@ public final class SCMHAManagerStub implements SCMHAManager {
 
   @Override
   public SCMHADBTransactionBuffer asSCMHADBTransactionBuffer() {
-    return null;
+    Preconditions
+        .checkArgument(transactionBuffer instanceof SCMHADBTransactionBuffer);
+    return (SCMHADBTransactionBuffer) transactionBuffer;
   }
 
   @Override
@@ -191,13 +200,9 @@ public final class SCMHAManagerStub implements SCMHAManager {
               "No handler found for request type " + request.getType());
         }
 
-        final List<Class<?>> argumentTypes = new ArrayList<>();
-        for (Object args : request.getArguments()) {
-          argumentTypes.add(args.getClass());
-        }
         final Object result = handler.getClass()
             .getMethod(request.getOperation(),
-                argumentTypes.toArray(new Class<?>[0]))
+                request.getParameterTypes())
             .invoke(handler, request.getArguments());
 
         return SCMRatisResponse.encode(result);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
new file mode 100644
index 0000000000..441e83ce13
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * A StatefulService is an SCMService that persists configuration to RocksDB.
+ * The service must define this configuration as a Protobuf message.
+ */
+public abstract class StatefulService implements SCMService {
+  private final StatefulServiceStateManager stateManager;
+
+  /**
+   * Initialize a StatefulService from an extending class.
+   * @param scm {@link StorageContainerManager}
+   */
+  protected StatefulService(StorageContainerManager scm) {
+    stateManager = scm.getStatefulServiceStateManager();
+  }
+
+  /**
+   * Persists the specified {@link GeneratedMessage} configurationMessage
+   * to RocksDB with this service's {@link SCMService#getServiceName()} as the
+   * key.
+   * @param configurationMessage configuration GeneratedMessage to persist
+   * @throws IOException on failure to persist configuration
+   */
+  protected final void saveConfiguration(GeneratedMessage configurationMessage)
+      throws IOException {
+    stateManager.saveConfiguration(getServiceName(),
+        configurationMessage.toByteString());
+  }
+
+  /**
+   * Reads persisted configuration mapped to this service's
+   * {@link SCMService#getServiceName()} name.
+   *
+   * @param configType the Class object of the protobuf message type
+   * @param <T>        the Type of the protobuf message
+   * @return persisted protobuf message
+   * @throws IOException on failure to fetch the message from DB or to parse
+   *                     it as configType; the reflection failure is its cause
+   */
+  protected final <T extends GeneratedMessage> T readConfiguration(
+      Class<T> configType) throws IOException {
+    try {
+      return configType.cast(ReflectionUtil.getMethod(configType,
+              "parseFrom", ByteString.class)
+          .invoke(null, stateManager.readConfiguration(getServiceName())));
+    } catch (NoSuchMethodException | IllegalAccessException
+        | InvocationTargetException e) {
+      throw (IOException) new InvalidProtocolBufferException(
+          "GeneratedMessage cannot be parsed for type " + configType + ": "
+              + e.getMessage()).initCause(e);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManager.java
new file mode 100644
index 0000000000..a6a8a50ae5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.utils.db.Table;
+
+import java.io.IOException;
+
+/**
+ * This interface defines an API for saving and reading configurations of a
+ * {@link StatefulService}.
+ */
+public interface StatefulServiceStateManager {
+
+  /**
+   * Persists the specified configuration bytes to RocksDB and replicates
+   * this operation through RATIS. The specified serviceName maps to the
+   * persisted bytes.
+   * @param serviceName name of the {@link StatefulService}, obtained
+   *                    through {@link SCMService#getServiceName()}
+   * @param bytes configuration to persist as a ByteString
+   * @throws IOException on failure to persist configuration
+   */
+  @Replicate
+  void saveConfiguration(String serviceName, ByteString bytes)
+      throws IOException;
+
+  /**
+   * Reads the persisted configuration mapped to the specified serviceName.
+   * @param serviceName name of the {@link StatefulService}, obtained through
+   * {@link SCMService#getServiceName()}
+   * @return configuration as a ByteString; presumably null if absent (verify)
+   * @throws IOException on failure to read the configuration
+   */
+  ByteString readConfiguration(String serviceName) throws IOException;
+
+  /**
+   * Sets the updated reference to the table when reloading SCM state.
+   * @param statefulServiceConfig table from
+   * {@link org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore}
+   */
+  void reinitialize(Table<String, ByteString> statefulServiceConfig);
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
new file mode 100644
index 0000000000..d470f1bf7f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.utils.db.Table;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+
+/**
+ * This class implements methods to save and read configurations of a
+ * stateful service from DB.
+ */
+public final class StatefulServiceStateManagerImpl
+    implements StatefulServiceStateManager {
+
+  // this table maps the service name to the configuration (ByteString)
+  private Table<String, ByteString> statefulServiceConfig;
+  private final DBTransactionBuffer transactionBuffer;
+
+  private StatefulServiceStateManagerImpl(
+      Table<String, ByteString> statefulServiceConfig,
+      DBTransactionBuffer scmDBTransactionBuffer) {
+    this.statefulServiceConfig = statefulServiceConfig;
+    this.transactionBuffer = scmDBTransactionBuffer;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void saveConfiguration(String serviceName, ByteString bytes)
+      throws IOException {
+    transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes);
+    if (transactionBuffer instanceof SCMHADBTransactionBuffer) {
+      SCMHADBTransactionBuffer buffer =
+              (SCMHADBTransactionBuffer) transactionBuffer;
+      buffer.flush();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ByteString readConfiguration(String serviceName) throws IOException {
+    return statefulServiceConfig.get(serviceName);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void reinitialize(Table<String, ByteString> configs) {
+    this.statefulServiceConfig = configs;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for StatefulServiceStateManager.
+   */
+  public static class Builder {
+    private Table<String, ByteString> statefulServiceConfig;
+    private DBTransactionBuffer transactionBuffer;
+    private SCMRatisServer scmRatisServer;
+
+    public Builder setStatefulServiceConfig(
+        final Table<String, ByteString> statefulServiceConfig) {
+      this.statefulServiceConfig = statefulServiceConfig;
+      return this;
+    }
+
+    public Builder setSCMDBTransactionBuffer(
+        final DBTransactionBuffer dbTransactionBuffer) {
+      this.transactionBuffer = dbTransactionBuffer;
+      return this;
+    }
+
+    public Builder setRatisServer(final SCMRatisServer ratisServer) {
+      scmRatisServer = ratisServer;
+      return this;
+    }
+
+    public StatefulServiceStateManager build() {
+      Preconditions.checkNotNull(statefulServiceConfig);
+      Preconditions.checkNotNull(transactionBuffer);
+
+      final StatefulServiceStateManager stateManager =
+          new StatefulServiceStateManagerImpl(statefulServiceConfig,
+              transactionBuffer);
+
+      final SCMHAInvocationHandler invocationHandler =
+          new SCMHAInvocationHandler(RequestType.STATEFUL_SERVICE_CONFIG,
+              stateManager, scmRatisServer);
+
+      return (StatefulServiceStateManager) Proxy.newProxyInstance(
+          SCMHAInvocationHandler.class.getClassLoader(),
+          new Class<?>[]{StatefulServiceStateManager.class}, invocationHandler);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ByteStringCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ByteStringCodec.java
new file mode 100644
index 0000000000..fb25a6b766
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ByteStringCodec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import java.io.IOException;
+
+/**
+ * Codec to serialize/deserialize a {@link ByteString}.
+ */
+public class ByteStringCodec implements Codec<ByteString> {
+
+  /**
+   * Convert object to raw persisted format.
+   *
+   * @param object The original java object; null maps to an empty array.
+   */
+  @Override
+  public byte[] toPersistedFormat(ByteString object) throws IOException {
+    if (object == null) {
+      return new byte[0];
+    }
+    return object.toByteArray();
+  }
+
+  /**
+   * Convert object from raw persisted format.
+   *
+   * @param rawData Byte array from the key/value store; null maps to empty.
+   */
+  @Override
+  public ByteString fromPersistedFormat(byte[] rawData) throws IOException {
+    if (rawData == null) {
+      return ByteString.EMPTY;
+    }
+    return ByteString.copyFrom(rawData);
+  }
+
+  /**
+   * Returns the given ByteString itself, or an empty one for null.
+   * @param object a ByteString, may be null
+   */
+  @Override
+  public ByteString copyObject(ByteString object) {
+    // ByteString is immutable, so sharing the instance needs no byte copy.
+    if (object == null) {
+      return ByteString.EMPTY;
+    }
+    return object;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index b3e861b40c..c8fdaa3ecb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.metadata;
 import java.math.BigInteger;
 import java.security.cert.X509Certificate;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -154,6 +155,15 @@ public class SCMDBDefinition implements DBDefinition {
           MoveDataNodePair.class,
           new MoveDataNodePairCodec());
 
+  public static final DBColumnFamilyDefinition<String, ByteString>
+      STATEFUL_SERVICE_CONFIG =
+      new DBColumnFamilyDefinition<>(
+          "statefulServiceConfig",
+          String.class,
+          new StringCodec(),
+          ByteString.class,
+          new ByteStringCodec());
+
   @Override
   public String getName() {
     return "scm.db";
@@ -168,6 +178,7 @@ public class SCMDBDefinition implements DBDefinition {
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
         VALID_SCM_CERTS, REVOKED_CERTS, REVOKED_CERTS_V2, PIPELINES, CONTAINERS,
-        TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID, MOVE};
+        TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID, MOVE,
+        STATEFUL_SERVICE_CONFIG};
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index de7fcb0b74..2bfb136f3f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.security.cert.X509Certificate;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -48,6 +49,7 @@ import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.MOVE;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS_V2;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.STATEFUL_SERVICE_CONFIG;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.TRANSACTIONINFO;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_SCM_CERTS;
@@ -88,6 +90,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
 
   private Table<ContainerID, MoveDataNodePair> moveTable;
 
+  private Table<String, ByteString> statefulServiceConfigTable;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
   private DBStore store;
@@ -174,6 +178,10 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
       moveTable = MOVE.getTable(store);
 
       checkTableStatus(moveTable, MOVE.getName());
+
+      statefulServiceConfigTable = STATEFUL_SERVICE_CONFIG.getTable(store);
+      checkTableStatus(statefulServiceConfigTable,
+          STATEFUL_SERVICE_CONFIG.getName());
     }
   }
 
@@ -279,6 +287,11 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
     return moveTable;
   }
 
+  @Override
+  public Table<String, ByteString> getStatefulServiceConfigTable() {
+    return statefulServiceConfigTable;
+  }
+
   private void checkTableStatus(Table table, String name) throws IOException {
     String logMessage = "Unable to get a reference to %s table. Cannot " +
         "continue.";
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 13b3f7239b..7c58def298 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.node.CommandQueueReportHandler;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
 import org.apache.hadoop.hdds.scm.server.upgrade.ScmHAUnfinalizedStateValidationAction;
 import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
 import org.apache.hadoop.hdds.security.token.ContainerTokenGenerator;
@@ -275,6 +277,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private final SCMHANodeDetails scmHANodeDetails;
 
   private ContainerBalancer containerBalancer;
+  private StatefulServiceStateManager statefulServiceStateManager;
 
   /**
    * Creates a new StorageContainerManager. Configuration will be
@@ -659,6 +662,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     }
     scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
         containerManager, scmContext, eventQueue, replicationManager);
+
+    statefulServiceStateManager = StatefulServiceStateManagerImpl.newBuilder()
+        .setStatefulServiceConfig(
+            scmMetadataStore.getStatefulServiceConfigTable())
+        .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer())
+        .setRatisServer(scmHAManager.getRatisServer())
+        .build();
   }
 
   /**
@@ -1825,6 +1835,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     return this.clusterMap;
   }
 
+  public StatefulServiceStateManager getStatefulServiceStateManager() {
+    return statefulServiceStateManager;
+  }
+
   /**
    * Get the safe mode status of all rules.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestStatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestStatefulServiceStateManagerImpl.java
new file mode 100644
index 0000000000..65e0528a98
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestStatefulServiceStateManagerImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Exercises StatefulServiceStateManagerImpl against an SCMDBDefinition store.
+ */
+public class TestStatefulServiceStateManagerImpl {
+  private OzoneConfiguration conf;
+  private DBStore dbStore;
+  private SCMHAManager scmhaManager;
+  private Table<String, ByteString> statefulServiceConfig;
+  private StatefulServiceStateManager stateManager;
+
+  @Before
+  public void setup() throws AuthenticationException, IOException {
+    conf = new OzoneConfiguration();
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        GenericTestUtils.getRandomizedTempPath());
+    dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
+    statefulServiceConfig =
+        SCMDBDefinition.STATEFUL_SERVICE_CONFIG.getTable(dbStore);
+    scmhaManager = SCMHAManagerStub.getInstance(true, dbStore);
+    // Wire the manager to the table and the stub's Ratis server and buffer.
+    stateManager = StatefulServiceStateManagerImpl.newBuilder()
+        .setStatefulServiceConfig(statefulServiceConfig)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setSCMDBTransactionBuffer(
+            scmhaManager.asSCMHADBTransactionBuffer())
+        .build();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (dbStore == null) {
+      return;
+    }
+    dbStore.close();
+  }
+
+  /**
+   * Saves a configuration through {@link
+   * StatefulServiceStateManagerImpl#saveConfiguration(String, ByteString)}
+   * and checks that reading it back returns the same bytes.
+   */
+  @Test
+  public void testSaveConfiguration() throws IOException {
+    final String serviceName = "test";
+    final ByteString expected = ByteString.copyFromUtf8("message_string");
+    stateManager.saveConfiguration(serviceName, expected);
+    // Flush the stub buffer explicitly before reading the value back.
+    scmhaManager.asSCMHADBTransactionBuffer().flush();
+    Assert.assertEquals(expected, stateManager.readConfiguration(serviceName));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org