You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/07/21 01:27:12 UTC
hadoop git commit: HDFS-12149. Ozone: RocksDB implementation of ozone
metadata store. Contributed by Weiwei Yang.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 c2f2e0fa2 -> 0edc4c3dd
HDFS-12149. Ozone: RocksDB implementation of ozone metadata store. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0edc4c3d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0edc4c3d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0edc4c3d
Branch: refs/heads/HDFS-7240
Commit: 0edc4c3dd8f6e6adcb57dfb81a891c92e4e183f9
Parents: c2f2e0f
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Jul 20 18:22:03 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Thu Jul 20 18:22:03 2017 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/pom.xml | 5 +
.../org/apache/hadoop/utils/LevelDBStore.java | 5 +-
.../hadoop/utils/MetadataStoreBuilder.java | 12 +-
.../org/apache/hadoop/utils/RocksDBStore.java | 318 +++++++++++++++++++
.../src/main/resources/ozone-default.xml | 2 +-
.../apache/hadoop/ozone/TestMetadataStore.java | 99 +++++-
6 files changed, 433 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 22e4aa8..61ccca9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -193,6 +193,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>5.5.5</version>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.bouncycastle</groupId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
index 415b788..c7df429 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
@@ -126,7 +126,9 @@ public class LevelDBStore implements MetadataStore {
*/
@Override
public void close() throws IOException {
- db.close();
+ if (db != null){
+ db.close();
+ }
}
/**
@@ -163,6 +165,7 @@ public class LevelDBStore implements MetadataStore {
@Override
public void destroy() throws IOException {
+ close();
JniDBFactory.factory.destroy(dbFile, dbOptions);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
index 81f2d8a..5546549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.iq80.leveldb.Options;
+import org.rocksdb.BlockBasedTableConfig;
import java.io.File;
import java.io.IOException;
@@ -82,8 +83,15 @@ public class MetadataStoreBuilder {
}
store = new LevelDBStore(dbFile, options);
} else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) {
- // TODO replace with rocksDB impl
- store = new LevelDBStore(dbFile, new Options());
+ org.rocksdb.Options opts = new org.rocksdb.Options();
+ opts.setCreateIfMissing(createIfMissing);
+
+ if (cacheSize > 0) {
+ BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+ tableConfig.setBlockCacheSize(cacheSize);
+ opts.setTableFormatConfig(tableConfig);
+ }
+ store = new RocksDBStore(dbFile, opts);
} else {
throw new IllegalArgumentException("Invalid argument for "
+ OzoneConfigKeys.OZONE_METADATA_STORE_IMPL
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
new file mode 100644
index 0000000..b2e5e2a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.DbPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.AbstractMap;
+
+/**
+ * RocksDB implementation of ozone metadata store.
+ *
+ * <p>Writes issued through {@link #put}, {@link #delete} and
+ * {@link #writeBatch} all use the same synced {@link WriteOptions}, so they
+ * are flushed to disk before returning. Instances hold native RocksDB
+ * resources and must be {@link #close() closed} (or destroyed) when done.
+ */
+public class RocksDBStore implements MetadataStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RocksDBStore.class);
+
+  /** Open database handle; reset to null once {@link #close()} has run. */
+  private RocksDB db = null;
+  /** Directory holding the database files. */
+  private File dbLocation;
+  /** Options applied to every write; created and released by this store. */
+  private WriteOptions writeOptions;
+  /**
+   * Options the database was opened with. Supplied by the caller and not
+   * closed here; {@link #destroy()} still reads them after close().
+   */
+  private Options dbOptions;
+
+  /**
+   * Opens the RocksDB database located at {@code dbFile}.
+   *
+   * @param dbFile directory of the database; must not be null.
+   * @param options RocksDB options used to open the database (e.g. whether
+   *                to create it when missing).
+   * @throws IOException if RocksDB fails to open the database.
+   */
+  public RocksDBStore(File dbFile, Options options) throws IOException {
+    Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
+    RocksDB.loadLibrary();
+    dbOptions = options;
+    dbLocation = dbFile;
+    writeOptions = new WriteOptions();
+    // Every write is synced to disk before the call returns.
+    writeOptions.setSync(true);
+    // NOTE(review): with noSlowdown a write that RocksDB would otherwise
+    // stall fails immediately instead, surfacing as an IOException to the
+    // caller. Confirm this is the intended behavior for a metadata store.
+    writeOptions.setNoSlowdown(true);
+    try {
+      db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath());
+    } catch (RocksDBException e) {
+      // The store is unusable; release the native handle we created so it
+      // does not leak, since nobody can call close() on a failed instance.
+      writeOptions.close();
+      throw new IOException("Failed init RocksDB, db path : "
+          + dbFile.getAbsolutePath(), e);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RocksDB successfully opened.");
+      LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
+      LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
+      LOG.debug("[Option] compactionStyle= {}", options.compactionStyle());
+      LOG.debug("[Option] compressionType= {}", options.compressionType());
+      LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
+      LOG.debug("[Option] writeBufferSize= {}", options.writeBufferSize());
+    }
+  }
+
+  /**
+   * Wraps a RocksDB failure in an IOException whose message carries the
+   * RocksDB status code and message (or placeholders when absent).
+   */
+  private static IOException toIOException(String msg, RocksDBException e) {
+    String statusCode = e.getStatus() == null ? "N/A" :
+        e.getStatus().getCodeString();
+    String errMessage = e.getMessage() == null ? "Unknown error" :
+        e.getMessage();
+    String output = msg + "; status : " + statusCode
+        + "; message : " + errMessage;
+    return new IOException(output, e);
+  }
+
+  /** Stores {@code value} under {@code key} using the synced write options. */
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    try {
+      db.put(writeOptions, key, value);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to put key-value to metadata store", e);
+    }
+  }
+
+  /** Returns true when the database contains no entries. */
+  @Override
+  public boolean isEmpty() throws IOException {
+    try (RocksIterator it = db.newIterator()) {
+      it.seekToFirst();
+      return !it.isValid();
+    }
+  }
+
+  /** Returns the value stored under {@code key}, or null when absent. */
+  @Override
+  public byte[] get(byte[] key) throws IOException {
+    try {
+      return db.get(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to get the value for the given key", e);
+    }
+  }
+
+  /**
+   * Removes {@code key}. Uses the same synced write options as
+   * {@link #put} so deletions are as durable as insertions.
+   */
+  @Override
+  public void delete(byte[] key) throws IOException {
+    try {
+      db.delete(writeOptions, key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to delete the given key", e);
+    }
+  }
+
+  /**
+   * Returns up to {@code count} entries starting at {@code startKey} (or at
+   * the first key when null) that pass every given filter. Each filter sees
+   * the previous, current and next keys (null at either end of the DB).
+   *
+   * @throws IllegalArgumentException if {@code count} is negative.
+   * @throws IOException if {@code startKey} is non-null but not present.
+   */
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
+    long start = System.currentTimeMillis();
+    if (count < 0) {
+      throw new IllegalArgumentException(
+          "Invalid count given " + count + ", count must be greater than 0");
+    }
+    try (RocksIterator it = db.newIterator()) {
+      if (startKey == null) {
+        it.seekToFirst();
+      } else {
+        if (get(startKey) == null) {
+          throw new IOException("Invalid start key, not found in current db");
+        }
+        it.seek(startKey);
+      }
+
+      // Key immediately preceding the entry being examined.
+      byte[] prevKey = null;
+      if (it.isValid()) {
+        // Only the first entry needs a backward step; afterwards the
+        // previous key is the one visited in the prior loop iteration.
+        byte[] firstKey = it.key();
+        it.prev();
+        prevKey = it.isValid() ? it.key() : null;
+        it.seek(firstKey);
+      }
+
+      while (it.isValid() && result.size() < count) {
+        final byte[] currentKey = it.key();
+        final byte[] currentValue = it.value();
+        it.next();
+        final byte[] nextKey = it.isValid() ? it.key() : null;
+        final byte[] previousKey = prevKey;
+
+        if (filters == null || Arrays.stream(filters)
+            .allMatch(filter -> filter.filterKey(previousKey,
+                currentKey, nextKey))) {
+          result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
+              currentValue));
+        }
+        prevKey = currentKey;
+      }
+    } finally {
+      long end = System.currentTimeMillis();
+      long timeConsumed = end - start;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Time consumed for getRangeKVs() is {}ms,"
+            + " result length is {}.", timeConsumed, result.size());
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Applies all PUT/DELETE operations of {@code operation} in one RocksDB
+   * write batch using the synced write options. Empty batches are a no-op.
+   *
+   * @throws IllegalArgumentException on an unknown operation type.
+   */
+  @Override
+  public void writeBatch(BatchOperation operation)
+      throws IOException {
+    List<BatchOperation.SingleOperation> operations =
+        operation.getOperations();
+    if (!operations.isEmpty()) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        for (BatchOperation.SingleOperation opt : operations) {
+          switch (opt.getOpt()) {
+          case DELETE:
+            writeBatch.remove(opt.getKey());
+            break;
+          case PUT:
+            writeBatch.put(opt.getKey(), opt.getValue());
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid operation "
+                + opt.getOpt());
+          }
+        }
+        db.write(writeOptions, writeBatch);
+      } catch (RocksDBException e) {
+        throw toIOException("Batch write operation failed", e);
+      }
+    }
+  }
+
+  /** Compacts the full key range; does nothing once the store is closed. */
+  @Override
+  public void compactDB() throws IOException {
+    if (db != null) {
+      try {
+        db.compactRange();
+      } catch (RocksDBException e) {
+        throw toIOException("Failed to compact db", e);
+      }
+    }
+  }
+
+  /** Recursively deletes an existing file/dir, logging (not throwing) errors. */
+  private void deleteQuietly(File fileOrDir) {
+    if (fileOrDir != null && fileOrDir.exists()) {
+      try {
+        FileUtils.forceDelete(fileOrDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete dir {}", fileOrDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the store and deletes its on-disk directories: the DB location,
+   * the configured log and WAL directories, and any extra DB paths.
+   */
+  @Override
+  public void destroy() throws IOException {
+    // Make sure db is closed.
+    close();
+
+    // There is no destroydb java API available,
+    // equivalently we can delete all db directories.
+    deleteQuietly(dbLocation);
+    // Skip unset (null or empty) options instead of failing or touching "".
+    for (String dir : new String[] {dbOptions.dbLogDir(),
+        dbOptions.walDir()}) {
+      if (dir != null && !dir.isEmpty()) {
+        deleteQuietly(new File(dir));
+      }
+    }
+    List<DbPath> dbPaths = dbOptions.dbPaths();
+    if (dbPaths != null) {
+      // NOTE(review): this assumes DbPath.toString() yields the directory
+      // path; if it does not, these directories are silently left behind.
+      for (DbPath dbPath : dbPaths) {
+        deleteQuietly(new File(dbPath.toString()));
+      }
+    }
+  }
+
+  /**
+   * Positions at the first key at or after {@code from} (or the first key
+   * when null) and returns the entry at relative {@code offset}.
+   *
+   * @param offset -1 (previous entry), 0 (that entry) or 1 (next entry).
+   * @return the entry, or null when the offset moves past either end.
+   * @throws IOException if there is no entry at or after {@code from}.
+   * @throws IllegalArgumentException if {@code offset} is not -1, 0 or 1.
+   */
+  @Override
+  public ImmutablePair<byte[], byte[]> peekAround(int offset,
+      byte[] from) throws IOException, IllegalArgumentException {
+    try (RocksIterator it = db.newIterator()) {
+      if (from == null) {
+        it.seekToFirst();
+      } else {
+        it.seek(from);
+      }
+      if (!it.isValid()) {
+        throw new IOException("Key not found");
+      }
+
+      switch (offset) {
+      case 0:
+        break;
+      case 1:
+        it.next();
+        break;
+      case -1:
+        it.prev();
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Position can only be -1, 0 " + "or 1, but found " + offset);
+      }
+      return it.isValid() ? new ImmutablePair<>(it.key(), it.value()) : null;
+    }
+  }
+
+  /**
+   * Feeds entries to {@code consumer} in key order, starting at the first
+   * key at or after {@code from} (or the first key when null), until the
+   * consumer returns false or the entries are exhausted.
+   */
+  @Override
+  public void iterate(byte[] from, EntryConsumer consumer)
+      throws IOException {
+    try (RocksIterator it = db.newIterator()) {
+      if (from != null) {
+        it.seek(from);
+      } else {
+        it.seekToFirst();
+      }
+      while (it.isValid()) {
+        if (!consumer.consume(it.key(), it.value())) {
+          break;
+        }
+        it.next();
+      }
+    }
+  }
+
+  /**
+   * Releases the database and write-options native handles. Safe to call
+   * more than once. The caller-supplied {@code dbOptions} are left open
+   * because {@link #destroy()} reads them after closing.
+   */
+  @Override
+  public void close() throws IOException {
+    if (db != null) {
+      db.close();
+      // Drop the reference so later calls cannot reach a released native
+      // handle (compactDB() skips; other operations fail with an NPE).
+      db = null;
+    }
+    if (writeOptions != null) {
+      writeOptions.close();
+      writeOptions = null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index bd29ce4..133bcb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -598,7 +598,7 @@
<property>
<name>ozone.metastore.impl</name>
- <value>LevelDB</value>
+ <value>RocksDB</value>
<description>
Ozone metadata store implementation. Ozone metadata are well distributed
to multiple services such as ksm, scm. They are stored in some local
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
index 0000e50..143ea94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
@@ -33,6 +34,8 @@ import org.junit.After;
import org.junit.Test;
import org.junit.Assert;
import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
@@ -40,15 +43,35 @@ import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.UUID;
+import java.util.Collection;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.runners.Parameterized.*;
/**
* Test class for ozone metadata store.
*/
+@RunWith(Parameterized.class)
public class TestMetadataStore {
+ private final String storeImpl;
+
+ /**
+  * Creates one test instance per parameter returned by {@code data()}.
+  *
+  * @param metadataImpl metadata store implementation to exercise; it is
+  *                     written to OZONE_METADATA_STORE_IMPL in the tests.
+  */
+ public TestMetadataStore(String metadataImpl) {
+ this.storeImpl = metadataImpl;
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB},
+ {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB}
+ });
+ }
+
private MetadataStore store;
private File testDir;
-
private final static int MAX_GETRANGE_LENGTH = 100;
@Rule
@@ -56,11 +79,11 @@ public class TestMetadataStore {
@Before
public void init() throws IOException {
- testDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
+ testDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+ + "-" + storeImpl.toLowerCase());
Configuration conf = new OzoneConfiguration();
- conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
- OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
+ conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
store = MetadataStoreBuilder.newBuilder()
.setConf(conf)
@@ -293,4 +316,72 @@ public class TestMetadataStore {
expectedException.expectMessage("Invalid start key");
store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
}
+
+ @Test
+ public void testDestroyDB() throws IOException {
+ // create a new DB to test db destroy
+ Configuration conf = new OzoneConfiguration();
+ conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+ File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+ + "-" + storeImpl.toLowerCase() + "-toDestroy");
+ MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setCreateIfMissing(true)
+ .setDbFile(dbDir)
+ .build();
+
+ dbStore.put(getBytes("key1"), getBytes("value1"));
+ dbStore.put(getBytes("key2"), getBytes("value2"));
+
+ Assert.assertFalse(dbStore.isEmpty());
+ Assert.assertTrue(dbDir.exists());
+ Assert.assertTrue(dbDir.listFiles().length > 0);
+
+ dbStore.destroy();
+
+ Assert.assertFalse(dbDir.exists());
+ }
+
+  /**
+   * Verifies that a batch of deletes and puts is applied atomically and
+   * that the resulting key set (in key order) is exactly what is expected.
+   */
+  @Test
+  public void testBatchWrite() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-batchWrite");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    try {
+      List<String> expectedResult = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        dbStore.put(getBytes("batch-" + i), getBytes("batch-value-" + i));
+        expectedResult.add("batch-" + i);
+      }
+
+      BatchOperation batch = new BatchOperation();
+      batch.delete(getBytes("batch-2"));
+      batch.delete(getBytes("batch-3"));
+      batch.delete(getBytes("batch-4"));
+      batch.put(getBytes("batch-new-2"), getBytes("batch-new-value-2"));
+
+      expectedResult.remove("batch-2");
+      expectedResult.remove("batch-3");
+      expectedResult.remove("batch-4");
+      // "batch-new-2" sorts after "batch-9" byte-wise, so it comes last.
+      expectedResult.add("batch-new-2");
+
+      dbStore.writeBatch(batch);
+
+      // Collect every stored key so each one, including the last, is
+      // compared; a counter alone cannot catch a mismatch on the final key.
+      List<String> actualResult = new ArrayList<>();
+      dbStore.iterate(null, (key, value) -> {
+        actualResult.add(getString(key));
+        return true;
+      });
+
+      Assert.assertEquals(expectedResult, actualResult);
+    } finally {
+      // Remove the on-disk database so reruns start from an empty directory.
+      dbStore.destroy();
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org