You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2022/05/05 10:58:14 UTC
[ozone] branch master updated: HDDS-6551. Introduce StatefulService in scm (#3307)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f315500c73 HDDS-6551. Introduce StatefulService in scm (#3307)
f315500c73 is described below
commit f315500c73b86ee571118fdb535d00ed07a9b3c1
Author: Siddhant Sangwan <si...@gmail.com>
AuthorDate: Thu May 5 16:28:09 2022 +0530
HDDS-6551. Introduce StatefulService in scm (#3307)
---
.../hadoop/hdds/scm/metadata/SCMMetadataStore.java | 3 +
.../src/main/proto/SCMRatisProtocol.proto | 1 +
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 2 +
.../hadoop/hdds/scm/ha/SCMHAManagerStub.java | 17 ++-
.../apache/hadoop/hdds/scm/ha/StatefulService.java | 80 +++++++++++++
.../hdds/scm/ha/StatefulServiceStateManager.java | 61 ++++++++++
.../scm/ha/StatefulServiceStateManagerImpl.java | 124 +++++++++++++++++++++
.../hadoop/hdds/scm/metadata/ByteStringCodec.java | 69 ++++++++++++
.../hadoop/hdds/scm/metadata/SCMDBDefinition.java | 13 ++-
.../hdds/scm/metadata/SCMMetadataStoreImpl.java | 13 +++
.../hdds/scm/server/StorageContainerManager.java | 14 +++
.../ha/TestStatefulServiceStateManagerImpl.java | 89 +++++++++++++++
12 files changed, 479 insertions(+), 7 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index a63d90d0f4..9eb92f1da1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -153,4 +154,6 @@ public interface SCMMetadataStore extends DBStoreHAManager {
* Table that maintains move information.
*/
Table<ContainerID, MoveDataNodePair> getMoveTable();
+
+ /**
+ * Table that maps a service name to its persisted configuration bytes.
+ */
+ Table<String, ByteString> getStatefulServiceConfigTable();
}
diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
index 8e4237ee59..af6232041e 100644
--- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
@@ -27,6 +27,7 @@ enum RequestType {
SEQUENCE_ID = 4;
CERT_STORE = 5;
MOVE = 6;
+ STATEFUL_SERVICE_CONFIG = 7;
}
message Method {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 0fcc1ec93f..16b32735aa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -375,6 +375,8 @@ public class SCMHAManagerImpl implements SCMHAManager {
metadataStore.getDeletedBlocksTXTable());
scm.getReplicationManager().getMoveScheduler()
.reinitialize(metadataStore.getMoveTable());
+ scm.getStatefulServiceStateManager().reinitialize(
+ metadataStore.getStatefulServiceConfigTable());
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
if (scm.getRootCertificateServer() != null) {
scm.getRootCertificateServer().reinitialize(metadataStore);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index 6dbbea32cf..28366e6a3e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -24,11 +24,13 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
@@ -60,6 +62,11 @@ public final class SCMHAManagerStub implements SCMHAManager {
return new SCMHAManagerStub(isLeader, buffer);
}
+ /**
+ * Creates an SCMHAManagerStub whose transaction buffer is an
+ * {@link SCMHADBTransactionBufferStub} constructed from the given store.
+ *
+ * @param isLeader passed through to the SCMHAManagerStub constructor
+ * @param dbStore store handed to the SCMHADBTransactionBufferStub
+ * @return the new stub instance
+ */
+ public static SCMHAManager getInstance(boolean isLeader, DBStore dbStore) {
+ return new SCMHAManagerStub(isLeader,
+ new SCMHADBTransactionBufferStub(dbStore));
+ }
+
/**
* Creates SCMHAManagerStub instance.
*/
@@ -109,7 +116,9 @@ public final class SCMHAManagerStub implements SCMHAManager {
@Override
public SCMHADBTransactionBuffer asSCMHADBTransactionBuffer() {
- return null;
+ // Fail fast if this stub was not built with an SCMHADBTransactionBuffer,
+ // instead of surfacing a ClassCastException from the cast below.
+ Preconditions
+ .checkArgument(transactionBuffer instanceof SCMHADBTransactionBuffer);
+ return (SCMHADBTransactionBuffer) transactionBuffer;
}
@Override
@@ -191,13 +200,9 @@ public final class SCMHAManagerStub implements SCMHAManager {
"No handler found for request type " + request.getType());
}
- final List<Class<?>> argumentTypes = new ArrayList<>();
- for (Object args : request.getArguments()) {
- argumentTypes.add(args.getClass());
- }
final Object result = handler.getClass()
.getMethod(request.getOperation(),
- argumentTypes.toArray(new Class<?>[0]))
+ request.getParameterTypes())
.invoke(handler, request.getArguments());
return SCMRatisResponse.encode(result);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
new file mode 100644
index 0000000000..441e83ce13
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * A StatefulService is an SCMService that persists configuration to RocksDB.
+ * The service must define this configuration as a Protobuf message.
+ */
+public abstract class StatefulService implements SCMService {
+  private final StatefulServiceStateManager stateManager;
+
+  /**
+   * Initialize a StatefulService from an extending class.
+   * @param scm {@link StorageContainerManager}
+   */
+  protected StatefulService(StorageContainerManager scm) {
+    stateManager = scm.getStatefulServiceStateManager();
+  }
+
+  /**
+   * Persists the specified {@link GeneratedMessage} configurationMessage
+   * to RocksDB with this service's {@link SCMService#getServiceName()} as the
+   * key.
+   * @param configurationMessage configuration GeneratedMessage to persist
+   * @throws IOException on failure to persist configuration
+   */
+  protected final void saveConfiguration(GeneratedMessage configurationMessage)
+      throws IOException {
+    stateManager.saveConfiguration(getServiceName(),
+        configurationMessage.toByteString());
+  }
+
+  /**
+   * Reads persisted configuration mapped to this service's
+   * {@link SCMService#getServiceName()} name.
+   *
+   * <p>The message is parsed by reflectively invoking the static
+   * {@code parseFrom(ByteString)} method of the given type.
+   *
+   * @param configType the Class object of the protobuf message type
+   * @param <T> the Type of the protobuf message
+   * @return persisted protobuf message
+   * @throws IOException on failure to fetch the message from DB or when
+   * parsing it (reported as an {@link InvalidProtocolBufferException} that
+   * carries the underlying failure as its cause). Ensure the specified
+   * configType is correct
+   */
+  protected final <T extends GeneratedMessage> T readConfiguration(
+      Class<T> configType) throws IOException {
+    try {
+      return configType.cast(ReflectionUtil.getMethod(configType,
+              "parseFrom", ByteString.class)
+          .invoke(null, stateManager.readConfiguration(getServiceName())));
+    } catch (InvocationTargetException e) {
+      // parseFrom itself threw. The useful information lives in the wrapped
+      // target exception; InvocationTargetException's own message is
+      // usually null.
+      throw parseFailure(configType, e.getCause() != null ? e.getCause() : e);
+    } catch (NoSuchMethodException | IllegalAccessException e) {
+      throw parseFailure(configType, e);
+    }
+  }
+
+  /**
+   * Builds the exception reported when a configuration cannot be parsed,
+   * keeping the underlying failure attached as the cause.
+   */
+  private static InvalidProtocolBufferException parseFailure(
+      Class<?> configType, Throwable cause) {
+    final InvalidProtocolBufferException failure =
+        new InvalidProtocolBufferException("GeneratedMessage cannot " +
+            "be parsed for type " + configType + ": " + cause.getMessage());
+    failure.initCause(cause);
+    return failure;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManager.java
new file mode 100644
index 0000000000..a6a8a50ae5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.utils.db.Table;
+
+import java.io.IOException;
+
+/**
+ * This interface defines an API for saving and reading configurations of a
+ * {@link StatefulService}.
+ */
+public interface StatefulServiceStateManager {
+
+ /**
+ * Persists the specified configuration bytes to RocksDB and replicates
+ * this operation through RATIS. The specified serviceName maps to the
+ * persisted bytes.
+ * @param serviceName name of the {@link StatefulService}, obtained
+ * through {@link SCMService#getServiceName()}
+ * @param bytes configuration to persist as a ByteString
+ * @throws IOException on failure to persist configuration
+ */
+ @Replicate
+ void saveConfiguration(String serviceName, ByteString bytes)
+ throws IOException;
+
+ /**
+ * Reads the persisted configuration mapped to the specified serviceName.
+ * This method is not annotated with {@link Replicate}.
+ * @param serviceName name of the {@link StatefulService}, obtained through
+ * {@link SCMService#getServiceName()}
+ * @return configuration as a ByteString. NOTE(review): the result for a
+ * serviceName that was never saved is not specified here - confirm against
+ * the implementation before relying on it
+ * @throws IOException on failure
+ */
+ ByteString readConfiguration(String serviceName) throws IOException;
+
+ /**
+ * Sets the updated reference to the table when reloading SCM state.
+ * @param statefulServiceConfig table from
+ * {@link org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore}
+ */
+ void reinitialize(Table<String, ByteString> statefulServiceConfig);
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
new file mode 100644
index 0000000000..d470f1bf7f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulServiceStateManagerImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.utils.db.Table;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+
+/**
+ * {@link StatefulServiceStateManager} implementation that keeps each
+ * stateful service's configuration bytes in a DB table keyed by the
+ * service name.
+ */
+public final class StatefulServiceStateManagerImpl
+    implements StatefulServiceStateManager {
+
+  private final DBTransactionBuffer transactionBuffer;
+  // service name -> configuration (ByteString); replaced by reinitialize()
+  private Table<String, ByteString> statefulServiceConfig;
+
+  private StatefulServiceStateManagerImpl(
+      Table<String, ByteString> configTable,
+      DBTransactionBuffer buffer) {
+    transactionBuffer = buffer;
+    statefulServiceConfig = configTable;
+  }
+
+  /**
+   * @return a fresh {@link Builder}
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void saveConfiguration(String serviceName, ByteString bytes)
+      throws IOException {
+    transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes);
+    if (!(transactionBuffer instanceof SCMHADBTransactionBuffer)) {
+      return;
+    }
+    // flush right away when the buffer is an SCMHADBTransactionBuffer
+    ((SCMHADBTransactionBuffer) transactionBuffer).flush();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ByteString readConfiguration(String serviceName) throws IOException {
+    return statefulServiceConfig.get(serviceName);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void reinitialize(Table<String, ByteString> configs) {
+    statefulServiceConfig = configs;
+  }
+
+  /**
+   * Collects the collaborators of a StatefulServiceStateManager and builds
+   * it behind a dynamic proxy.
+   */
+  public static class Builder {
+    private Table<String, ByteString> statefulServiceConfig;
+    private DBTransactionBuffer transactionBuffer;
+    private SCMRatisServer scmRatisServer;
+
+    public Builder setStatefulServiceConfig(
+        final Table<String, ByteString> statefulServiceConfig) {
+      this.statefulServiceConfig = statefulServiceConfig;
+      return this;
+    }
+
+    public Builder setSCMDBTransactionBuffer(
+        final DBTransactionBuffer dbTransactionBuffer) {
+      transactionBuffer = dbTransactionBuffer;
+      return this;
+    }
+
+    public Builder setRatisServer(final SCMRatisServer ratisServer) {
+      this.scmRatisServer = ratisServer;
+      return this;
+    }
+
+    /**
+     * Builds the state manager. The table and the transaction buffer must
+     * have been set; the Ratis server is passed on to the invocation
+     * handler without a null check.
+     *
+     * @return a proxy for {@link StatefulServiceStateManager} whose calls go
+     * through an {@link SCMHAInvocationHandler}
+     */
+    public StatefulServiceStateManager build() {
+      Preconditions.checkNotNull(statefulServiceConfig);
+      Preconditions.checkNotNull(transactionBuffer);
+
+      final StatefulServiceStateManager delegate =
+          new StatefulServiceStateManagerImpl(statefulServiceConfig,
+              transactionBuffer);
+      final SCMHAInvocationHandler handler = new SCMHAInvocationHandler(
+          RequestType.STATEFUL_SERVICE_CONFIG, delegate, scmRatisServer);
+      final Class<?>[] proxied = {StatefulServiceStateManager.class};
+
+      return (StatefulServiceStateManager) Proxy.newProxyInstance(
+          SCMHAInvocationHandler.class.getClassLoader(), proxied, handler);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ByteStringCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ByteStringCodec.java
new file mode 100644
index 0000000000..fb25a6b766
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ByteStringCodec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import java.io.IOException;
+
+/**
+ * Codec to serialize/deserialize a {@link ByteString}.
+ * All three methods tolerate {@code null} input and map it to an empty value.
+ */
+public class ByteStringCodec implements Codec<ByteString> {
+
+  /**
+   * Convert object to raw persisted format.
+   *
+   * @param object the ByteString to persist; {@code null} is persisted as
+   *               an empty byte array
+   * @return the bytes of the given ByteString
+   */
+  @Override
+  public byte[] toPersistedFormat(ByteString object) throws IOException {
+    if (object == null) {
+      return new byte[0];
+    }
+    return object.toByteArray();
+  }
+
+  /**
+   * Convert object from raw persisted format.
+   *
+   * @param rawData byte array from the key/value store; {@code null} yields
+   *                {@link ByteString#EMPTY}
+   * @return a ByteString holding a copy of rawData
+   */
+  @Override
+  public ByteString fromPersistedFormat(byte[] rawData) throws IOException {
+    if (rawData == null) {
+      return ByteString.EMPTY;
+    }
+    return ByteString.copyFrom(rawData);
+  }
+
+  /**
+   * Returns an object that can be used independently of the argument.
+   *
+   * @param object a ByteString; {@code null} yields {@link ByteString#EMPTY}
+   * @return the given ByteString itself, or EMPTY for {@code null}
+   */
+  @Override
+  public ByteString copyObject(ByteString object) {
+    if (object == null) {
+      return ByteString.EMPTY;
+    }
+    // ByteString is immutable, so sharing the instance is safe and avoids
+    // copying the content twice (toByteArray + copyFrom).
+    return object;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index b3e861b40c..c8fdaa3ecb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.metadata;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -154,6 +155,15 @@ public class SCMDBDefinition implements DBDefinition {
MoveDataNodePair.class,
new MoveDataNodePairCodec());
+ /**
+ * Column family "statefulServiceConfig": String keys encoded with
+ * StringCodec, ByteString values encoded with ByteStringCodec.
+ */
+ public static final DBColumnFamilyDefinition<String, ByteString>
+ STATEFUL_SERVICE_CONFIG =
+ new DBColumnFamilyDefinition<>(
+ "statefulServiceConfig",
+ String.class,
+ new StringCodec(),
+ ByteString.class,
+ new ByteStringCodec());
+
@Override
public String getName() {
return "scm.db";
@@ -168,6 +178,7 @@ public class SCMDBDefinition implements DBDefinition {
public DBColumnFamilyDefinition[] getColumnFamilies() {
return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
VALID_SCM_CERTS, REVOKED_CERTS, REVOKED_CERTS_V2, PIPELINES, CONTAINERS,
- TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID, MOVE};
+ TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID, MOVE,
+ STATEFUL_SERVICE_CONFIG};
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index de7fcb0b74..2bfb136f3f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -48,6 +49,7 @@ import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.MOVE;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS_V2;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.STATEFUL_SERVICE_CONFIG;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.TRANSACTIONINFO;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_SCM_CERTS;
@@ -88,6 +90,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
private Table<ContainerID, MoveDataNodePair> moveTable;
+ private Table<String, ByteString> statefulServiceConfigTable;
+
private static final Logger LOG =
LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
private DBStore store;
@@ -174,6 +178,10 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
moveTable = MOVE.getTable(store);
checkTableStatus(moveTable, MOVE.getName());
+
+ statefulServiceConfigTable = STATEFUL_SERVICE_CONFIG.getTable(store);
+ checkTableStatus(statefulServiceConfigTable,
+ STATEFUL_SERVICE_CONFIG.getName());
}
}
@@ -279,6 +287,11 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
return moveTable;
}
+ /**
+ * @return the table handle obtained from STATEFUL_SERVICE_CONFIG
+ */
+ @Override
+ public Table<String, ByteString> getStatefulServiceConfigTable() {
+ return statefulServiceConfigTable;
+ }
+
private void checkTableStatus(Table table, String name) throws IOException {
String logMessage = "Unable to get a reference to %s table. Cannot " +
"continue.";
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 13b3f7239b..7c58def298 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.node.CommandQueueReportHandler;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
+import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
import org.apache.hadoop.hdds.scm.server.upgrade.ScmHAUnfinalizedStateValidationAction;
import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
import org.apache.hadoop.hdds.security.token.ContainerTokenGenerator;
@@ -275,6 +277,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final SCMHANodeDetails scmHANodeDetails;
private ContainerBalancer containerBalancer;
+ private StatefulServiceStateManager statefulServiceStateManager;
/**
* Creates a new StorageContainerManager. Configuration will be
@@ -659,6 +662,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
containerManager, scmContext, eventQueue, replicationManager);
+
+ statefulServiceStateManager = StatefulServiceStateManagerImpl.newBuilder()
+ .setStatefulServiceConfig(
+ scmMetadataStore.getStatefulServiceConfigTable())
+ .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer())
+ .setRatisServer(scmHAManager.getRatisServer())
+ .build();
}
/**
@@ -1825,6 +1835,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return this.clusterMap;
}
+ /**
+ * Returns the {@link StatefulServiceStateManager} held by this SCM.
+ */
+ public StatefulServiceStateManager getStatefulServiceStateManager() {
+ return statefulServiceStateManager;
+ }
+
/**
* Get the safe mode status of all rules.
*
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestStatefulServiceStateManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestStatefulServiceStateManagerImpl.java
new file mode 100644
index 0000000000..65e0528a98
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestStatefulServiceStateManagerImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Unit test for {@link StatefulServiceStateManagerImpl}, run against an SCM
+ * DB store under a randomized temp path and an {@link SCMHAManagerStub}.
+ */
+public class TestStatefulServiceStateManagerImpl {
+  private OzoneConfiguration conf;
+  private DBStore dbStore;
+  private SCMHAManager scmhaManager;
+  private Table<String, ByteString> statefulServiceConfig;
+  private StatefulServiceStateManager stateManager;
+
+  @Before
+  public void setup() throws AuthenticationException, IOException {
+    conf = new OzoneConfiguration();
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+    final String metadataDir = GenericTestUtils.getRandomizedTempPath();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metadataDir);
+
+    dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
+    statefulServiceConfig =
+        SCMDBDefinition.STATEFUL_SERVICE_CONFIG.getTable(dbStore);
+    scmhaManager = SCMHAManagerStub.getInstance(true, dbStore);
+
+    final StatefulServiceStateManagerImpl.Builder builder =
+        StatefulServiceStateManagerImpl.newBuilder();
+    builder.setStatefulServiceConfig(statefulServiceConfig);
+    builder.setRatisServer(scmhaManager.getRatisServer());
+    builder.setSCMDBTransactionBuffer(
+        scmhaManager.asSCMHADBTransactionBuffer());
+    stateManager = builder.build();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (dbStore == null) {
+      return;
+    }
+    dbStore.close();
+  }
+
+  /**
+   * Saves a configuration, flushes the transaction buffer, and checks that
+   * reading the same service name returns the same bytes. Covers {@link
+   * StatefulServiceStateManagerImpl#saveConfiguration(String, ByteString)}.
+   * @throws IOException on failure
+   */
+  @Test
+  public void testSaveConfiguration() throws IOException {
+    final String serviceName = "test";
+    final ByteString payload = ByteString.copyFromUtf8("message_string");
+
+    stateManager.saveConfiguration(serviceName, payload);
+    scmhaManager.asSCMHADBTransactionBuffer().flush();
+
+    final ByteString expected = ByteString.copyFromUtf8("message_string");
+    Assert.assertEquals(expected,
+        stateManager.readConfiguration(serviceName));
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org