You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2017/07/16 01:34:34 UTC
[2/2] hadoop git commit: HDFS-12069. Ozone: Create a general
abstraction for metadata store. Contributed by Weiwei Yang.
HDFS-12069. Ozone: Create a general abstraction for metadata store. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f122a75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f122a75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f122a75
Branch: refs/heads/HDFS-7240
Commit: 8f122a75054163a2bb0bb8310c2b75016fdf1c3e
Parents: 90f1d58
Author: Weiwei Yang <ww...@apache.org>
Authored: Sun Jul 16 09:34:02 2017 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Sun Jul 16 09:34:02 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/ozone/OzoneConfigKeys.java | 9 +
.../common/helpers/ContainerUtils.java | 16 +-
.../container/common/helpers/KeyUtils.java | 20 +-
.../common/impl/ContainerManagerImpl.java | 6 +-
.../container/common/impl/KeyManagerImpl.java | 57 ++--
.../container/common/interfaces/KeyManager.java | 13 +-
.../container/common/utils/ContainerCache.java | 14 +-
.../hadoop/ozone/ksm/MetadataManager.java | 27 +-
.../hadoop/ozone/ksm/MetadataManagerImpl.java | 146 ++++-----
.../hadoop/ozone/ksm/VolumeManagerImpl.java | 50 ++--
.../ozone/scm/block/BlockManagerImpl.java | 67 +++--
.../org/apache/hadoop/ozone/scm/cli/SQLCLI.java | 112 +++----
.../ozone/scm/container/ContainerMapping.java | 19 +-
.../ContainerReplicationManager.java | 14 +-
.../ozone/scm/exceptions/SCMException.java | 3 +-
.../hadoop/ozone/scm/node/NodePoolManager.java | 5 +-
.../hadoop/ozone/scm/node/SCMNodeManager.java | 14 +-
.../ozone/scm/node/SCMNodePoolManager.java | 53 ++--
.../web/localstorage/OzoneMetadataManager.java | 133 ++++-----
.../org/apache/hadoop/utils/BatchOperation.java | 90 ++++++
.../org/apache/hadoop/utils/EntryConsumer.java | 38 +++
.../apache/hadoop/utils/LevelDBKeyFilters.java | 65 ----
.../org/apache/hadoop/utils/LevelDBStore.java | 182 +++++++-----
.../apache/hadoop/utils/MetadataKeyFilters.java | 65 ++++
.../org/apache/hadoop/utils/MetadataStore.java | 152 ++++++++++
.../hadoop/utils/MetadataStoreBuilder.java | 96 ++++++
.../src/main/resources/ozone-default.xml | 11 +
.../apache/hadoop/ozone/TestLevelDBStore.java | 165 -----------
.../apache/hadoop/ozone/TestMetadataStore.java | 296 +++++++++++++++++++
.../common/impl/TestContainerPersistence.java | 12 +-
.../TestContainerReplicationManager.java | 5 +-
.../hadoop/ozone/ksm/TestKeySpaceManager.java | 2 +
32 files changed, 1247 insertions(+), 710 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index cebb1b0..1822a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -61,6 +61,15 @@ public final class OzoneConfigKeys {
public static final String OZONE_CONTAINER_METADATA_DIRS =
"ozone.container.metadata.dirs";
+ public static final String OZONE_METADATA_STORE_IMPL =
+ "ozone.metastore.impl";
+ public static final String OZONE_METADATA_STORE_IMPL_LEVELDB =
+ "LevelDB";
+ public static final String OZONE_METADATA_STORE_IMPL_ROCKSDB =
+ "RocksDB";
+ public static final String OZONE_METADATA_STORE_IMPL_DEFAULT =
+ OZONE_METADATA_STORE_IMPL_LEVELDB;
+
public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 7d0e756..0a360e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -27,8 +27,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
-import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,7 +234,8 @@ public final class ContainerUtils {
* @param containerPath - Container Path.
* @throws IOException
*/
- public static Path createMetadata(Path containerPath) throws IOException {
+ public static Path createMetadata(Path containerPath, Configuration conf)
+ throws IOException {
Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
Preconditions.checkNotNull(containerPath);
Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
@@ -243,9 +245,11 @@ public final class ContainerUtils {
throw new IOException("Unable to create directory for metadata storage." +
" Path: " + metadataPath);
}
- LevelDBStore store =
- new LevelDBStore(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
- .toFile(), true);
+ MetadataStore store = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setCreateIfMissing(true)
+ .setDbFile(metadataPath.resolve(OzoneConsts.CONTAINER_DB).toFile())
+ .build();
// we close since the SCM pre-creates containers.
// we will open and put Db handle into a cache when keys are being created
@@ -347,7 +351,7 @@ public final class ContainerUtils {
Preconditions.checkNotNull(containerData);
Path dbPath = Paths.get(containerData.getDBPath());
- LevelDBStore db = KeyUtils.getDB(containerData, conf);
+ MetadataStore db = KeyUtils.getDB(containerData, conf);
// If the container is not empty and cannot be deleted forcibly,
// then throw a SCE to stop deleting.
if(!forceDelete && !db.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
index 4eb8999..1a36b71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,20 +58,21 @@ public final class KeyUtils {
*
* @param container container.
* @param conf configuration.
- * @return LevelDB handle.
+ * @return MetadataStore handle.
* @throws StorageContainerException
*/
- public static LevelDBStore getDB(ContainerData container,
+ public static MetadataStore getDB(ContainerData container,
Configuration conf) throws StorageContainerException {
Preconditions.checkNotNull(container);
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
try {
- LevelDBStore db = cache.getDB(container.getContainerName());
+ MetadataStore db = cache.getDB(container.getContainerName());
if (db == null) {
- db = new LevelDBStore(
- new File(container.getDBPath()),
- false);
+ db = MetadataStoreBuilder.newBuilder()
+ .setDbFile(new File(container.getDBPath()))
+ .setCreateIfMissing(false)
+ .build();
cache.putDB(container.getContainerName(), db);
}
return db;
@@ -103,10 +105,10 @@ public final class KeyUtils {
@SuppressWarnings("unchecked")
public static void shutdownCache(ContainerCache cache) {
Logger log = LoggerFactory.getLogger(KeyManagerImpl.class);
- LevelDBStore[] handles = new LevelDBStore[cache.values().size()];
+ MetadataStore[] handles = new MetadataStore[cache.values().size()];
cache.values().toArray(handles);
Preconditions.checkState(handles.length == cache.values().size());
- for (LevelDBStore db : handles) {
+ for (MetadataStore db : handles) {
try {
db.close();
} catch (IOException ex) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index f3e010d..b77ac55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.container.common.interfaces
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -335,7 +335,7 @@ public class ContainerManagerImpl implements ContainerManager {
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
metadataPath = this.locationManager.getDataPath(
containerData.getContainerName());
- metadataPath = ContainerUtils.createMetadata(metadataPath);
+ metadataPath = ContainerUtils.createMetadata(metadataPath, conf);
} else {
metadataPath = ContainerUtils.getMetadataDirectory(containerData);
}
@@ -502,7 +502,7 @@ public class ContainerManagerImpl implements ContainerManager {
ContainerData containerData = readContainer(containerName);
containerData.closeContainer();
writeContainerInfo(containerData, true);
- LevelDBStore db = KeyUtils.getDB(containerData, conf);
+ MetadataStore db = KeyUtils.getDB(containerData, conf);
// It is ok if this operation takes a bit of time.
// Close container is not expected to be instantaneous.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
index 2b64b71..7fff0a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +42,8 @@ import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
- .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.Result.NO_SUCH_KEY;
-
/**
* Key Manager impl.
*/
@@ -74,8 +72,7 @@ public class KeyManagerImpl implements KeyManager {
* {@inheritDoc}
*/
@Override
- public void putKey(Pipeline pipeline, KeyData data)
- throws StorageContainerException {
+ public void putKey(Pipeline pipeline, KeyData data) throws IOException {
containerManager.readLock();
try {
// We are not locking the key manager since LevelDb serializes all actions
@@ -85,7 +82,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(
pipeline.getContainerName());
- LevelDBStore db = KeyUtils.getDB(cData, conf);
+ MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@@ -102,7 +99,7 @@ public class KeyManagerImpl implements KeyManager {
* {@inheritDoc}
*/
@Override
- public KeyData getKey(KeyData data) throws StorageContainerException {
+ public KeyData getKey(KeyData data) throws IOException {
containerManager.readLock();
try {
Preconditions.checkNotNull(data, "Key data cannot be null");
@@ -110,7 +107,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(data
.getContainerName());
- LevelDBStore db = KeyUtils.getDB(cData, conf);
+ MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@@ -124,8 +121,6 @@ public class KeyManagerImpl implements KeyManager {
ContainerProtos.KeyData keyData =
ContainerProtos.KeyData.parseFrom(kData);
return KeyData.getFromProtoBuf(keyData);
- } catch (IOException ex) {
- throw new StorageContainerException(ex, IO_EXCEPTION);
} finally {
containerManager.readUnlock();
}
@@ -136,7 +131,7 @@ public class KeyManagerImpl implements KeyManager {
*/
@Override
public void deleteKey(Pipeline pipeline, String keyName)
- throws StorageContainerException {
+ throws IOException {
containerManager.readLock();
try {
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
@@ -144,7 +139,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(pipeline
.getContainerName());
- LevelDBStore db = KeyUtils.getDB(cData, conf);
+ MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@@ -171,32 +166,28 @@ public class KeyManagerImpl implements KeyManager {
@Override
public List<KeyData> listKey(
Pipeline pipeline, String prefix, String startKey, int count)
- throws StorageContainerException {
+ throws IOException {
Preconditions.checkNotNull(pipeline,
"Pipeline cannot be null.");
Preconditions.checkArgument(count > 0,
"Count must be a positive number.");
ContainerData cData = containerManager.readContainer(pipeline
.getContainerName());
- LevelDBStore db = KeyUtils.getDB(cData, conf);
- try {
- List<KeyData> result = new ArrayList<KeyData>();
- byte[] startKeyInBytes = startKey == null ? null :
- DFSUtil.string2Bytes(startKey);
- KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefix);
- List<Map.Entry<byte[], byte[]>> range =
- db.getRangeKVs(startKeyInBytes, count, prefixFilter);
- for(Map.Entry<byte[], byte[]> entry : range) {
- String keyName = KeyUtils.getKeyName(entry.getKey());
- KeyData value = KeyUtils.getKeyData(entry.getValue());
- KeyData data = new KeyData(value.getContainerName(), keyName);
- result.add(data);
- }
- return result;
- } catch (IOException e) {
- throw new StorageContainerException(e,
- ContainerProtos.Result.IO_EXCEPTION);
+ MetadataStore db = KeyUtils.getDB(cData, conf);
+
+ List<KeyData> result = new ArrayList<KeyData>();
+ byte[] startKeyInBytes = startKey == null ? null :
+ DFSUtil.string2Bytes(startKey);
+ MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefix);
+ List<Map.Entry<byte[], byte[]>> range =
+ db.getRangeKVs(startKeyInBytes, count, prefixFilter);
+ for (Map.Entry<byte[], byte[]> entry : range) {
+ String keyName = KeyUtils.getKeyName(entry.getKey());
+ KeyData value = KeyUtils.getKeyData(entry.getValue());
+ KeyData data = new KeyData(value.getContainerName(), keyName);
+ result.add(data);
}
+ return result;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
index a362d07..a613d2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import java.io.IOException;
import java.util.List;
/**
@@ -32,18 +33,18 @@ public interface KeyManager {
*
* @param pipeline - Pipeline.
* @param data - Key Data.
- * @throws StorageContainerException
+ * @throws IOException
*/
- void putKey(Pipeline pipeline, KeyData data) throws StorageContainerException;
+ void putKey(Pipeline pipeline, KeyData data) throws IOException;
/**
* Gets an existing key.
*
* @param data - Key Data.
* @return Key Data.
- * @throws StorageContainerException
+ * @throws IOException
*/
- KeyData getKey(KeyData data) throws StorageContainerException;
+ KeyData getKey(KeyData data) throws IOException;
/**
* Deletes an existing Key.
@@ -53,7 +54,7 @@ public interface KeyManager {
* @throws StorageContainerException
*/
void deleteKey(Pipeline pipeline, String keyName)
- throws StorageContainerException;
+ throws IOException;
/**
* List keys in a container.
@@ -65,7 +66,7 @@ public interface KeyManager {
* @return List of Keys that match the criteria.
*/
List<KeyData> listKey(Pipeline pipeline, String prefix, String startKey,
- int count) throws StorageContainerException;
+ int count) throws IOException;
/**
* Shutdown keyManager.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
index 81cdcf2..f4caad0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataStore;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
@@ -69,7 +69,7 @@ public final class ContainerCache extends LRUMap {
protected boolean removeLRU(LinkEntry entry) {
lock.lock();
try {
- LevelDBStore db = (LevelDBStore) entry.getValue();
+ MetadataStore db = (MetadataStore) entry.getValue();
db.close();
} catch (IOException e) {
LOG.error("Error closing DB. Container: " + entry.getKey().toString(), e);
@@ -83,14 +83,14 @@ public final class ContainerCache extends LRUMap {
* Returns a DB handle if available, null otherwise.
*
* @param containerName - Name of the container.
- * @return OzoneLevelDBStore.
+ * @return MetadataStore.
*/
- public LevelDBStore getDB(String containerName) {
+ public MetadataStore getDB(String containerName) {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
- return (LevelDBStore) this.get(containerName);
+ return (MetadataStore) this.get(containerName);
} finally {
lock.unlock();
}
@@ -106,7 +106,7 @@ public final class ContainerCache extends LRUMap {
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
- LevelDBStore db = this.getDB(containerName);
+ MetadataStore db = this.getDB(containerName);
if (db != null) {
try {
db.close();
@@ -126,7 +126,7 @@ public final class ContainerCache extends LRUMap {
* @param containerName - Name of the container
* @param db - DB handle
*/
- public void putDB(String containerName, LevelDBStore db) {
+ public void putDB(String containerName, MetadataStore db) {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
index 1e42307..36e5b3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.utils.BatchOperation;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.locks.Lock;
/**
@@ -56,36 +56,27 @@ public interface MetadataManager {
* @param key - key
* @return value
*/
- byte[] get(byte[] key);
+ byte[] get(byte[] key) throws IOException;
/**
* Puts a Key into Metadata DB.
* @param key - key
* @param value - value
*/
- void put(byte[] key, byte[] value);
+ void put(byte[] key, byte[] value) throws IOException;
/**
* Deletes a Key from Metadata DB.
* @param key - key
*/
- void delete(byte[] key);
+ void delete(byte[] key) throws IOException;
/**
- * Performs batch Put and Delete to Metadata DB.
- * Can be used to do multiple puts and deletes atomically.
- * @param putList - list of Key/Value to put into DB
- * @param delList - list of Key to delete from DB
- */
- void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
- List<byte[]> delList) throws IOException;
-
- /**
- * Performs a batch Put to Metadata DB.
- * Can be used to do multiple puts atomically.
- * @param putList - list of Key/Value to put into DB
+ * Atomically writes a batch of operations.
+ * @param batch - batch of operations to write
+ * @throws IOException
*/
- void batchPut(List<Map.Entry<byte[], byte[]>> putList) throws IOException;
+ void writeBatch(BatchOperation batch) throws IOException;
/**
* Given a volume return the corresponding DB key.
@@ -120,7 +111,7 @@ public interface MetadataManager {
*
* @param key - key name
*/
- void deleteKey(byte[] key);
+ void deleteKey(byte[] key) throws IOException;
/**
* Given a volume, check if it is empty,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
index dd3d524..7e48eda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
@@ -32,12 +32,11 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteBatch;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import java.io.File;
import java.io.IOException;
@@ -57,9 +56,9 @@ import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
/**
* KSM metadata manager interface.
*/
-public class MetadataManagerImpl implements MetadataManager {
+public class MetadataManagerImpl implements MetadataManager {
- private final LevelDBStore store;
+ private final MetadataStore store;
private final ReadWriteLock lock;
@@ -67,10 +66,12 @@ public class MetadataManagerImpl implements MetadataManager {
File metaDir = OzoneUtils.getScmMetadirPath(conf);
final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
- Options options = new Options();
- options.cacheSize(cacheSize * OzoneConsts.MB);
File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
- this.store = new LevelDBStore(ksmDBFile, options);
+ this.store = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(ksmDBFile)
+ .setCacheSize(cacheSize * OzoneConsts.MB)
+ .build();
this.lock = new ReentrantReadWriteLock();
}
@@ -153,7 +154,7 @@ public class MetadataManagerImpl implements MetadataManager {
* @param key - key name
*/
@Override
- public void deleteKey(byte[] key) {
+ public void deleteKey(byte[] key) throws IOException {
store.delete(key);
}
@@ -181,7 +182,7 @@ public class MetadataManagerImpl implements MetadataManager {
* @return value
*/
@Override
- public byte[] get(byte[] key) {
+ public byte[] get(byte[] key) throws IOException {
return store.get(key);
}
@@ -191,7 +192,7 @@ public class MetadataManagerImpl implements MetadataManager {
* @param value - value
*/
@Override
- public void put(byte[] key, byte[] value) {
+ public void put(byte[] key, byte[] value) throws IOException {
store.put(key, value);
}
@@ -199,45 +200,13 @@ public class MetadataManagerImpl implements MetadataManager {
* Deletes a Key from Metadata DB.
* @param key - key
*/
- public void delete(byte[] key) {
+ public void delete(byte[] key) throws IOException {
store.delete(key);
}
- /**
- * Performs a batch Put and Delete from Metadata DB.
- * Can be used to do multiple puts and deletes atomically.
- * @param putList - list of key and value pairs to put to Metadata DB.
- * @param delList - list of keys to delete from Metadata DB.
- */
@Override
- public void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
- List<byte[]> delList)
- throws IOException {
- WriteBatch batch = store.createWriteBatch();
- putList.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
- delList.forEach(entry -> batch.delete(entry));
- try {
- store.commitWriteBatch(batch);
- } finally {
- store.closeWriteBatch(batch);
- }
- }
-
- /**
- * Performs a batch Put to Metadata DB.
- * Can be used to do multiple puts atomically.
- * @param list - list of Map.Entry
- */
- @Override
- public void batchPut(List<Map.Entry<byte[], byte[]>> list)
- throws IOException {
- WriteBatch batch = store.createWriteBatch();
- list.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
- try {
- store.commitWriteBatch(batch);
- } finally {
- store.closeWriteBatch(batch);
- }
+ public void writeBatch(BatchOperation batch) throws IOException {
+ this.store.writeBatch(batch);
}
/**
@@ -246,21 +215,17 @@ public class MetadataManagerImpl implements MetadataManager {
* @return true if the volume is empty
*/
public boolean isVolumeEmpty(String volume) throws IOException {
- try (DBIterator iterator = store.getIterator()) {
- String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
- + OzoneConsts.KSM_BUCKET_PREFIX;
- byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
- // Seek to the root of the volume and look for the next key
- iterator.seek(dbVolumeRootKey);
- if (iterator.hasNext()) {
- String firstBucketKey = DFSUtil.bytes2String(iterator.next().getKey());
- // if the key starts with /<volume name>/
- // then there is at least one bucket
- return !firstBucketKey.startsWith(dbVolumeRootName);
- } else {
- return true;
- }
+ String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
+ byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
+ // Seek to the root of the volume and look for the next key
+ ImmutablePair<byte[], byte[]> volumeRoot =
+ store.peekAround(1, dbVolumeRootKey);
+ if (volumeRoot != null) {
+ String firstBucketKey = DFSUtil.bytes2String(volumeRoot.getKey());
+ return !firstBucketKey.startsWith(dbVolumeRootName
+ + OzoneConsts.KSM_BUCKET_PREFIX);
}
+ return true;
}
/**
@@ -272,18 +237,15 @@ public class MetadataManagerImpl implements MetadataManager {
*/
public boolean isBucketEmpty(String volume, String bucket)
throws IOException {
- try (DBIterator iterator = store.getIterator()) {
- String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
- + OzoneConsts.KSM_BUCKET_PREFIX + bucket
- + OzoneConsts.KSM_KEY_PREFIX;
- byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
- iterator.seek(keyRoot);
- if(iterator.hasNext()) {
- return !DFSUtil.bytes2String(iterator.next().getKey())
- .startsWith(keyRootName);
- }
- return true;
+ String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+ + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
+ byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
+ ImmutablePair<byte[], byte[]> firstKey = store.peekAround(1, keyRoot);
+ if (firstKey != null) {
+ return !DFSUtil.bytes2String(firstKey.getKey())
+ .startsWith(keyRootName + OzoneConsts.KSM_KEY_PREFIX);
}
+ return true;
}
/**
@@ -305,8 +267,19 @@ public class MetadataManagerImpl implements MetadataManager {
ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
- LevelDBKeyFilter filter =
- new KeyPrefixFilter(getBucketKeyPrefix(volumeName, bucketPrefix));
+
+ // Keep /volume/bucket_prefix* entries but drop /volume/bucket_xxx/key_xxx.
+ // The prefix is cut off by length so it is never interpreted as a regex.
+ MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
+ if (currentKey != null) {
+ String bucketNamePrefix = getBucketKeyPrefix(volumeName, bucketPrefix);
+ String bucket = DFSUtil.bytes2String(currentKey);
+ return bucket.startsWith(bucketNamePrefix) &&
+ !bucket.substring(bucketNamePrefix.length())
+ .contains(OzoneConsts.KSM_KEY_PREFIX);
+ }
+ return false;
+ };
List<Map.Entry<byte[], byte[]>> rangeResult;
if (!Strings.isNullOrEmpty(startBucket)) {
@@ -349,7 +322,7 @@ public class MetadataManagerImpl implements MetadataManager {
ResultCodes.FAILED_BUCKET_NOT_FOUND);
}
- LevelDBKeyFilter filter =
+ MetadataKeyFilter filter =
new KeyPrefixFilter(getKeyKeyPrefix(volumeName, bucketName, keyPrefix));
List<Map.Entry<byte[], byte[]>> rangeResult;
@@ -427,18 +400,17 @@ public class MetadataManagerImpl implements MetadataManager {
private VolumeList getVolumesByUser(byte[] userNameKey)
throws KSMException {
VolumeList volumes = null;
- byte[] volumesInBytes = store.get(userNameKey);
- if (volumesInBytes == null) {
- // No volume found for this user, return an empty list
- return VolumeList.newBuilder().build();
- }
-
try {
+ byte[] volumesInBytes = store.get(userNameKey);
+ if (volumesInBytes == null) {
+ // No volume found for this user, return an empty list
+ return VolumeList.newBuilder().build();
+ }
volumes = VolumeList.parseFrom(volumesInBytes);
- } catch (InvalidProtocolBufferException e) {
+ } catch (IOException e) {
throw new KSMException("Unable to get volumes info by the given user, "
- + "metadata might be corrupted",
- e, ResultCodes.FAILED_METADATA_ERROR);
+ + "metadata might be corrupted", e,
+ ResultCodes.FAILED_METADATA_ERROR);
}
return volumes;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
index 0757c9d..f4590a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -26,15 +26,13 @@ import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.iq80.leveldb.DBException;
+import org.apache.hadoop.utils.BatchOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import static org.apache.hadoop.ozone.ksm
.KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
@@ -67,8 +65,7 @@ public class VolumeManagerImpl implements VolumeManager {
// Helpers to add and delete volume from user list
private void addVolumeToOwnerList(String volume, String owner,
- List<Map.Entry<byte[], byte[]>> putBatch)
- throws IOException {
+ BatchOperation batchOperation) throws IOException {
// Get the volume list
byte[] dbUserKey = metadataManager.getUserKey(owner);
byte[] volumeList = metadataManager.get(dbUserKey);
@@ -88,12 +85,11 @@ public class VolumeManagerImpl implements VolumeManager {
prevVolList.add(volume);
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
- putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
+ batchOperation.put(dbUserKey, newVolList.toByteArray());
}
private void delVolumeFromOwnerList(String volume, String owner,
- List<Map.Entry<byte[], byte[]>> putBatch,
- List<byte[]> deleteBatch)
+ BatchOperation batchOperation)
throws IOException {
// Get the volume list
byte[] dbUserKey = metadataManager.getUserKey(owner);
@@ -109,18 +105,14 @@ public class VolumeManagerImpl implements VolumeManager {
// Remove the volume from the list
prevVolList.remove(volume);
if (prevVolList.size() == 0) {
- deleteBatch.add(dbUserKey);
+ batchOperation.delete(dbUserKey);
} else {
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
- putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
+ batchOperation.put(dbUserKey, newVolList.toByteArray());
}
}
- private Map.Entry<byte[], byte[]> batchEntry(byte[] key, byte[] value) {
- return new AbstractMap.SimpleEntry<>(key, value);
- }
-
/**
* Creates a volume.
* @param args - KsmVolumeArgs.
@@ -129,7 +121,6 @@ public class VolumeManagerImpl implements VolumeManager {
public void createVolume(KsmVolumeArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
- List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
byte[] volumeInfo = metadataManager.get(dbVolumeKey);
@@ -140,16 +131,17 @@ public class VolumeManagerImpl implements VolumeManager {
throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
}
+ BatchOperation batch = new BatchOperation();
// Write the vol info
VolumeInfo newVolumeInfo = args.getProtobuf();
- batch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
+ batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
// Add volume to user list
addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
- metadataManager.batchPut(batch);
+ metadataManager.writeBatch(batch);
LOG.info("created volume:{} user:{}",
args.getVolume(), args.getOwnerName());
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
LOG.error("Volume creation failed for user:{} volname:{}",
args.getOwnerName(), args.getVolume(), ex);
throw ex;
@@ -169,8 +161,6 @@ public class VolumeManagerImpl implements VolumeManager {
public void setOwner(String volume, String owner) throws IOException {
Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(owner);
- List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
- List<byte[]> deleteBatch = new LinkedList<>();
metadataManager.writeLock().lock();
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
@@ -183,9 +173,9 @@ public class VolumeManagerImpl implements VolumeManager {
KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
- delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
- putBatch, deleteBatch);
- addVolumeToOwnerList(volume, owner, putBatch);
+ BatchOperation batch = new BatchOperation();
+ delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
+ addVolumeToOwnerList(volume, owner, batch);
KsmVolumeArgs newVolumeArgs =
KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
@@ -195,9 +185,9 @@ public class VolumeManagerImpl implements VolumeManager {
.build();
VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
- putBatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
+ batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
- metadataManager.batchPutDelete(putBatch, deleteBatch);
+ metadataManager.writeBatch(batch);
} catch (IOException ex) {
LOG.error("Changing volume ownership failed for user:{} volume:{}",
owner, volume, ex);
@@ -285,8 +275,7 @@ public class VolumeManagerImpl implements VolumeManager {
Preconditions.checkNotNull(volume);
metadataManager.writeLock().lock();
try {
- List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
- List<byte[]> deleteBatch = new LinkedList<>();
+ BatchOperation batch = new BatchOperation();
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.get(dbVolumeKey);
if (volInfo == null) {
@@ -301,10 +290,9 @@ public class VolumeManagerImpl implements VolumeManager {
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
// delete the volume from the owner list
// as well as delete the volume entry
- delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(),
- putBatch, deleteBatch);
- deleteBatch.add(dbVolumeKey);
- metadataManager.batchPutDelete(putBatch, deleteBatch);
+ delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
+ batch.delete(dbVolumeKey);
+ metadataManager.writeBatch(batch);
} catch (IOException ex) {
LOG.error("Delete volume failed for volume:{}", volume, ex);
throw ex;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 8e49b5f..98da9b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -29,10 +29,9 @@ import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteBatch;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,13 +74,13 @@ public class BlockManagerImpl implements BlockManager {
private final NodeManager nodeManager;
private final Mapping containerManager;
- private final LevelDBStore blockStore;
+ private final MetadataStore blockStore;
private final Lock lock;
private final long containerSize;
private final long cacheSize;
- private final LevelDBStore openContainerStore;
+ private final MetadataStore openContainerStore;
private Map<String, Long> openContainers;
private final int containerProvisionBatchSize;
private final Random rand;
@@ -102,12 +101,14 @@ public class BlockManagerImpl implements BlockManager {
this.cacheSize = cacheSizeMB;
File metaDir = OzoneUtils.getScmMetadirPath(conf);
String scmMetaDataDir = metaDir.getPath();
- Options options = new Options();
- options.cacheSize(this.cacheSize * OzoneConsts.MB);
// Write the block key to container name mapping.
File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
- blockStore = new LevelDBStore(blockContainerDbPath, options);
+ blockStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(blockContainerDbPath)
+ .setCacheSize(this.cacheSize * OzoneConsts.MB)
+ .build();
this.containerSize = OzoneConsts.GB * conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
@@ -115,7 +116,12 @@ public class BlockManagerImpl implements BlockManager {
// Load store of all open contains for block allocation
File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
- openContainerStore = new LevelDBStore(openContainsDbPath, options);
+ openContainerStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(openContainsDbPath)
+ .setCacheSize(this.cacheSize * OzoneConsts.MB)
+ .build();
+
openContainers = new HashMap<>();
loadOpenContainers();
@@ -132,20 +138,19 @@ public class BlockManagerImpl implements BlockManager {
* @throws IOException
*/
private void loadOpenContainers() throws IOException {
- try (DBIterator iter = openContainerStore.getIterator()) {
- for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
+ try {
+ openContainerStore.iterate(null, (key, value) -> {
try {
- byte[] key = iter.peekNext().getKey();
String containerName = DFSUtil.bytes2String(key);
- byte[] value = iter.peekNext().getValue();
Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
openContainers.put(containerName, containerUsed);
LOG.debug("Loading open container: {} used : {}", containerName,
containerUsed);
- } catch (Exception ex) {
+ } catch (Exception e) {
LOG.warn("Failed loading open container, continue next...");
}
- }
+ return true;
+ });
} catch (IOException e) {
LOG.error("Loading open container store failed." + e);
throw new SCMException("Failed to load open container store",
@@ -321,21 +326,19 @@ public class BlockManagerImpl implements BlockManager {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
- try (WriteBatch wb = blockStore.createWriteBatch()) {
- containerManager.getContainer(
- DFSUtil.bytes2String(containerBytes));
- String deletedKeyName = getDeletedKeyName(key);
- // Add a tombstone for the deleted key
- wb.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
- // Delete the block key
- wb.delete(DFSUtil.string2Bytes(key));
- blockStore.commitWriteBatch(wb);
- // TODO: Add async tombstone clean thread to send delete command to
- // datanodes in the pipeline to clean up the blocks from containers.
- // TODO: Container report handling of the deleted blocks:
- // Remove tombstone and update open container usage.
- // We will revisit this when the closed container replication is done.
- }
+ BatchOperation batch = new BatchOperation();
+ containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
+ String deletedKeyName = getDeletedKeyName(key);
+ // Add a tombstone for the deleted key
+ batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
+ // Delete the block key
+ batch.delete(DFSUtil.string2Bytes(key));
+ blockStore.writeBatch(batch);
+ // TODO: Add async tombstone clean thread to send delete command to
+ // datanodes in the pipeline to clean up the blocks from containers.
+ // TODO: Container report handling of the deleted blocks:
+ // Remove tombstone and update open container usage.
+ // We will revisit this when the closed container replication is done.
} finally {
lock.unlock();
}
@@ -359,4 +362,4 @@ public class BlockManagerImpl implements BlockManager {
openContainerStore.close();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index f558882..0e436e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,6 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
@@ -153,6 +152,7 @@ public class SQLCLI extends Configured implements Tool {
.withDescription("specify output path")
.create("o");
allOptions.addOption(outPathOption);
+
return allOptions;
}
@@ -254,22 +254,25 @@ public class SQLCLI extends Configured implements Tool {
throws Exception {
LOG.info("Create tables for sql container db.");
File dbFile = dbPath.toFile();
- org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
- try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
- Connection conn = connectDB(outPath.toString());
- DBIterator iter = dbStore.getIterator()) {
+ try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+ .setDbFile(dbFile).build();
+ Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_CONTAINER_INFO);
executeSQL(conn, CREATE_CONTAINER_MEMBERS);
executeSQL(conn, CREATE_DATANODE_INFO);
- iter.seekToFirst();
HashSet<String> uuidChecked = new HashSet<>();
- while (iter.hasNext()) {
- Map.Entry<byte[], byte[]> entry = iter.next();
- String containerName = new String(entry.getKey(), encoding);
- Pipeline pipeline = Pipeline.parseFrom(entry.getValue());
- insertContainerDB(conn, containerName, pipeline, uuidChecked);
- }
+ dbStore.iterate(null, (key, value) -> {
+ String containerName = new String(key, encoding);
+ // The stored value is the serialized Pipeline of this container.
+ Pipeline pipeline = Pipeline.parseFrom(value);
+ try {
+ insertContainerDB(conn, containerName, pipeline, uuidChecked);
+ return true;
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ });
}
}
@@ -330,21 +333,24 @@ public class SQLCLI extends Configured implements Tool {
private void convertBlockDB(Path dbPath, Path outPath) throws Exception {
LOG.info("Create tables for sql block db.");
File dbFile = dbPath.toFile();
- org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
- try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
- Connection conn = connectDB(outPath.toString());
- DBIterator iter = dbStore.getIterator()) {
+ try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+ .setDbFile(dbFile).build();
+ Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_BLOCK_CONTAINER);
- iter.seekToFirst();
- while (iter.hasNext()) {
- Map.Entry<byte[], byte[]> entry = iter.next();
- String blockKey = DFSUtilClient.bytes2String(entry.getKey());
- String containerName = DFSUtilClient.bytes2String(entry.getValue());
+ dbStore.iterate(null, (key, value) -> {
+ String blockKey = DFSUtilClient.bytes2String(key);
+ String containerName = DFSUtilClient.bytes2String(value);
String insertBlockContainer = String.format(
INSERT_BLOCK_CONTAINER, blockKey, containerName);
- executeSQL(conn, insertBlockContainer);
- }
+
+ try {
+ executeSQL(conn, insertBlockContainer);
+ return true;
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ });
}
}
@@ -374,21 +380,23 @@ public class SQLCLI extends Configured implements Tool {
private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception {
LOG.info("Create table for sql node pool db.");
File dbFile = dbPath.toFile();
- org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
- try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
- Connection conn = connectDB(outPath.toString());
- DBIterator iter = dbStore.getIterator()) {
+ try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+ .setDbFile(dbFile).build();
+ Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_NODE_POOL);
executeSQL(conn, CREATE_DATANODE_INFO);
- iter.seekToFirst();
- while (iter.hasNext()) {
- Map.Entry<byte[], byte[]> entry = iter.next();
- DatanodeID nodeId = DatanodeID.getFromProtoBuf(
- HdfsProtos.DatanodeIDProto.PARSER.parseFrom(entry.getKey()));
- String blockPool = DFSUtil.bytes2String(entry.getValue());
- insertNodePoolDB(conn, blockPool, nodeId);
- }
+ dbStore.iterate(null, (key, value) -> {
+ DatanodeID nodeId = DatanodeID
+ .getFromProtoBuf(HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key));
+ String blockPool = DFSUtil.bytes2String(value);
+ try {
+ insertNodePoolDB(conn, blockPool, nodeId);
+ return true;
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ });
}
}
@@ -423,22 +431,24 @@ public class SQLCLI extends Configured implements Tool {
throws Exception {
LOG.info("Create table for open container db.");
File dbFile = dbPath.toFile();
- org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
- try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
- Connection conn = connectDB(outPath.toString());
- DBIterator iter = dbStore.getIterator()) {
+ try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+ .setDbFile(dbFile).build();
+ Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_OPEN_CONTAINER);
- iter.seekToFirst();
- while (iter.hasNext()) {
- Map.Entry<byte[], byte[]> entry = iter.next();
- String containerName = DFSUtil.bytes2String(entry.getKey());
- Long containerUsed = Long.parseLong(
- DFSUtil.bytes2String(entry.getValue()));
- String insertOpenContainer = String.format(
- INSERT_OPEN_CONTAINER, containerName, containerUsed);
- executeSQL(conn, insertOpenContainer);
- }
+ dbStore.iterate(null, (key, value) -> {
+ String containerName = DFSUtil.bytes2String(key);
+ Long containerUsed =
+ Long.parseLong(DFSUtil.bytes2String(value));
+ String insertOpenContainer = String
+ .format(INSERT_OPEN_CONTAINER, containerName, containerUsed);
+ try {
+ executeSQL(conn, insertOpenContainer);
+ return true;
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 75694bb..643779d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -32,9 +32,10 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.Options;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +64,7 @@ public class ContainerMapping implements Mapping {
private final long cacheSize;
private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8");
- private final LevelDBStore containerStore;
+ private final MetadataStore containerStore;
private final ContainerPlacementPolicy placementPolicy;
private final long containerSize;
@@ -85,12 +86,14 @@ public class ContainerMapping implements Mapping {
this.cacheSize = cacheSizeMB;
File metaDir = OzoneUtils.getScmMetadirPath(conf);
- Options options = new Options();
- options.cacheSize(this.cacheSize * OzoneConsts.MB);
// Write the container name to pipeline mapping.
File containerDBPath = new File(metaDir, CONTAINER_DB);
- containerStore = new LevelDBStore(containerDBPath, options);
+ containerStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(containerDBPath)
+ .setCacheSize(this.cacheSize * OzoneConsts.MB)
+ .build();
this.lock = new ReentrantLock();
@@ -192,7 +195,7 @@ public class ContainerMapping implements Mapping {
if(containerStore.isEmpty()) {
throw new IOException("No container exists in current db");
}
- KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefixName);
+ MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
byte[] startKey = startName == null ?
null : DFSUtil.string2Bytes(startName);
List<Map.Entry<byte[], byte[]>> range =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
index 71836db..a22cd12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
@@ -260,8 +261,17 @@ public class ContainerReplicationManager implements Closeable {
* a datanode.
*/
public void handleContainerReport(ContainerReportsProto containerReport) {
- String poolName = poolManager.getNodePool(
- DatanodeID.getFromProtoBuf(containerReport.getDatanodeID()));
+ String poolName = null;
+ DatanodeID datanodeID = DatanodeID
+ .getFromProtoBuf(containerReport.getDatanodeID());
+ try {
+ poolName = poolManager.getNodePool(datanodeID);
+ } catch (SCMException e) {
+ LOG.warn("Skipping processing container report from datanode {}, "
+ + "cause: failed to get the corresponding node pool",
+ datanodeID.toString(), e);
+ return;
+ }
for(InProgressPool ppool : inProgressPoolList) {
if(ppool.getPoolName().equalsIgnoreCase(poolName)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
index ad902ba..f60bdc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
@@ -110,6 +110,7 @@ public class SCMException extends IOException {
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
BLOCK_EXISTS,
- FAILED_TO_FIND_BLOCK
+ FAILED_TO_FIND_BLOCK,
+ IO_EXCEPTION
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
index f91c705..d3218b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import java.io.Closeable;
+import java.io.IOException;
import java.util.List;
/**
@@ -35,7 +36,7 @@ public interface NodePoolManager extends Closeable {
* @param pool - name of the node pool.
* @param node - data node.
*/
- void addNode(String pool, DatanodeID node);
+ void addNode(String pool, DatanodeID node) throws IOException;
/**
* Remove a node from a node pool.
@@ -67,5 +68,5 @@ public interface NodePoolManager extends Closeable {
* @return node pool name if it has been assigned.
* null if the node has not been assigned to any node pool yet.
*/
- String getNodePool(DatanodeID datanodeID);
+ String getNodePool(DatanodeID datanodeID) throws SCMException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index a71fbcc..235809b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -726,8 +726,16 @@ public class SCMNodeManager
// TODO: define node pool policy for non-default node pool.
// For now, all nodes are added to the "DefaultNodePool" upon registration
// if it has not been added to any node pool yet.
- if (nodePoolManager.getNodePool(datanodeID) == null) {
- nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, datanodeID);
+ try {
+ if (nodePoolManager.getNodePool(datanodeID) == null) {
+ nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL,
+ datanodeID);
+ }
+ } catch (IOException e) {
+ // TODO: make sure registration failure is handled correctly.
+ LOG.error("Adding datanode {} to node pool failed", datanodeID, e);
+ return RegisteredCommand.newBuilder()
+ .setErrorCode(ErrorCode.errorNodeNotPermitted).build();
}
LOG.info("Data node with ID: {} Registered.",
datanodeID.getDatanodeUuid());
@@ -823,4 +831,4 @@ public class SCMNodeManager
}
return nodeCountMap;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
index 9c1821f..aa34f29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
@@ -26,9 +26,8 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +64,7 @@ public final class SCMNodePoolManager implements NodePoolManager {
public static final String DEFAULT_NODEPOOL = "DefaultNodePool";
// DB that saves the node to node pool mapping.
- private LevelDBStore nodePoolStore;
+ private MetadataStore nodePoolStore;
// In-memory node pool to nodes mapping
private HashMap<String, Set<DatanodeID>> nodePools;
@@ -84,11 +83,12 @@ public final class SCMNodePoolManager implements NodePoolManager {
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
File metaDir = OzoneUtils.getScmMetadirPath(conf);
String scmMetaDataDir = metaDir.getPath();
- Options options = new Options();
- options.cacheSize(cacheSize * OzoneConsts.MB);
-
File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
- nodePoolStore = new LevelDBStore(nodePoolDBPath, options);
+ nodePoolStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(nodePoolDBPath)
+ .setCacheSize(cacheSize * OzoneConsts.MB)
+ .build();
nodePools = new HashMap<>();
lock = new ReentrantReadWriteLock();
init();
@@ -100,14 +100,11 @@ public final class SCMNodePoolManager implements NodePoolManager {
* @throws SCMException
*/
private void init() throws SCMException {
- try (DBIterator iter = nodePoolStore.getIterator()) {
- for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
+ try {
+ nodePoolStore.iterate(null, (key, value) -> {
try {
- byte[] key = iter.peekNext().getKey();
DatanodeID nodeId = DatanodeID.getFromProtoBuf(
HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key));
-
- byte[] value = iter.peekNext().getValue();
String poolName = DFSUtil.bytes2String(value);
Set<DatanodeID> nodePool = null;
@@ -119,12 +116,14 @@ public final class SCMNodePoolManager implements NodePoolManager {
}
nodePool.add(nodeId);
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding node: {} to node pool: {}", nodeId, poolName);
+ LOG.debug("Adding node: {} to node pool: {}",
+ nodeId, poolName);
}
- } catch (Exception ex) {
+ } catch (IOException e) {
LOG.warn("Can't add a datanode to node pool, continue next...");
}
- }
+ return true;
+ });
} catch (IOException e) {
LOG.error("Loading node pool error " + e);
throw new SCMException("Failed to load node pool",
@@ -138,7 +137,8 @@ public final class SCMNodePoolManager implements NodePoolManager {
* @param node - name of the datanode.
*/
@Override
- public void addNode(final String pool, final DatanodeID node) {
+ public void addNode(final String pool, final DatanodeID node)
+ throws IOException {
Preconditions.checkNotNull(pool, "pool name is null");
Preconditions.checkNotNull(node, "node is null");
lock.writeLock().lock();
@@ -192,6 +192,10 @@ public final class SCMNodePoolManager implements NodePoolManager {
throw new SCMException(String.format("Unable to find node %s from" +
" pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
FAILED_TO_FIND_NODE_IN_POOL); }
+ } catch (IOException e) {
+ throw new SCMException("Failed to remove node " + node.toString()
+ + " from node pool " + pool, e,
+ SCMException.ResultCodes.IO_EXCEPTION);
} finally {
lock.writeLock().unlock();
}
@@ -238,14 +242,17 @@ public final class SCMNodePoolManager implements NodePoolManager {
* TODO: Put this in a in-memory map if performance is an issue.
*/
@Override
- public String getNodePool(final DatanodeID datanodeID) {
+ public String getNodePool(final DatanodeID datanodeID) throws SCMException {
Preconditions.checkNotNull(datanodeID, "node is null");
- byte[] result = nodePoolStore.get(
- datanodeID.getProtoBufMessage().toByteArray());
- if (result == null) {
- return null;
+ try {
+ byte[] result = nodePoolStore.get(
+ datanodeID.getProtoBufMessage().toByteArray());
+ return result == null ? null : DFSUtil.bytes2String(result);
+ } catch (IOException e) {
+ throw new SCMException("Failed to get node pool for node "
+ + datanodeID.toString(), e,
+ SCMException.ResultCodes.IO_EXCEPTION);
}
- return DFSUtil.bytes2String(result);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
index d849612..f28555a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.utils.LevelDBStore;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
@@ -39,8 +38,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner;
-import org.iq80.leveldb.DBException;
-import org.iq80.leveldb.DBIterator;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,8 +127,8 @@ public final class OzoneMetadataManager {
private static final String USER_DB = "/user.db";
private static final String META_DB = "/metadata.db";
private static OzoneMetadataManager bm = null;
- private LevelDBStore userDB;
- private LevelDBStore metadataDB;
+ private MetadataStore userDB;
+ private MetadataStore metadataDB;
private ReadWriteLock lock;
private Charset encoding = Charset.forName("UTF-8");
private String storageRoot;
@@ -157,8 +156,14 @@ public final class OzoneMetadataManager {
}
try {
- userDB = new LevelDBStore(new File(storageRoot + USER_DB), true);
- metadataDB = new LevelDBStore(new File(storageRoot + META_DB), true);
+ userDB = MetadataStoreBuilder.newBuilder()
+ .setDbFile(new File(storageRoot + USER_DB))
+ .setCreateIfMissing(true)
+ .build();
+ metadataDB = MetadataStoreBuilder.newBuilder()
+ .setDbFile(new File(storageRoot + META_DB))
+ .setCreateIfMissing(true)
+ .build();
inProgressObjects = new ConcurrentHashMap<>();
} catch (IOException ex) {
LOG.error("Cannot open db :" + ex.getMessage());
@@ -230,7 +235,7 @@ public final class OzoneMetadataManager {
metadataDB.put(args.getVolumeName().getBytes(encoding),
newVInfo.toDBString().getBytes(encoding));
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@@ -295,7 +300,7 @@ public final class OzoneMetadataManager {
userDB.put(args.getResourceName().getBytes(encoding),
volumeList.toDBString().getBytes(encoding));
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@@ -341,7 +346,7 @@ public final class OzoneMetadataManager {
VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding));
return info.getOwner().getName().equals(acl.getName());
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, null, ex);
} finally {
lock.readLock().unlock();
@@ -365,7 +370,7 @@ public final class OzoneMetadataManager {
}
return VolumeInfo.parse(new String(volumeInfo, encoding));
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.readLock().unlock();
@@ -405,7 +410,7 @@ public final class OzoneMetadataManager {
prevKey = volName[1];
}
return getFilteredVolumes(volumeList, prefix, prevKey, maxCount);
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex);
} finally {
lock.readLock().unlock();
@@ -448,80 +453,54 @@ public final class OzoneMetadataManager {
* @return ListVolumes.
* @throws OzoneException
*/
- public ListVolumes listAllVolumes(ListArgs args) throws OzoneException,
- IOException {
+ public ListVolumes listAllVolumes(ListArgs args)
+ throws OzoneException, IOException {
String prefix = args.getPrefix();
- String prevKey = args.getPrevKey();
+ final String prevKey;
int maxCount = args.getMaxKeys();
String userName = null;
- try (DBIterator iterator = this.userDB.getDB().iterator()) {
-
- if (prevKey != null) {
- // Format is username/volumeName
- String[] volName = args.getPrevKey().split("/");
- if (volName.length < 2) {
- throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
- }
- seekToUser(iterator, volName[0]);
- userName = new String(iterator.peekNext().getKey(), encoding);
- prevKey = volName[1];
- } else {
- userName = getFirstUser(iterator);
- }
-
- if (userName == null || userName.isEmpty()) {
+ if (args.getPrevKey() != null) {
+ // Format is username/volumeName
+ String[] volName = args.getPrevKey().split("/");
+ if (volName.length < 2) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
}
- ListVolumes returnSet = new ListVolumes();
- int count = maxCount - returnSet.getVolumes().size();
-
- // we need to iterate through users until we get maxcount volumes
- // or no more volumes are left.
- while (iterator.hasNext() && count > 0) {
+ byte[] userNameBytes = userDB.get(volName[0].getBytes(encoding));
+ userName = new String(userNameBytes, encoding);
+ prevKey = volName[1];
+ } else {
+ userName = new String(userDB.peekAround(0, null).getKey(), encoding);
+ prevKey = null;
+ }
- userName = new String(iterator.next().getKey(), encoding);
+ if (userName == null || userName.isEmpty()) {
+ throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
+ }
- byte[] volumeList = userDB.get(userName.getBytes(encoding));
+ ListVolumes returnSet = new ListVolumes();
+ // we need to iterate through users until we get maxcount volumes
+ // or no more volumes are left.
+ userDB.iterate(null, (key, value) -> {
+ int currentSize = returnSet.getVolumes().size();
+ if (currentSize < maxCount) {
+ String name = new String(key, encoding);
+ byte[] volumeList = userDB.get(name.getBytes(encoding));
if (volumeList == null) {
- throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
+ throw new IOException(
+ ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()));
}
-
- returnSet.getVolumes().addAll(getFilteredVolumes(
- volumeList, prefix, prevKey, count).getVolumes());
- count = maxCount - returnSet.getVolumes().size();
+ returnSet.getVolumes().addAll(
+ getFilteredVolumes(volumeList, prefix, prevKey,
+ maxCount - currentSize).getVolumes());
+ return true;
+ } else {
+ return false;
}
- return returnSet;
- }
- }
+ });
- /**
- * Returns the first user name from the UserDB.
- *
- * @return - UserName.
- * @throws IOException
- */
- String getFirstUser(DBIterator iterator) throws IOException {
- iterator.seekToFirst();
- if (iterator.hasNext()) {
- return new String(iterator.peekNext().getKey(), encoding);
- }
- return null;
- }
-
- /**
- * Reposition the DB cursor to the user name.
- *
- * @param iterator - Current Iterator.
- * @param userName - userName to seek to
- * @return - DBIterator.
- * @throws IOException
- */
- DBIterator seekToUser(DBIterator iterator, String userName) throws
- IOException {
- iterator.seek(userName.getBytes(encoding));
- return iterator;
+ return returnSet;
}
/**
@@ -587,7 +566,7 @@ public final class OzoneMetadataManager {
metadataDB.delete(args.getVolumeName().getBytes(encoding));
userDB.put(user.getBytes(encoding),
volumeList.toDBString().getBytes(encoding));
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@@ -659,7 +638,7 @@ public final class OzoneMetadataManager {
metadataDB.put(args.getResourceName().getBytes(encoding),
bucketInfo.toDBString().getBytes(encoding));
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@@ -716,7 +695,7 @@ public final class OzoneMetadataManager {
userDB.put(args.getParentName().getBytes(encoding),
bucketList.toDBString().getBytes(encoding));
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
@@ -807,7 +786,7 @@ public final class OzoneMetadataManager {
metadataDB.delete(args.getResourceName().getBytes(encoding));
userDB.put(args.getParentName().getBytes(encoding),
bucketList.toDBString().getBytes(encoding));
- } catch (IOException | DBException ex) {
+ } catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
} finally {
lock.writeLock().unlock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java
new file mode 100644
index 0000000..47699eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * A utility class to store a batch of DB write operations.
+ */
+public class BatchOperation {
+
+ /**
+ * Enum for write operations.
+ */
+ public enum Operation {
+ DELETE, PUT
+ }
+
+ private List<SingleOperation> operations =
+ Lists.newArrayList();
+
+ /**
+ * Add a PUT operation into the batch.
+ */
+ public void put(byte[] key, byte[] value) {
+ operations.add(new SingleOperation(Operation.PUT, key, value));
+ }
+
+ /**
+ * Add a DELETE operation into the batch.
+ */
+ public void delete(byte[] key) {
+ operations.add(new SingleOperation(Operation.DELETE, key, null));
+
+ }
+
+ public List<SingleOperation> getOperations() {
+ return operations;
+ }
+
+ /**
+ * A SingleOperation represents a PUT or DELETE operation
+ * and the data the operation needs to manipulate.
+ */
+ public static class SingleOperation {
+
+ private Operation opt;
+ private byte[] key;
+ private byte[] value;
+
+ public SingleOperation(Operation opt, byte[] key, byte[] value) {
+ this.opt = opt;
+ if (key == null) {
+ throw new IllegalArgumentException("key cannot be null");
+ }
+ this.key = key.clone();
+ this.value = value == null ? null : value.clone();
+ }
+
+ public Operation getOpt() {
+ return opt;
+ }
+
+ public byte[] getKey() {
+ return key.clone();
+ }
+
+ public byte[] getValue() {
+ return value == null ? null : value.clone();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
new file mode 100644
index 0000000..c407398
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.io.IOException;
+
+/**
+ * A consumer for metadata store key-value entries.
+ * Used by {@link MetadataStore} class.
+ */
+@FunctionalInterface
+public interface EntryConsumer {
+
+ /**
+ * Consumes a key and value and produces a boolean result.
+ * @param key key
+ * @param value value
+ * @return true to continue iterating over entries, false to stop
+ * @throws IOException
+ */
+ boolean consume(byte[] key, byte[] value) throws IOException;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org