You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/02/12 09:01:25 UTC
[hadoop] branch trunk updated: HDDS-360. Use RocksDBStore and
TableStore for SCM Metadata. Contributed by Anu Engineer.
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a536eb5 HDDS-360. Use RocksDBStore and TableStore for SCM Metadata. Contributed by Anu Engineer.
a536eb5 is described below
commit a536eb5c419cb507b4b0aed123be0255c57ef4dd
Author: Nanda kumar <na...@apache.org>
AuthorDate: Tue Feb 12 14:25:14 2019 +0530
HDDS-360. Use RocksDBStore and TableStore for SCM Metadata.
Contributed by Anu Engineer.
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../apache/hadoop/utils/db/RDBStoreIterator.java | 21 +-
.../org/apache/hadoop/utils/db/TableIterator.java | 12 +
.../org/apache/hadoop/utils/db/TypedTable.java | 22 ++
.../java/org/apache/hadoop/hdds/scm/ScmUtils.java | 37 +++
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 61 +++--
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 263 ++++++++-------------
.../metadata/DeletedBlocksTransactionCodec.java | 46 ++++
.../scm/{ScmUtils.java => metadata/LongCodec.java} | 34 ++-
.../hadoop/hdds/scm/metadata/SCMMetadataStore.java | 73 ++++++
.../hdds/scm/metadata/SCMMetadataStoreRDBImpl.java | 148 ++++++++++++
.../{ScmUtils.java => metadata/package-info.java} | 28 +--
.../hadoop/hdds/scm/node/SCMNodeManager.java | 5 +-
.../hdds/scm/server/SCMBlockProtocolServer.java | 4 +-
.../hdds/scm/server/SCMClientProtocolServer.java | 4 +-
.../hadoop/hdds/scm/server/SCMConfigurator.java | 202 ++++++++++++++++
.../{SCMStorage.java => SCMStorageConfig.java} | 12 +-
.../hdds/scm/server/StorageContainerManager.java | 256 +++++++++++++++-----
.../org/apache/hadoop/hdds/scm/HddsTestUtils.java | 4 +-
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 23 ++
.../hadoop/hdds/scm/block/TestBlockManager.java | 70 +++---
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 29 ++-
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 32 +--
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 7 +-
.../hadoop/ozone/TestSecureOzoneCluster.java | 60 ++---
.../hadoop/ozone/TestStorageContainerManager.java | 11 +-
.../commandhandler/TestBlockDeletion.java | 4 +-
.../apache/hadoop/ozone/om/TestOzoneManager.java | 8 +-
.../hadoop/ozone/scm/TestContainerSQLCli.java | 4 +-
.../org/apache/hadoop/ozone/scm/package-info.java | 31 +--
30 files changed, 1045 insertions(+), 467 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 1d696f8..ca319f6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -115,6 +115,7 @@ public final class OzoneConsts {
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
+ public static final String SCM_DB_NAME = "scm.db";
public static final String STORAGE_DIR_CHUNKS = "chunks";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStoreIterator.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStoreIterator.java
index 1faa089..ecc065b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStoreIterator.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStoreIterator.java
@@ -41,7 +41,7 @@ public class RDBStoreIterator
@Override
public void forEachRemaining(
Consumer<? super ByteArrayKeyValue> action) {
- while(hasNext()) {
+ while (hasNext()) {
action.accept(next());
}
}
@@ -56,7 +56,7 @@ public class RDBStoreIterator
if (rocksDBIterator.isValid()) {
ByteArrayKeyValue value =
ByteArrayKeyValue.create(rocksDBIterator.key(), rocksDBIterator
- .value());
+ .value());
rocksDBIterator.next();
return value;
}
@@ -84,6 +84,23 @@ public class RDBStoreIterator
}
@Override
+ public byte[] key() {
+ if (rocksDBIterator.isValid()) {
+ return rocksDBIterator.key();
+ }
+ return null;
+ }
+
+ @Override
+ public ByteArrayKeyValue value() {
+ if (rocksDBIterator.isValid()) {
+ return ByteArrayKeyValue.create(rocksDBIterator.key(),
+ rocksDBIterator.value());
+ }
+ return null;
+ }
+
+ @Override
public void close() throws IOException {
rocksDBIterator.close();
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java
index 071dbf4..fcd8535 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java
@@ -47,4 +47,16 @@ public interface TableIterator<KEY, T> extends Iterator<T>, Closeable {
*/
T seek(KEY key);
+ /**
+ * Returns the key value at the current position.
+ * @return KEY
+ */
+ KEY key();
+
+ /**
+ * Returns the VALUE at the current position.
+ * @return VALUE
+ */
+ T value();
+
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
index d0a33fd..f715572 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
@@ -140,12 +140,16 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
private TableIterator<byte[], ? extends KeyValue<byte[], byte[]>>
rawIterator;
+ private final Class<KEY> keyClass;
+ private final Class<VALUE> valueClass;
public TypedTableIterator(
TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> rawIterator,
Class<KEY> keyType,
Class<VALUE> valueType) {
this.rawIterator = rawIterator;
+ keyClass = keyType;
+ valueClass = valueType;
}
@Override
@@ -169,6 +173,24 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
}
@Override
+ public KEY key() {
+ byte[] result = rawIterator.key();
+ if (result == null) {
+ return null;
+ }
+ return codecRegistry.asObject(result, keyClass);
+ }
+
+ @Override
+ public TypedKeyValue value() {
+ KeyValue keyValue = rawIterator.value();
+ if(keyValue != null) {
+ return new TypedKeyValue(keyValue, keyClass, valueClass);
+ }
+ return null;
+ }
+
+ @Override
public void close() throws IOException {
rawIterator.close();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
index 43b4452..5b17de9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
@@ -18,14 +18,25 @@
package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.chillmode.Precheck;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
/**
* SCM utility class.
*/
public final class ScmUtils {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ScmUtils.class);
private ScmUtils() {
}
@@ -42,4 +53,30 @@ public final class ScmUtils {
preCheck.check(operation);
}
}
+
+ public static File getDBPath(Configuration conf, String dbDirectory) {
+ final Collection<String> dbDirs =
+ conf.getTrimmedStringCollection(dbDirectory);
+
+ if (dbDirs.size() > 1) {
+ throw new IllegalArgumentException(
+ "Bad configuration setting " + dbDirectory
+ + ". OM does not support multiple metadata dirs currently.");
+ }
+
+ if (dbDirs.size() == 1) {
+ final File dbDirPath = new File(dbDirs.iterator().next());
+ if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
+ throw new IllegalArgumentException(
+ "Unable to create directory " + dbDirPath
+ + " specified in configuration setting " + dbDirectory);
+ }
+ return dbDirPath;
+ }
+
+ LOG.warn("{} is not configured. We recommend adding this setting. "
+ + "Falling back to {} instead.", dbDirectory,
+ HddsConfigKeys.OZONE_METADATA_DIRS);
+ return ServerUtils.getOzoneMetaDirPath(conf);
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index c4ce69b..a4757ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,50 +16,45 @@
*/
package org.apache.hadoop.hdds.scm.block;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
- .INVALID_BLOCK_SIZE;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.INVALID_BLOCK_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
/** Block Manager manages the block access for SCM. */
public class BlockManagerImpl implements EventHandler<Boolean>,
@@ -85,18 +80,14 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
* Constructor.
*
* @param conf - configuration.
- * @param nodeManager - node manager.
- * @param pipelineManager - pipeline manager.
- * @param containerManager - container manager.
- * @param eventPublisher - event publisher.
+ * @param scm - StorageContainerManager that provides the managers, event queue and metadata store.
* @throws IOException
*/
- public BlockManagerImpl(final Configuration conf,
- final NodeManager nodeManager, final PipelineManager pipelineManager,
- final ContainerManager containerManager, EventPublisher eventPublisher)
+ public BlockManagerImpl(final Configuration conf, StorageContainerManager scm)
throws IOException {
- this.pipelineManager = pipelineManager;
- this.containerManager = containerManager;
+ Objects.requireNonNull(scm, "SCM cannot be null");
+ this.pipelineManager = scm.getPipelineManager();
+ this.containerManager = scm.getContainerManager();
this.containerSize = (long)conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
@@ -106,7 +97,8 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
// SCM block deleting transaction log and deleting service.
- deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+ deletedBlockLog = new DeletedBlockLogImpl(conf, scm.getContainerManager(),
+ scm.getScmMetadataStore());
long svcInterval =
conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
@@ -118,7 +110,8 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
TimeUnit.MILLISECONDS);
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog, containerManager,
- nodeManager, eventPublisher, svcInterval, serviceTimeout, conf);
+ scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
+ serviceTimeout, conf);
chillModePrecheck = new ChillModePrecheck(conf);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 766d428..5ff34f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -17,39 +17,8 @@
*/
package org.apache.hadoop.hdds.scm.block;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
- .DeleteBlockTransactionResult;
-import org.apache.hadoop.hdds.scm.command
- .CommandStatusReportHandler.DeleteBlockStatus;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,17 +29,29 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.utils.db.TableIterator;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.lang.Math.min;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
/**
* A implement class of {@link DeletedBlockLog}, and it uses
@@ -88,40 +69,21 @@ public class DeletedBlockLogImpl
public static final Logger LOG =
LoggerFactory.getLogger(DeletedBlockLogImpl.class);
- private static final byte[] LATEST_TXID =
- DFSUtil.string2Bytes("#LATEST_TXID#");
-
private final int maxRetry;
- private final MetadataStore deletedStore;
private final ContainerManager containerManager;
+ private final SCMMetadataStore scmMetadataStore;
private final Lock lock;
- // The latest id of deleted blocks in the db.
- private long lastTxID;
// Maps txId to set of DNs which are successful in committing the transaction
private Map<Long, Set<UUID>> transactionToDNsCommitMap;
public DeletedBlockLogImpl(Configuration conf,
- ContainerManager containerManager) throws IOException {
+ ContainerManager containerManager,
+ SCMMetadataStore scmMetadataStore) {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
-
- final File metaDir = ServerUtils.getScmDbDir(conf);
- final String scmMetaDataDir = metaDir.getPath();
- final File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
- final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
- // Load store of all transactions.
- deletedStore = MetadataStoreBuilder.newBuilder()
- .setCreateIfMissing(true)
- .setConf(conf)
- .setDbFile(deletedLogDbPath)
- .setCacheSize(cacheSize * OzoneConsts.MB)
- .build();
this.containerManager = containerManager;
-
+ this.scmMetadataStore = scmMetadataStore;
this.lock = new ReentrantLock();
- // start from the head of deleted store.
- lastTxID = findLatestTxIDInStore();
// transactionToDNsCommitMap is updated only when
// transaction is added to the log and when it is removed.
@@ -130,26 +92,6 @@ public class DeletedBlockLogImpl
transactionToDNsCommitMap = new ConcurrentHashMap<>();
}
- @VisibleForTesting
- public MetadataStore getDeletedStore() {
- return deletedStore;
- }
-
- /**
- * There is no need to lock before reading because
- * it's only used in construct method.
- *
- * @return latest txid.
- * @throws IOException
- */
- private long findLatestTxIDInStore() throws IOException {
- long txid = 0;
- byte[] value = deletedStore.get(LATEST_TXID);
- if (value != null) {
- txid = Longs.fromByteArray(value);
- }
- return txid;
- }
@Override
public List<DeletedBlocksTransaction> getFailedTransactions()
@@ -157,16 +99,16 @@ public class DeletedBlockLogImpl
lock.lock();
try {
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
- deletedStore.iterate(null, (key, value) -> {
- if (!Arrays.equals(LATEST_TXID, key)) {
- DeletedBlocksTransaction delTX =
- DeletedBlocksTransaction.parseFrom(value);
+ try (TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+ scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ while (iter.hasNext()) {
+ DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
failedTXs.add(delTX);
}
}
- return true;
- });
+ }
return failedTXs;
} finally {
lock.unlock();
@@ -181,44 +123,44 @@ public class DeletedBlockLogImpl
*/
@Override
public void incrementCount(List<Long> txIDs) throws IOException {
- BatchOperation batch = new BatchOperation();
- lock.lock();
- try {
- for(Long txID : txIDs) {
- try {
- byte[] deleteBlockBytes =
- deletedStore.get(Longs.toByteArray(txID));
- if (deleteBlockBytes == null) {
- LOG.warn("Delete txID {} not found", txID);
- continue;
- }
- DeletedBlocksTransaction block = DeletedBlocksTransaction
- .parseFrom(deleteBlockBytes);
- DeletedBlocksTransaction.Builder builder = block.toBuilder();
- int currentCount = block.getCount();
- if (currentCount > -1) {
- builder.setCount(++currentCount);
- }
- // if the retry time exceeds the maxRetry value
- // then set the retry value to -1, stop retrying, admins can
- // analyze those blocks and purge them manually by SCMCli.
- if (currentCount > maxRetry) {
- builder.setCount(-1);
- }
- deletedStore.put(Longs.toByteArray(txID),
- builder.build().toByteArray());
- } catch (IOException ex) {
- LOG.warn("Cannot increase count for txID " + txID, ex);
+ for (Long txID : txIDs) {
+ lock.lock();
+ try {
+ DeletedBlocksTransaction block =
+ scmMetadataStore.getDeletedBlocksTXTable().get(txID);
+ if (block == null) {
+ // Should we make this an error ? How can we not find the deleted
+ // TXID?
+ LOG.warn("Deleted TXID not found.");
+ continue;
+ }
+ DeletedBlocksTransaction.Builder builder = block.toBuilder();
+ int currentCount = block.getCount();
+ if (currentCount > -1) {
+ builder.setCount(++currentCount);
+ }
+ // if the retry time exceeds the maxRetry value
+ // then set the retry value to -1, stop retrying, admins can
+ // analyze those blocks and purge them manually by SCMCli.
+ if (currentCount > maxRetry) {
+ builder.setCount(-1);
}
+ scmMetadataStore.getDeletedBlocksTXTable().put(txID,
+ builder.build());
+ } catch (IOException ex) {
+ LOG.warn("Cannot increase count for txID " + txID, ex);
+ // We do not throw error here, since we don't want to abort the loop.
+ // Just log and continue processing the rest of txids.
+ } finally {
+ lock.unlock();
}
- deletedStore.writeBatch(batch);
- } finally {
- lock.unlock();
}
}
+
private DeletedBlocksTransaction constructNewTransaction(long txID,
- long containerID, List<Long> blocks) {
+ long containerID,
+ List<Long> blocks) {
return DeletedBlocksTransaction.newBuilder()
.setTxID(txID)
.setContainerID(containerID)
@@ -231,7 +173,8 @@ public class DeletedBlockLogImpl
* {@inheritDoc}
*
* @param transactionResults - transaction IDs.
- * @param dnID - Id of Datanode which has acknowledged a delete block command.
+ * @param dnID - Id of Datanode which has acknowledged
+ * a delete block command.
* @throws IOException
*/
@Override
@@ -259,8 +202,8 @@ public class DeletedBlockLogImpl
}
dnsWithCommittedTxn.add(dnID);
- final ContainerInfo container = containerManager
- .getContainer(containerId);
+ final ContainerInfo container =
+ containerManager.getContainer(containerId);
final Set<ContainerReplica> replicas =
containerManager.getContainerReplicas(containerId);
// The delete entry can be safely removed from the log if all the
@@ -275,7 +218,7 @@ public class DeletedBlockLogImpl
if (dnsWithCommittedTxn.containsAll(containerDns)) {
transactionToDNsCommitMap.remove(txID);
LOG.debug("Purging txId={} from block deletion log", txID);
- deletedStore.delete(Longs.toByteArray(txID));
+ scmMetadataStore.getDeletedBlocksTXTable().delete(txID);
}
}
LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
@@ -308,24 +251,18 @@ public class DeletedBlockLogImpl
* {@inheritDoc}
*
* @param containerID - container ID.
- * @param blocks - blocks that belong to the same container.
+ * @param blocks - blocks that belong to the same container.
* @throws IOException
*/
@Override
public void addTransaction(long containerID, List<Long> blocks)
throws IOException {
- BatchOperation batch = new BatchOperation();
lock.lock();
try {
- DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
- containerID, blocks);
- byte[] key = Longs.toByteArray(lastTxID + 1);
-
- batch.put(key, tx.toByteArray());
- batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1));
-
- deletedStore.writeBatch(batch);
- lastTxID += 1;
+ Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
+ DeletedBlocksTransaction tx =
+ constructNewTransaction(nextTXID, containerID, blocks);
+ scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
} finally {
lock.unlock();
}
@@ -336,17 +273,16 @@ public class DeletedBlockLogImpl
lock.lock();
try {
final AtomicInteger num = new AtomicInteger(0);
- deletedStore.iterate(null, (key, value) -> {
- // Exclude latest txid record
- if (!Arrays.equals(LATEST_TXID, key)) {
- DeletedBlocksTransaction delTX =
- DeletedBlocksTransaction.parseFrom(value);
+ try (TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+ scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ while (iter.hasNext()) {
+ DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() > -1) {
num.incrementAndGet();
}
}
- return true;
- });
+ }
return num.get();
} finally {
lock.unlock();
@@ -360,24 +296,19 @@ public class DeletedBlockLogImpl
* @throws IOException
*/
@Override
- public void addTransactions(
- Map<Long, List<Long>> containerBlocksMap)
+ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException {
- BatchOperation batch = new BatchOperation();
lock.lock();
try {
- long currentLatestID = lastTxID;
- for (Map.Entry<Long, List<Long>> entry :
- containerBlocksMap.entrySet()) {
- currentLatestID += 1;
- byte[] key = Longs.toByteArray(currentLatestID);
- DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
+ BatchOperation batch = scmMetadataStore.getStore().initBatchOperation();
+ for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
+ long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
+ DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
entry.getKey(), entry.getValue());
- batch.put(key, tx.toByteArray());
+ scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
+ nextTXID, tx);
}
- lastTxID = currentLatestID;
- batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
- deletedStore.writeBatch(batch);
+ scmMetadataStore.getStore().commitBatchOperation(batch);
} finally {
lock.unlock();
}
@@ -385,9 +316,6 @@ public class DeletedBlockLogImpl
@Override
public void close() throws IOException {
- if (deletedStore != null) {
- deletedStore.close();
- }
}
@Override
@@ -396,11 +324,12 @@ public class DeletedBlockLogImpl
lock.lock();
try {
Map<Long, Long> deleteTransactionMap = new HashMap<>();
- deletedStore.iterate(null, (key, value) -> {
- if (!Arrays.equals(LATEST_TXID, key)) {
- DeletedBlocksTransaction block = DeletedBlocksTransaction
- .parseFrom(value);
-
+ try (TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+ scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ while (iter.hasNext()) {
+ Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
+ DeletedBlocksTransaction block = keyValue.getValue();
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
if (transactions.addTransaction(block,
transactionToDNsCommitMap.get(block.getTxID()))) {
@@ -409,10 +338,8 @@ public class DeletedBlockLogImpl
.putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
}
}
- return !transactions.isFull();
}
- return true;
- });
+ }
return deleteTransactionMap;
} finally {
lock.unlock();
@@ -421,7 +348,7 @@ public class DeletedBlockLogImpl
@Override
public void onMessage(DeleteBlockStatus deleteBlockStatus,
- EventPublisher publisher) {
+ EventPublisher publisher) {
ContainerBlocksDeletionACKProto ackProto =
deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
commitTransactions(ackProto.getResultsList(),
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
new file mode 100644
index 0000000..5deb3aa
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.utils.db.Codec;
+
+/**
+ * Codec for Persisting the DeletedBlocks.
+ */
+public class DeletedBlocksTransactionCodec
+ implements Codec<DeletedBlocksTransaction> {
+ @Override
+ public byte[] toPersistedFormat(DeletedBlocksTransaction object) {
+ return object.toByteArray();
+ }
+
+ @Override
+ public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData) {
+ try {
+ return DeletedBlocksTransaction.parseFrom(rawData);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException(
+ "Can't convert rawBytes to DeletedBlocksTransaction.", e);
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
similarity index 53%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
index 43b4452..2d495ab 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
@@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -14,32 +14,26 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.hadoop.hdds.scm;
+package org.apache.hadoop.hdds.scm.metadata;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.scm.chillmode.Precheck;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.utils.db.Codec;
/**
- * SCM utility class.
+ * Codec for persisting Long values (used for the deleted-blocks TXID keys).
*/
-public final class ScmUtils {
+public class LongCodec implements Codec<Long> {
- private ScmUtils() {
+ @Override
+ public byte[] toPersistedFormat(Long object) {
+ return Longs.toByteArray(object);
}
- /**
- * Perform all prechecks for given scm operation.
- *
- * @param operation
- * @param preChecks prechecks to be performed
- */
- public static void preCheck(ScmOps operation, Precheck... preChecks)
- throws SCMException {
- for (Precheck preCheck : preChecks) {
- preCheck.check(operation);
- }
+ @Override
+ public Long fromPersistedFormat(byte[] rawData) {
+ return Longs.fromByteArray(rawData);
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
new file mode 100644
index 0000000..8009e66
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+/**
+ * Generic interface for the metadata stores used by SCM.
+ * This is similar to the OMMetadataStore class, where objects are written
+ * into some underlying storage system. Besides lifecycle control, it exposes
+ * the deleted-blocks transaction table and the transaction ID (TXID) counter
+ * that goes with it.
+ */
+public interface SCMMetadataStore {
+ /**
+ * Starts the metadata store.
+ *
+ * @param configuration - Configuration
+ * @throws IOException - Unable to start metadata store.
+ */
+ void start(OzoneConfiguration configuration) throws IOException;
+
+ /**
+ * Stops the metadata store.
+ *
+ * @throws Exception - if the store cannot be stopped cleanly.
+ */
+ void stop() throws Exception;
+
+ /**
+ * Gets the underlying DB store. Exposed for tests only.
+ *
+ * @return the backing DBStore.
+ */
+ @VisibleForTesting
+ DBStore getStore();
+
+ /**
+ * A Table that keeps the deleted blocks lists and transactions,
+ * keyed by TXID.
+ *
+ * @return Table mapping TXID to its DeletedBlocksTransaction.
+ */
+ Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable();
+
+ /**
+ * Returns the current TXID for the deleted blocks, without changing it.
+ *
+ * @return the current TXID.
+ */
+ Long getCurrentTXID();
+
+ /**
+ * Returns the next TXID for the deleted blocks.
+ *
+ * @return the next TXID.
+ */
+ Long getNextDeleteBlockTXID();
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
new file mode 100644
index 0000000..49a643d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import java.io.IOException;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.hadoop.utils.db.DBStoreBuilder;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.utils.db.TableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_DB_NAME;
+
+/**
+ * A RocksDB based implementation of SCM Metadata Store.
+ * <p>
+ * Column families managed by this store:
+ * <pre>
+ * +---------------+------------+-------------------------+
+ * | Column Family | Key        | Value                   |
+ * +---------------+------------+-------------------------+
+ * | DeletedBlocks | TXID(Long) | DeletedBlockTransaction |
+ * +---------------+------------+-------------------------+
+ * </pre>
+ */
+public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMMetadataStoreRDBImpl.class);
+  private static final String DELETED_BLOCKS_TABLE = "deletedBlocks";
+
+  // NOTE(review): left as a raw type on purpose. getLargestRecordedTXID()
+  // assigns iterator() to TableIterator<Long, DeletedBlocksTransaction>, which
+  // may only compile through the raw type; confirm the signature of
+  // Table#iterator before parameterizing this field.
+  private Table deletedBlocksTable;
+  private DBStore store;
+  private final OzoneConfiguration configuration;
+  // In-memory TXID counter, seeded from the DB in the constructor.
+  private final AtomicLong txID;
+
+  /**
+   * Constructs the metadata store and starts the DB Services.
+   * The TXID counter starts from the largest TXID found in the deleted
+   * blocks table, or from 0 when no key is found.
+   *
+   * @param config - Ozone Configuration.
+   * @throws IOException - on Failure.
+   */
+  public SCMMetadataStoreRDBImpl(OzoneConfiguration config) throws IOException {
+    this.configuration = config;
+    start(this.configuration);
+    this.txID = new AtomicLong(this.getLargestRecordedTXID());
+  }
+
+  /**
+   * Opens the DB and the deleted blocks table. Does nothing when the store
+   * is already open.
+   *
+   * @param config - Ozone Configuration.
+   * @throws IOException - if the DB cannot be built or the deleted blocks
+   * table is missing.
+   */
+  @Override
+  public void start(OzoneConfiguration config)
+      throws IOException {
+    // NOTE(review): the config argument is not read; the configuration handed
+    // to the constructor is used instead. Confirm this is intended.
+    if (this.store == null) {
+      File metaDir = ServerUtils.getScmDbDir(configuration);
+
+      this.store = DBStoreBuilder.newBuilder(configuration)
+          .setName(SCM_DB_NAME)
+          .setPath(Paths.get(metaDir.getPath()))
+          .addTable(DELETED_BLOCKS_TABLE)
+          .addCodec(DeletedBlocksTransaction.class,
+              new DeletedBlocksTransactionCodec())
+          .addCodec(Long.class, new LongCodec())
+          .build();
+      deletedBlocksTable = this.store.getTable(DELETED_BLOCKS_TABLE,
+          Long.class, DeletedBlocksTransaction.class);
+      checkTableStatus(deletedBlocksTable, DELETED_BLOCKS_TABLE);
+    }
+  }
+
+  /**
+   * Closes the DB if it is open and drops the reference, so that a later
+   * {@link #start} opens it again.
+   *
+   * @throws Exception - if closing the store fails.
+   */
+  @Override
+  public void stop() throws Exception {
+    if (store != null) {
+      store.close();
+      store = null;
+    }
+  }
+
+  /**
+   * Returns the backing DB store.
+   *
+   * @return the DBStore, or null when the store has been stopped.
+   */
+  @Override
+  public DBStore getStore() {
+    return this.store;
+  }
+
+  /**
+   * Returns the table that maps a TXID to its deleted blocks transaction.
+   *
+   * @return the deleted blocks table.
+   */
+  @Override
+  public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
+    return deletedBlocksTable;
+  }
+
+  /**
+   * Advances the TXID counter by one and returns the new value.
+   *
+   * @return the newly allocated TXID.
+   */
+  @Override
+  public Long getNextDeleteBlockTXID() {
+    return this.txID.incrementAndGet();
+  }
+
+  /**
+   * Returns the current value of the TXID counter without advancing it.
+   *
+   * @return the current TXID.
+   */
+  @Override
+  public Long getCurrentTXID() {
+    return this.txID.get();
+  }
+
+  /**
+   * Returns the largest recorded TXID from the DB.
+   *
+   * @return the key at the last position of the deleted blocks table, or 0
+   * when the iterator reports no key there.
+   * @throws IOException - if the table cannot be iterated.
+   */
+  private Long getLargestRecordedTXID() throws IOException {
+    try (TableIterator<Long, DeletedBlocksTransaction> txIter =
+             deletedBlocksTable.iterator()) {
+      txIter.seekToLast();
+      Long txid = txIter.key();
+      if (txid != null) {
+        return txid;
+      }
+    }
+    return 0L;
+  }
+
+  /**
+   * Verifies that a table reference was obtained from the DB store.
+   *
+   * @param table - table reference to check.
+   * @param name - table name, used in the log and exception messages.
+   * @throws IOException - if the table reference is null.
+   */
+  private void checkTableStatus(Table<?, ?> table, String name)
+      throws IOException {
+    if (table == null) {
+      LOG.error("Unable to get a reference to {} table. Cannot continue.",
+          name);
+      // The original message was missing the space between "logs" and "for".
+      throw new IOException(String.format(
+          "Inconsistent DB state, Table - %s. Please check the logs " +
+              "for more info.", name));
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/package-info.java
similarity index 57%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/package-info.java
index 43b4452..23e8aaa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/package-info.java
@@ -15,31 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.scm.chillmode.Precheck;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-
/**
- * SCM utility class.
+ * Metadata layer for SCM.
*/
-public final class ScmUtils {
-
- private ScmUtils() {
- }
-
- /**
- * Perform all prechecks for given scm operation.
- *
- * @param operation
- * @param preChecks prechecks to be performed
- */
- public static void preCheck(ScmOps operation, Precheck... preChecks)
- throws SCMException {
- for (Precheck preCheck : preChecks) {
- preCheck.check(operation);
- }
- }
-}
+package org.apache.hadoop.hdds.scm.metadata;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 3c5eaf8..16a10ac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -200,8 +199,8 @@ public class SCMNodeManager implements NodeManager {
return VersionResponse.newBuilder()
.setVersion(this.version.getVersion())
.addValue(OzoneConsts.SCM_ID,
- this.scmManager.getScmStorage().getScmId())
- .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage()
+ this.scmManager.getScmStorageConfig().getScmId())
+ .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorageConfig()
.getClusterID())
.build();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 8ba8b42..80056b5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -248,8 +248,8 @@ public class SCMBlockProtocolServer implements
try{
ScmInfo.Builder builder =
new ScmInfo.Builder()
- .setClusterId(scm.getScmStorage().getClusterID())
- .setScmId(scm.getScmStorage().getScmId());
+ .setClusterId(scm.getScmStorageConfig().getClusterID())
+ .setScmId(scm.getScmStorageConfig().getScmId());
return builder.build();
} catch (Exception ex) {
auditSuccess = false;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 998512c..0cb22ad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -422,8 +422,8 @@ public class SCMClientProtocolServer implements
try{
ScmInfo.Builder builder =
new ScmInfo.Builder()
- .setClusterId(scm.getScmStorage().getClusterID())
- .setScmId(scm.getScmStorage().getScmId());
+ .setClusterId(scm.getScmStorageConfig().getClusterID())
+ .setScmId(scm.getScmStorageConfig().getScmId());
return builder.build();
} catch (Exception ex) {
auditSuccess = false;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
new file mode 100644
index 0000000..bca9d57
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.server;
+
+
+import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+
+/**
+ * Holder for the pluggable components of an SCM, used like a builder when
+ * constructing a StorageContainerManager. It matters for the resilience
+ * testing of SCM: it lets us swap out individual managers and replace them
+ * with our own during the testing phase.
+ * <p>
+ * At some point in the future, we will make all these managers dynamically
+ * loadable, so other developers can extend SCM by replacing various managers.
+ * <p>
+ * TODO: Add different config keys, so that we can load different managers at
+ * run time. This will make it easy to extend SCM without having to replace
+ * whole SCM each time.
+ * <p>
+ * Components supported by this configurator are:
+ * <pre>
+ *   NodeManager          scmNodeManager
+ *   PipelineManager      pipelineManager
+ *   ContainerManager     containerManager
+ *   BlockManager         scmBlockManager
+ *   ReplicationManager   replicationManager
+ *   SCMChillModeManager  scmChillModeManager
+ *   CertificateServer    certificateServer
+ *   SCMMetadataStore     metadataStore
+ * </pre>
+ * Any component that is *not* specified stays null, in which case SCM uses
+ * its default version of that component.
+ */
+public final class SCMConfigurator {
+  private NodeManager scmNodeManager;
+  private PipelineManager pipelineManager;
+  private ContainerManager containerManager;
+  private BlockManager scmBlockManager;
+  private ReplicationManager replicationManager;
+  private SCMChillModeManager scmChillModeManager;
+  private CertificateServer certificateServer;
+  private SCMMetadataStore metadataStore;
+
+  /**
+   * Sets the Node Manager that this SCM should use.
+   * @param scmNodeManager - Node Manager.
+   */
+  public void setScmNodeManager(NodeManager scmNodeManager) {
+    this.scmNodeManager = scmNodeManager;
+  }
+
+  /**
+   * Gets the SCM Node Manager.
+   * @return Node Manager, or null if none was set.
+   */
+  public NodeManager getScmNodeManager() {
+    return this.scmNodeManager;
+  }
+
+  /**
+   * Sets the Pipeline Manager that this SCM should use.
+   * @param pipelineManager - Pipeline Manager.
+   */
+  public void setPipelineManager(PipelineManager pipelineManager) {
+    this.pipelineManager = pipelineManager;
+  }
+
+  /**
+   * Gets the Pipeline Manager.
+   * @return Pipeline Manager, or null if none was set.
+   */
+  public PipelineManager getPipelineManager() {
+    return this.pipelineManager;
+  }
+
+  /**
+   * Sets the Container Manager that this SCM should use.
+   * @param containerManager - Container Manager.
+   */
+  public void setContainerManager(ContainerManager containerManager) {
+    this.containerManager = containerManager;
+  }
+
+  /**
+   * Gets the Container Manager.
+   * @return Container Manager, or null if none was set.
+   */
+  public ContainerManager getContainerManager() {
+    return this.containerManager;
+  }
+
+  /**
+   * Sets the Block Manager that this SCM should use.
+   * @param scmBlockManager - Block Manager.
+   */
+  public void setScmBlockManager(BlockManager scmBlockManager) {
+    this.scmBlockManager = scmBlockManager;
+  }
+
+  /**
+   * Gets the SCM Block Manager.
+   * @return Block Manager, or null if none was set.
+   */
+  public BlockManager getScmBlockManager() {
+    return this.scmBlockManager;
+  }
+
+  /**
+   * Sets the Replication Manager that this SCM should use.
+   * @param replicationManager - Replication Manager.
+   */
+  public void setReplicationManager(ReplicationManager replicationManager) {
+    this.replicationManager = replicationManager;
+  }
+
+  /**
+   * Gets the Replication Manager.
+   * @return Replication Manager, or null if none was set.
+   */
+  public ReplicationManager getReplicationManager() {
+    return this.replicationManager;
+  }
+
+  /**
+   * Sets the Chill Mode Manager that this SCM should use.
+   * @param scmChillModeManager - Chill Mode Manager.
+   */
+  public void setScmChillModeManager(SCMChillModeManager scmChillModeManager) {
+    this.scmChillModeManager = scmChillModeManager;
+  }
+
+  /**
+   * Gets the Chill Mode Manager.
+   * @return Chill Mode Manager, or null if none was set.
+   */
+  public SCMChillModeManager getScmChillModeManager() {
+    return this.scmChillModeManager;
+  }
+
+  /**
+   * Sets the Certificate Server that this SCM should use.
+   * @param certificateAuthority - Certificate Server.
+   */
+  public void setCertificateServer(CertificateServer certificateAuthority) {
+    this.certificateServer = certificateAuthority;
+  }
+
+  /**
+   * Gets the Certificate Server.
+   * @return Certificate Server, or null if none was set.
+   */
+  public CertificateServer getCertificateServer() {
+    return this.certificateServer;
+  }
+
+  /**
+   * Sets the Metadata Store that this SCM should use.
+   * @param scmMetadataStore - SCM Metadata Store.
+   */
+  public void setMetadataStore(SCMMetadataStore scmMetadataStore) {
+    this.metadataStore = scmMetadataStore;
+  }
+
+  /**
+   * Gets the Metadata Store.
+   * @return SCMMetadataStore, or null if none was set.
+   */
+  public SCMMetadataStore getMetadataStore() {
+    return this.metadataStore;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
similarity index 89%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
index 8a19850..73f9cbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
@@ -30,16 +30,16 @@ import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
/**
- * SCMStorage is responsible for management of the StorageDirectories used by
- * the SCM.
+ * SCMStorageConfig is responsible for management of the
+ * StorageDirectories used by the SCM.
*/
-public class SCMStorage extends Storage {
+public class SCMStorageConfig extends Storage {
/**
- * Construct SCMStorage.
+ * Construct SCMStorageConfig.
* @throws IOException if any directories are inaccessible.
*/
- public SCMStorage(OzoneConfiguration conf) throws IOException {
+ public SCMStorageConfig(OzoneConfiguration conf) throws IOException {
super(NodeType.SCM, ServerUtils.getScmDbDir(conf), STORAGE_DIR);
}
@@ -70,4 +70,4 @@ public class SCMStorage extends Storage {
return scmProperties;
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index bc81c84..3221128 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -27,6 +27,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.protobuf.BlockingService;
+import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
@@ -58,6 +59,8 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler;
@@ -125,8 +128,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
* and returns a pipeline.
*
* <p>A client once it gets a pipeline (a list of datanodes) will connect to
- * the datanodes and
- * create a container, which then can be used to store data.
+ * the datanodes and create a container, which then can be used to store data.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public final class StorageContainerManager extends ServiceRuntimeInfoImpl
@@ -158,16 +160,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final SCMDatanodeProtocolServer datanodeProtocolServer;
private final SCMBlockProtocolServer blockProtocolServer;
private final SCMClientProtocolServer clientProtocolServer;
- private final SCMSecurityProtocolServer securityProtocolServer;
+ private SCMSecurityProtocolServer securityProtocolServer;
/*
* State Managers of SCM.
*/
- private final NodeManager scmNodeManager;
- private final PipelineManager pipelineManager;
- private final ContainerManager containerManager;
- private final BlockManager scmBlockManager;
- private final SCMStorage scmStorage;
+ private NodeManager scmNodeManager;
+ private PipelineManager pipelineManager;
+ private ContainerManager containerManager;
+ private BlockManager scmBlockManager;
+ private final SCMStorageConfig scmStorageConfig;
+
+ private SCMMetadataStore scmMetadataStore;
private final EventQueue eventQueue;
/*
@@ -188,13 +192,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
*/
private Cache<String, ContainerStat> containerReportCache;
- private final ReplicationManager replicationManager;
+ private ReplicationManager replicationManager;
private final LeaseManager<Long> commandWatcherLeaseManager;
private final ReplicationActivityStatus replicationStatus;
- private final SCMChillModeManager scmChillModeManager;
- private final CertificateServer certificateServer;
+ private SCMChillModeManager scmChillModeManager;
+ private CertificateServer certificateServer;
private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration;
@@ -206,29 +210,54 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
*
* @param conf configuration
*/
- private StorageContainerManager(OzoneConfiguration conf)
+ public StorageContainerManager(OzoneConfiguration conf)
throws IOException, AuthenticationException {
+ // default empty configurator means default managers will be used.
+ this(conf, new SCMConfigurator());
+ }
+
+
+ /**
+ * This constructor offers finer control over how SCM comes up.
+ * To use this, user needs to create a SCMConfigurator and set various
+ * managers that user wants SCM to use, if a value is missing then SCM will
+ * use the default value for that manager.
+ *
+ * @param conf - Configuration
+ * @param configurator - configurator
+ */
+ public StorageContainerManager(OzoneConfiguration conf,
+ SCMConfigurator configurator)
+ throws IOException, AuthenticationException {
super(HddsVersionInfo.HDDS_VERSION_INFO);
+ Objects.requireNonNull(configurator, "configurator cannot not be null");
+ Objects.requireNonNull(conf, "configuration cannot not be null");
+
configuration = conf;
StorageContainerManager.initMetrics();
initContainerReportCache(conf);
- scmStorage = new SCMStorage(conf);
- if (scmStorage.getState() != StorageState.INITIALIZED) {
- throw new SCMException("SCM not initialized.", ResultCodes
- .SCM_NOT_INITIALIZED);
+ /**
+ * It is assumed the scm --init command creates the SCM Storage Config.
+ */
+ scmStorageConfig = new SCMStorageConfig(conf);
+ if (scmStorageConfig.getState() != StorageState.INITIALIZED) {
+ LOG.error("Please make sure you have run \'ozone scm --init\' " +
+ "command to generate all the required metadata.");
+ throw new SCMException("SCM not initialized due to storage config " +
+ "failure.", ResultCodes.SCM_NOT_INITIALIZED);
}
+ /**
+ * Important : This initialization sequence is assumed by some of our tests.
+ * The testSecureOzoneCluster assumes that security checks have to be
+ * passed before any artifacts like the SCM DB are created. So please don't
+ * add any other initialization above the security checks.
+ */
+
// Authenticate SCM if security is enabled
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
- loginAsSCMUser(conf);
- certificateServer = initializeCertificateServer(
- getScmStorage().getClusterID(), getScmStorage().getScmId());
- // TODO: Support Intermediary CAs in future.
- certificateServer.init(new SecurityConfig(conf),
- CertificateServer.CAType.SELF_SIGNED_CA);
- securityProtocolServer = new SCMSecurityProtocolServer(conf,
- certificateServer);
+ initializeCAnSecurityProtocol(conf, configurator);
} else {
// if no Security, we do not create a Certificate Server at all.
// This allows user to boot SCM without security temporarily
@@ -237,16 +266,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
securityProtocolServer = null;
}
- eventQueue = new EventQueue();
-
- scmNodeManager = new SCMNodeManager(
- conf, scmStorage.getClusterID(), this, eventQueue);
- pipelineManager = new SCMPipelineManager(conf, scmNodeManager, eventQueue);
- containerManager = new SCMContainerManager(
- conf, scmNodeManager, pipelineManager, eventQueue);
- scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
- pipelineManager, containerManager, eventQueue);
+ // Creates the SCM DBs or opens them if it exists.
+ initalizeMetadataStore(conf, configurator);
+ eventQueue = new EventQueue();
+ long watcherTimeout =
+ conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
+ HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+ commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
+ watcherTimeout);
+ initalizeSystemManagers(conf, configurator);
replicationStatus = new ReplicationActivityStatus();
CloseContainerEventHandler closeContainerHandler =
@@ -280,12 +309,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, conf);
- long watcherTimeout =
- conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
- HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
-
- commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
- watcherTimeout);
RetriableDatanodeEventWatcher retriableDatanodeEventWatcher =
new RetriableDatanodeEventWatcher<>(
@@ -294,13 +317,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
commandWatcherLeaseManager);
retriableDatanodeEventWatcher.start(eventQueue);
- //TODO: support configurable containerPlacement policy
- ContainerPlacementPolicy containerPlacementPolicy =
- new SCMContainerPlacementCapacity(scmNodeManager, conf);
-
- replicationManager = new ReplicationManager(containerPlacementPolicy,
- containerManager, eventQueue, commandWatcherLeaseManager);
-
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
.OZONE_ADMINISTRATORS);
scmUsername = UserGroupInformation.getCurrentUser().getUserName();
@@ -342,13 +358,120 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
replicationStatus.getChillModeStatusListener());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scmBlockManager);
- scmChillModeManager = new SCMChillModeManager(conf,
- containerManager.getContainers(), pipelineManager, eventQueue);
-
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
registerMXBean();
+ }
+
+ /**
+ * Wires up the core SCM managers. For every manager, an instance supplied
+ * by the configurator wins; when the configurator returns null, the default
+ * implementation is constructed instead. The managers are set up in this
+ * order, because the default implementations are built from the ones
+ * before them:
+ *
+ * Node Manager
+ * Pipeline Manager
+ * Container Manager
+ * Block Manager
+ * Replication Manager
+ * Chill Mode Manager
+ *
+ * @param conf - Ozone Configuration.
+ * @param configurator - A customizer which allows different managers to be
+ * used if needed.
+ * @throws IOException - on Failure.
+ */
+ private void initalizeSystemManagers(OzoneConfiguration conf,
+ SCMConfigurator configurator)
+ throws IOException {
+ // The node manager goes first: the default placement policy, pipeline
+ // manager and container manager below are all constructed from it.
+ scmNodeManager = configurator.getScmNodeManager() != null
+ ? configurator.getScmNodeManager()
+ : new SCMNodeManager(
+ conf, scmStorageConfig.getClusterID(), this, eventQueue);
+
+ //TODO: support configurable containerPlacement policy
+ ContainerPlacementPolicy placementPolicy =
+ new SCMContainerPlacementCapacity(scmNodeManager, conf);
+
+ pipelineManager = configurator.getPipelineManager() != null
+ ? configurator.getPipelineManager()
+ : new SCMPipelineManager(conf, scmNodeManager, eventQueue);
+
+ containerManager = configurator.getContainerManager() != null
+ ? configurator.getContainerManager()
+ : new SCMContainerManager(
+ conf, scmNodeManager, pipelineManager, eventQueue);
+
+ scmBlockManager = configurator.getScmBlockManager() != null
+ ? configurator.getScmBlockManager()
+ : new BlockManagerImpl(conf, this);
+
+ // The default replication manager reuses the command watcher lease
+ // manager field in addition to the container manager set up above.
+ replicationManager = configurator.getReplicationManager() != null
+ ? configurator.getReplicationManager()
+ : new ReplicationManager(placementPolicy,
+ containerManager, eventQueue, commandWatcherLeaseManager);
+
+ scmChillModeManager = configurator.getScmChillModeManager() != null
+ ? configurator.getScmChillModeManager()
+ : new SCMChillModeManager(conf,
+ containerManager.getContainers(), pipelineManager, eventQueue);
+ }
+
+ /**
+ * Logs in as the SCM user, then sets up the certificate server and the
+ * security protocol server. A certificate server supplied by the
+ * configurator is used as-is; otherwise one is created from the cluster ID
+ * and SCM ID held in the SCM storage config.
+ *
+ * @param conf - Config
+ * @param configurator - configurator
+ * @throws IOException - on Failure
+ * @throws AuthenticationException - on Failure
+ */
+ private void initializeCAnSecurityProtocol(OzoneConfiguration conf,
+ SCMConfigurator configurator)
+ throws IOException, AuthenticationException {
+ loginAsSCMUser(conf);
+ if (configurator.getCertificateServer() == null) {
+ String clusterId = getScmStorageConfig().getClusterID();
+ String scmId = getScmStorageConfig().getScmId();
+ certificateServer = initializeCertificateServer(clusterId, scmId);
+ } else {
+ this.certificateServer = configurator.getCertificateServer();
+ }
+ // TODO: Support Intermediary CAs in future.
+ certificateServer.init(new SecurityConfig(conf),
+ CertificateServer.CAType.SELF_SIGNED_CA);
+ securityProtocolServer =
+ new SCMSecurityProtocolServer(conf, certificateServer);
+ }
+ /**
+ * Initializes the SCM metadata store. A store supplied by the configurator
+ * takes precedence; otherwise a new {@link SCMMetadataStoreRDBImpl} is
+ * created from the given configuration.
+ *
+ * @param conf - Config
+ * @param configurator - configurator
+ * @throws IOException - on Failure
+ */
+ private void initalizeMetadataStore(OzoneConfiguration conf,
+ SCMConfigurator configurator)
+ throws IOException {
+ if (configurator.getMetadataStore() != null) {
+ scmMetadataStore = configurator.getMetadataStore();
+ } else {
+ // A constructor call either yields a non-null instance or throws, so
+ // no null check is needed after it.
+ scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
+ }
}
/**
@@ -393,7 +516,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
// So it is easy to use different Certificate Servers if needed.
String subject = "scm@" + InetAddress.getLocalHost().getHostName();
return new DefaultCAServer(subject, clusterID, scmID);
-
}
/**
@@ -562,21 +684,21 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
* @throws IOException if init fails due to I/O error
*/
public static boolean scmInit(OzoneConfiguration conf) throws IOException {
- SCMStorage scmStorage = new SCMStorage(conf);
- StorageState state = scmStorage.getState();
+ SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
+ StorageState state = scmStorageConfig.getState();
if (state != StorageState.INITIALIZED) {
try {
String clusterId = StartupOption.INIT.getClusterId();
if (clusterId != null && !clusterId.isEmpty()) {
- scmStorage.setClusterId(clusterId);
+ scmStorageConfig.setClusterId(clusterId);
}
- scmStorage.initialize();
+ scmStorageConfig.initialize();
System.out.println(
"SCM initialization succeeded."
+ "Current cluster id for sd="
- + scmStorage.getStorageDir()
+ + scmStorageConfig.getStorageDir()
+ ";cid="
- + scmStorage.getClusterID());
+ + scmStorageConfig.getClusterID());
return true;
} catch (IOException ioe) {
LOG.error("Could not initialize SCM version file", ioe);
@@ -586,9 +708,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
System.out.println(
"SCM already initialized. Reusing existing"
+ " cluster id for sd="
- + scmStorage.getStorageDir()
+ + scmStorageConfig.getStorageDir()
+ ";cid="
- + scmStorage.getClusterID());
+ + scmStorageConfig.getClusterID());
return true;
}
}
@@ -649,8 +771,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return metrics == null ? SCMMetrics.create() : metrics;
}
- public SCMStorage getScmStorage() {
- return scmStorage;
+ public SCMStorageConfig getScmStorageConfig() {
+ return scmStorageConfig;
}
public SCMDatanodeProtocolServer getDatanodeProtocolServer() {
@@ -878,6 +1000,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
IOUtils.cleanupWithLogger(LOG, containerManager);
IOUtils.cleanupWithLogger(LOG, pipelineManager);
+
+ try {
+ scmMetadataStore.stop();
+ } catch (Exception ex) {
+ LOG.error("SCM Metadata store stop failed", ex);
+ }
}
/**
@@ -1049,6 +1177,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return nodeStateCount;
}
+
+ /**
+ * Gives access to the metadata store held by this SCM instance.
+ *
+ * @return the SCMMetadataStore currently assigned to this SCM
+ */
+ public SCMMetadataStore getScmMetadataStore() {
+ return this.scmMetadataStore;
+ }
/**
* Startup options.
*/
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
index 35003c7..d010e2d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -78,7 +78,7 @@ public final class HddsTestUtils {
public static StorageContainerManager getScm(OzoneConfiguration conf)
throws IOException, AuthenticationException {
conf.setBoolean(OZONE_ENABLED, true);
- SCMStorage scmStore = new SCMStorage(conf);
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
if(scmStore.getState() != Storage.StorageState.INITIALIZED) {
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 0f9a5e4..b5d9e4b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
@@ -48,8 +49,12 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
import java.io.File;
import java.io.IOException;
@@ -59,6 +64,8 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+
/**
* Stateless helper functions to handler scm/datanode connection.
*/
@@ -461,4 +468,20 @@ public final class TestUtils {
id, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
}
+
+ /**
+ * Builds a StorageContainerManager for tests. Note that this enables
+ * Ozone on the passed-in configuration (the argument is modified), and
+ * initializes the SCM storage with a random cluster ID and SCM ID when
+ * it is not yet in the INITIALIZED state.
+ *
+ * @param conf - Ozone configuration; OZONE_ENABLED is set to true on it.
+ * @return the SCM returned by StorageContainerManager.createSCM.
+ * @throws IOException - on Failure.
+ * @throws AuthenticationException - on Failure.
+ */
+ public static StorageContainerManager getScm(OzoneConfiguration conf)
+ throws IOException, AuthenticationException {
+ conf.setBoolean(OZONE_ENABLED, true);
+ SCMStorageConfig storageConfig = new SCMStorageConfig(conf);
+ boolean initialized =
+ storageConfig.getState() == Storage.StorageState.INITIALIZED;
+ if (!initialized) {
+ storageConfig.setClusterId(UUID.randomUUID().toString());
+ storageConfig.setScmId(UUID.randomUUID().toString());
+ // writes the version file properties
+ storageConfig.initialize();
+ }
+ return StorageContainerManager.createSCM(null, conf);
+ }
+
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 7be1b17..f7e171c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -17,24 +17,25 @@
package org.apache.hadoop.hdds.scm.block;
+import java.io.File;
+import java.io.IOException;
import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -49,11 +50,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.concurrent.TimeoutException;
+import org.junit.rules.TemporaryFolder;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
@@ -64,6 +61,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
* Tests for SCM Block Manager.
*/
public class TestBlockManager implements EventHandler<Boolean> {
+ private StorageContainerManager scm;
private SCMContainerManager mapping;
private MockNodeManager nodeManager;
private PipelineManager pipelineManager;
@@ -75,11 +73,14 @@ public class TestBlockManager implements EventHandler<Boolean> {
private static String containerOwner = "OZONE";
private static EventQueue eventQueue;
private int numContainerPerOwnerInPipeline;
- private Configuration conf;
+ private OzoneConfiguration conf;
@Rule
public ExpectedException thrown = ExpectedException.none();
+ @Rule
+ public TemporaryFolder folder= new TemporaryFolder();
+
@Before
public void setUp() throws Exception {
conf = SCMTestUtils.getConf();
@@ -87,24 +88,23 @@ public class TestBlockManager implements EventHandler<Boolean> {
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
- String path = GenericTestUtils
- .getTempPath(TestBlockManager.class.getSimpleName());
- testDir = Paths.get(path).toFile();
- testDir.delete();
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, path);
- eventQueue = new EventQueue();
- boolean folderExisted = testDir.exists() || testDir.mkdirs();
- if (!folderExisted) {
- throw new IOException("Unable to create test directory path");
- }
+
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().toString());
+
+ // Override the default Node Manager in SCM with this Mock Node Manager.
nodeManager = new MockNodeManager(true, 10);
- pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
- mapping = new SCMContainerManager(conf, nodeManager, pipelineManager,
- eventQueue);
- blockManager = new BlockManagerImpl(conf,
- nodeManager, pipelineManager, mapping, eventQueue);
- eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
+ SCMConfigurator configurator = new SCMConfigurator();
+ configurator.setScmNodeManager(nodeManager);
+ scm = getScm(conf, configurator);
+
+ // Initialize these fields so that the tests can pass.
+ mapping = (SCMContainerManager) scm.getContainerManager();
+ pipelineManager = scm.getPipelineManager();
+ blockManager = (BlockManagerImpl) scm.getScmBlockManager();
+
+ eventQueue = new EventQueue();
+ eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
+ (BlockManagerImpl) scm.getScmBlockManager());
eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, mapping);
@@ -121,16 +121,14 @@ public class TestBlockManager implements EventHandler<Boolean> {
@After
public void cleanup() throws IOException {
- blockManager.close();
- pipelineManager.close();
- mapping.close();
- FileUtil.fullyDelete(testDir);
+ scm.stop();
}
- private static StorageContainerManager getScm(OzoneConfiguration conf)
+ private static StorageContainerManager getScm(OzoneConfiguration conf,
+ SCMConfigurator configurator)
throws IOException, AuthenticationException {
conf.setBoolean(OZONE_ENABLED, true);
- SCMStorage scmStore = new SCMStorage(conf);
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
if(scmStore.getState() != StorageState.INITIALIZED) {
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
@@ -139,7 +137,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
// writes the version file properties
scmStore.initialize();
}
- return StorageContainerManager.createSCM(null, conf);
+ return new StorageContainerManager(conf, configurator);
}
@Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 48949be..95c0cd2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -42,7 +44,8 @@ import org.apache.hadoop.hdds.protocol.proto
.DeleteBlockTransactionResult;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.utils.db.TableIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -62,10 +65,12 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.when;
@@ -78,6 +83,7 @@ public class TestDeletedBlockLog {
private OzoneConfiguration conf;
private File testDir;
private ContainerManager containerManager;
+ private StorageContainerManager scm;
private List<DatanodeDetails> dnList;
@Before
@@ -85,10 +91,13 @@ public class TestDeletedBlockLog {
testDir = GenericTestUtils.getTestDir(
TestDeletedBlockLog.class.getSimpleName());
conf = new OzoneConfiguration();
+ conf.set(OZONE_ENABLED, "true");
conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ scm = TestUtils.getScm(conf);
containerManager = Mockito.mock(SCMContainerManager.class);
- deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+ deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
+ scm.getScmMetadataStore());
dnList = new ArrayList<>(3);
setupContainerManager();
}
@@ -126,6 +135,8 @@ public class TestDeletedBlockLog {
@After
public void tearDown() throws Exception {
deletedBlockLog.close();
+ scm.stop();
+ scm.join();
FileUtils.deleteDirectory(testDir);
}
@@ -263,7 +274,6 @@ public class TestDeletedBlockLog {
MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid =
(preKey, currentKey, nextKey) ->
!Arrays.equals(latestTxid, currentKey);
- MetadataStore store = deletedBlockLog.getDeletedStore();
// Randomly add/get/commit/increase transactions.
for (int i = 0; i < 100; i++) {
int state = random.nextInt(4);
@@ -286,9 +296,13 @@ public class TestDeletedBlockLog {
blocks = new ArrayList<>();
} else {
// verify the number of added and committed.
- List<Map.Entry<byte[], byte[]>> result =
- store.getRangeKVs(null, added, avoidLatestTxid);
- Assert.assertEquals(added, result.size() + committed);
+ try (TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+ scm.getScmMetadataStore().getDeletedBlocksTXTable().iterator()) {
+ AtomicInteger count = new AtomicInteger();
+ iter.forEachRemaining((keyValue) -> count.incrementAndGet());
+ Assert.assertEquals(added, count.get() + committed);
+ }
}
}
blocks = getTransactions(1000);
@@ -303,7 +317,8 @@ public class TestDeletedBlockLog {
// close db and reopen it again to make sure
// transactions are stored persistently.
deletedBlockLog.close();
- deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+ deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
+ scm.getScmMetadataStore());
List<DeletedBlocksTransaction> blocks =
getTransactions(10);
commitTransactions(blocks);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index b0da150..ef0e88c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -23,13 +23,11 @@ import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.util.Collection;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -117,33 +115,11 @@ public final class OmUtils {
* Get the location where OM should store its metadata directories.
* Fall back to OZONE_METADATA_DIRS if not defined.
*
- * @param conf
- * @return
+ * @param conf - Config
+ * @return File path, after creating all the required Directories.
*/
public static File getOmDbDir(Configuration conf) {
- final Collection<String> dbDirs = conf.getTrimmedStringCollection(
- OMConfigKeys.OZONE_OM_DB_DIRS);
-
- if (dbDirs.size() > 1) {
- throw new IllegalArgumentException(
- "Bad configuration setting " + OMConfigKeys.OZONE_OM_DB_DIRS +
- ". OM does not support multiple metadata dirs currently.");
- }
-
- if (dbDirs.size() == 1) {
- final File dbDirPath = new File(dbDirs.iterator().next());
- if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
- throw new IllegalArgumentException("Unable to create directory " +
- dbDirPath + " specified in configuration setting " +
- OMConfigKeys.OZONE_OM_DB_DIRS);
- }
- return dbDirPath;
- }
-
- LOG.warn("{} is not configured. We recommend adding this setting. " +
- "Falling back to {} instead.",
- OMConfigKeys.OZONE_OM_DB_DIRS, HddsConfigKeys.OZONE_METADATA_DIRS);
- return ServerUtils.getOzoneMetaDirPath(conf);
+ return ScmUtils.getDBPath(conf, OMConfigKeys.OZONE_OM_DB_DIRS);
}
/**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index c21c383..ab322a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ipc.Client;
@@ -40,7 +41,6 @@ import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocolPB
@@ -437,12 +437,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
private StorageContainerManager createSCM()
throws IOException, AuthenticationException {
configureSCM();
- SCMStorage scmStore = new SCMStorage(conf);
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
initializeScmStorage(scmStore);
return StorageContainerManager.createSCM(null, conf);
}
- private void initializeScmStorage(SCMStorage scmStore) throws IOException {
+ private void initializeScmStorage(SCMStorageConfig scmStore)
+ throws IOException {
if (scmStore.getState() == StorageState.INITIALIZED) {
return;
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index d54da2b..29264c0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -36,7 +36,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +44,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
@@ -78,6 +77,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,6 +114,8 @@ public final class TestSecureOzoneCluster {
private OzoneManagerProtocolClientSideTranslatorPB omClient;
private KeyPair keyPair;
private Path metaDirPath;
+ @Rule
+ public TemporaryFolder folder= new TemporaryFolder();
@Before
public void init() {
@@ -121,8 +123,7 @@ public final class TestSecureOzoneCluster {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
DefaultMetricsSystem.setMiniClusterMode(true);
- final String path = GenericTestUtils
- .getTempPath(UUID.randomUUID().toString());
+ final String path = folder.newFolder().toString();
metaDirPath = Paths.get(path, "om-meta");
conf.set(OZONE_METADATA_DIRS, metaDirPath.toString());
startMiniKdc();
@@ -149,23 +150,22 @@ public final class TestSecureOzoneCluster {
if (omClient != null) {
omClient.close();
}
- FileUtils.deleteQuietly(metaDirPath.toFile());
} catch (Exception e) {
logger.error("Failed to stop TestSecureOzoneCluster", e);
}
}
- private void createCredentialsInKDC(Configuration conf, MiniKdc miniKdc)
- throws Exception {
+ private void createCredentialsInKDC(Configuration configuration,
+ MiniKdc kdc) throws Exception {
createPrincipal(scmKeytab,
- conf.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
+ configuration.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
createPrincipal(spnegoKeytab,
- conf.get(ScmConfigKeys
+ configuration.get(ScmConfigKeys
.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
- conf.get(OMConfigKeys
+ configuration.get(OMConfigKeys
.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
createPrincipal(omKeyTab,
- conf.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
+ configuration.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
}
private void createPrincipal(File keytab, String... principal)
@@ -185,37 +185,39 @@ public final class TestSecureOzoneCluster {
miniKdc.stop();
}
- private void setSecureConfig(Configuration conf) throws IOException {
- conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
- conf.setBoolean(OZONE_ENABLED, true);
- String host = InetAddress.getLocalHost().getCanonicalHostName();
+ private void setSecureConfig(Configuration configuration) throws IOException {
+ configuration.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+ configuration.setBoolean(OZONE_ENABLED, true);
+ String host = InetAddress.getLocalHost().getCanonicalHostName()
+ .toLowerCase();
String realm = miniKdc.getRealm();
curUser = UserGroupInformation.getCurrentUser()
.getUserName();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ configuration.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
- conf.set(OZONE_ADMINISTRATORS, curUser);
+ configuration.set(OZONE_ADMINISTRATORS, curUser);
- conf.set(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+ configuration.set(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
"scm/" + host + "@" + realm);
- conf.set(ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY,
+ configuration.set(ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY,
"HTTP_SCM/" + host + "@" + realm);
- conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
+ configuration.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
"om/" + host + "@" + realm);
- conf.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY,
+ configuration.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY,
"HTTP_OM/" + host + "@" + realm);
scmKeytab = new File(workDir, "scm.keytab");
spnegoKeytab = new File(workDir, "http.keytab");
omKeyTab = new File(workDir, "om.keytab");
- conf.set(ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
+ configuration.set(ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
scmKeytab.getAbsolutePath());
- conf.set(
+ configuration.set(
ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY,
spnegoKeytab.getAbsolutePath());
- conf.set(OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
+ configuration.set(OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
omKeyTab.getAbsolutePath());
conf.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE,
spnegoKeytab.getAbsolutePath());
@@ -239,12 +241,15 @@ public final class TestSecureOzoneCluster {
scmId = UUID.randomUUID().toString();
omId = UUID.randomUUID().toString();
- final String path = GenericTestUtils
- .getTempPath(UUID.randomUUID().toString());
+ final String path = folder.newFolder().toString();
Path scmPath = Paths.get(path, "scm-meta");
+ File temp = scmPath.toFile();
+ if(!temp.exists()) {
+ temp.mkdirs();
+ }
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
- SCMStorage scmStore = new SCMStorage(conf);
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
scmStore.setClusterId(clusterId);
scmStore.setScmId(scmId);
// writes the version file properties
@@ -586,5 +591,4 @@ public final class TestSecureOzoneCluster {
CertificateClient certClient = new CertificateClientTestImpl(config);
om.setCertClient(certClient);
}
-
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 0a12deb..a0c58db 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
@@ -398,7 +398,7 @@ public class TestStorageContainerManager {
// This will initialize SCM
StorageContainerManager.scmInit(conf);
- SCMStorage scmStore = new SCMStorage(conf);
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
Assert.assertEquals("testClusterId", scmStore.getClusterID());
StartupOption.INIT.setClusterId("testClusterIdNew");
@@ -422,7 +422,7 @@ public class TestStorageContainerManager {
StartupOption.INIT.setClusterId("testClusterId");
// This will initialize SCM
StorageContainerManager.scmInit(conf);
- SCMStorage scmStore = new SCMStorage(conf);
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
cluster.shutdown();
@@ -438,7 +438,8 @@ public class TestStorageContainerManager {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
exception.expect(SCMException.class);
- exception.expectMessage("SCM not initialized.");
+ exception.expectMessage(
+ "SCM not initialized due to storage config failure");
StorageContainerManager.createSCM(null, conf);
}
@@ -463,7 +464,7 @@ public class TestStorageContainerManager {
Path scmPath = Paths.get(path, "scm-meta");
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
- SCMStorage scmStore = new SCMStorage(conf);
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
scmStore.setClusterId(clusterId);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index fa862a3..78f7d29 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -220,9 +220,9 @@ public class TestBlockDeletion {
private void verifyTransactionsCommitted() throws IOException {
DeletedBlockLogImpl deletedBlockLog =
(DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog();
- for (int txnID = 1; txnID <= maxTransactionId; txnID++) {
+ for (long txnID = 1; txnID <= maxTransactionId; txnID++) {
Assert.assertNull(
- deletedBlockLog.getDeletedStore().get(Longs.toByteArray(txnID)));
+ scm.getScmMetadataStore().getDeletedBlocksTXTable().get(txnID));
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
index 45231f1..140f91c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.net.NetUtils;
@@ -31,7 +32,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.protocol.proto
@@ -1311,11 +1311,11 @@ public class TestOzoneManager {
public void testOmInitialization() throws IOException {
// Read the version file info from OM version file
OMStorage omStorage = cluster.getOzoneManager().getOmStorage();
- SCMStorage scmStorage = new SCMStorage(conf);
+ SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
// asserts whether cluster Id and SCM ID are properly set in SCM Version
// file.
- Assert.assertEquals(clusterId, scmStorage.getClusterID());
- Assert.assertEquals(scmId, scmStorage.getScmId());
+ Assert.assertEquals(clusterId, scmStorageConfig.getClusterID());
+ Assert.assertEquals(scmId, scmStorageConfig.getScmId());
// asserts whether OM Id is properly set in OM Version file.
Assert.assertEquals(omId, omStorage.getOmId());
// asserts whether the SCM info is correct in OM Version file.
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
index ea0b2a1..cf25693 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
@@ -127,8 +127,8 @@ public class TestContainerSQLCli {
new SCMPipelineManager(conf, nodeManager, eventQueue);
containerManager = new SCMContainerManager(conf, nodeManager,
pipelineManager, eventQueue);
- blockManager = new BlockManagerImpl(
- conf, nodeManager, pipelineManager, containerManager, eventQueue);
+ blockManager =
+ new BlockManagerImpl(conf, cluster.getStorageContainerManager());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils.waitFor(() -> {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/package-info.java
similarity index 52%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
copy to hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/package-info.java
index 43b4452..291fcd9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/package-info.java
@@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -14,32 +14,9 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.scm.chillmode.Precheck;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-
/**
- * SCM utility class.
+ * A tool to convert Ozone manager Metadata to SQL DB.
*/
-public final class ScmUtils {
-
- private ScmUtils() {
- }
-
- /**
- * Perform all prechecks for given scm operation.
- *
- * @param operation
- * @param preChecks prechecks to be performed
- */
- public static void preCheck(ScmOps operation, Precheck... preChecks)
- throws SCMException {
- for (Precheck preCheck : preChecks) {
- preCheck.check(operation);
- }
- }
-}
+package org.apache.hadoop.ozone.scm;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org