Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/07/16 00:20:28 UTC
[hadoop] 01/01: HDDS-1802. Add Eviction policy for table cache.
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch HDDS-1802
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 73d58e9cd33917313b5c1801e6b0a851fcd57b63
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Mon Jul 15 17:20:01 2019 -0700
HDDS-1802. Add Eviction policy for table cache.
---
.../java/org/apache/hadoop/utils/db/DBStore.java | 14 +++-
.../java/org/apache/hadoop/utils/db/RDBStore.java | 9 +++
.../org/apache/hadoop/utils/db/TypedTable.java | 75 +++++++++++++++++++---
...{PartialTableCache.java => TableCacheImpl.java} | 31 +++++++--
...tialTableCache.java => TestTableCacheImpl.java} | 71 ++++++++++++++------
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 7 +-
6 files changed, 168 insertions(+), 39 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
index d01dfe4..95e57d9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
/**
* The DBStore interface provides the ability to create Tables, which store
@@ -47,7 +48,9 @@ public interface DBStore extends AutoCloseable {
/**
- * Gets an existing TableStore with implicit key/value conversion.
+ * Gets an existing TableStore with implicit key/value conversion and
+ * the default cache cleanup policy. The default policy cleans up the
+ * cache after the flush to DB completes.
*
* @param name - Name of the TableStore to get
* @param keyType
@@ -59,6 +62,15 @@ public interface DBStore extends AutoCloseable {
Class<KEY> keyType, Class<VALUE> valueType) throws IOException;
/**
+ * Gets an existing TableStore with implicit key/value conversion and
+ * the specified cache cleanup policy.
+ * @throws IOException
+ */
+ <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
+ Class<KEY> keyType, Class<VALUE> valueType,
+ TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException;
+
+ /**
* Lists the Known list of Tables in a DB.
*
* @return List of Tables, in case of Rocks DB and LevelDB we will return at
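For reviewers, this is how a caller picks a non-default policy with the new overload (a minimal sketch; it mirrors the OmMetadataManagerImpl change at the end of this patch, and assumes "store" is an already-opened DBStore):

    // Sketch: create a fully cached table via the new DBStore overload.
    Table<String, OmVolumeArgs> volumeTable =
        store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class,
            TableCacheImpl.CacheCleanupPolicy.NEVER);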
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
index 27862c7..23c03f1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.utils.RocksDBStoreMBean;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -261,6 +262,14 @@ public class RDBStore implements DBStore {
}
@Override
+ public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
+ Class<KEY> keyType, Class<VALUE> valueType,
+ TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
+ return new TypedTable<KEY, VALUE>(getTable(name), codecRegistry, keyType,
+ valueType, cleanupPolicy);
+ }
+
+ @Override
public ArrayList<Table> listTables() throws IOException {
ArrayList<Table> returnList = new ArrayList<>();
for (ColumnFamilyHandle handle : handleTable.values()) {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
index 05f73b8..3af498c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
@@ -23,9 +23,10 @@ import java.util.Iterator;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;
-import org.apache.hadoop.utils.db.cache.PartialTableCache;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
import org.apache.hadoop.utils.db.cache.TableCache;
/**
@@ -49,7 +50,17 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;
+ private final TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy;
+
+ /**
+ * Create a TypedTable from the raw table.
+ * The default cleanup policy for the table is cleanup after flush.
+ * @param rawTable
+ * @param codecRegistry
+ * @param keyType
+ * @param valueType
+ */
public TypedTable(
Table<byte[], byte[]> rawTable,
CodecRegistry codecRegistry, Class<KEY> keyType,
@@ -58,7 +69,29 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
this.codecRegistry = codecRegistry;
this.keyType = keyType;
this.valueType = valueType;
- cache = new PartialTableCache<>();
+ this.cacheCleanupPolicy = TableCacheImpl.CacheCleanupPolicy.AFTERFLUSH;
+ cache = new TableCacheImpl<>(cacheCleanupPolicy);
+ }
+
+ /**
+ * Create a TypedTable from the raw table with the specified cleanup policy
+ * for table cache.
+ * @param rawTable
+ * @param codecRegistry
+ * @param keyType
+ * @param valueType
+ * @param cleanupPolicy
+ */
+ public TypedTable(
+ Table<byte[], byte[]> rawTable,
+ CodecRegistry codecRegistry, Class<KEY> keyType,
+ Class<VALUE> valueType, TableCacheImpl.CacheCleanupPolicy cleanupPolicy) {
+ this.rawTable = rawTable;
+ this.codecRegistry = codecRegistry;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ this.cacheCleanupPolicy = cleanupPolicy;
+ cache = new TableCacheImpl<>(cacheCleanupPolicy);
}
@Override
@@ -83,11 +116,22 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
@Override
public boolean isExist(KEY key) throws IOException {
- CacheValue<VALUE> cacheValue= cache.get(new CacheKey<>(key));
- return (cacheValue != null && cacheValue.getCacheValue() != null) ||
+
+ if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.NEVER) {
+ return isExistForCleanupPolicyNever(key);
+ }
+
+ return isExistForCleanupPolicyNever(key) ||
rawTable.isExist(codecRegistry.asRawData(key));
}
+ private boolean isExistForCleanupPolicyNever(KEY key) {
+ // If the cache cleanup policy is NEVER, the entire table is cached, so
+ // there is no need to check the DB.
+ CacheValue<VALUE> cacheValue = cache.get(new CacheKey<>(key));
+ return (cacheValue != null && cacheValue.getCacheValue() != null);
+ }
+
/**
* Returns the value mapped to the given key in byte array or returns null
* if the key is not found.
@@ -104,14 +148,25 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
public VALUE get(KEY key) throws IOException {
// Here the metadata lock will guarantee that cache is not updated for same
// key during get key.
- CacheValue< VALUE > cacheValue = cache.get(new CacheKey<>(key));
- if (cacheValue == null) {
- // If no cache for the table or if it does not exist in cache get from
- // RocksDB table.
+
+ // First check the cache. If the key is present, return the cached value.
+ // If it is not present:
+ // - If the cache cleanup policy is NEVER, return null. The cache holds
+ //   the full table in memory, so there is no need to read the underlying
+ //   rocksdb table.
+ // - If the cache cleanup policy is AFTERFLUSH, read from the underlying
+ //   rocksdb table, since the entry may have been evicted from the cache
+ //   yet still exist in the DB.
+ CacheValue<VALUE> cacheValue =
+ Optional.fromNullable(cache.get(new CacheKey<>(key))).orNull();
+ if (cacheValue != null) {
+ return cacheValue.getCacheValue();
+ }
+
+ if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.AFTERFLUSH) {
return getFromTable(key);
} else {
- // We have a value in cache, return the value.
- return cacheValue.getCacheValue();
+ return null;
}
}
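The net effect of the new get() path can be summarized in a small standalone sketch (simplified names and types; this is a model of the behavior, not the patch code itself):

    // Simplified model of TypedTable#get after this patch. A full cache
    // (NEVER policy) treats a miss as authoritative; a partial cache
    // (AFTERFLUSH) falls through to RocksDB.
    static <K, V> V lookup(java.util.Map<K, V> cache,
        java.util.function.Function<K, V> readFromDb,
        boolean fullCache, K key) {
      V cached = cache.get(key);
      if (cached != null) {
        return cached;          // cache hit: return regardless of policy
      }
      return fullCache ? null : readFromDb.apply(key);
    }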
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCacheImpl.java
similarity index 77%
rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCacheImpl.java
index 7d16d04..5e14e2a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCacheImpl.java
@@ -32,21 +32,28 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
- * Cache implementation for the table, this cache is partial cache, this will
- * be cleaned up, after entries are flushed to DB.
+ * Cache implementation for the table. Depending on the cache cleanup
+ * policy, this cache will be a full cache or a partial cache.
+ *
+ * If cache cleanup policy is set as {@link CacheCleanupPolicy#AFTERFLUSH},
+ * this will be a partial cache.
+ *
+ * If cache cleanup policy is set as {@link CacheCleanupPolicy#NEVER},
+ * this will be a full cache.
*/
@Private
@Evolving
-public class PartialTableCache<CACHEKEY extends CacheKey,
+public class TableCacheImpl<CACHEKEY extends CacheKey,
CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache;
private final TreeSet<EpochEntry<CACHEKEY>> epochEntries;
private ExecutorService executorService;
+ private CacheCleanupPolicy cleanupPolicy;
- public PartialTableCache() {
+ public TableCacheImpl(CacheCleanupPolicy cleanupPolicy) {
cache = new ConcurrentHashMap<>();
epochEntries = new TreeSet<>();
// Created a singleThreadExecutor, so one cleanup will be running at a
@@ -54,7 +61,7 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("PartialTableCache Cleanup Thread - %d").build();
executorService = Executors.newSingleThreadExecutor(build);
-
+ this.cleanupPolicy = cleanupPolicy;
}
@Override
@@ -70,7 +77,10 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
@Override
public void cleanup(long epoch) {
- executorService.submit(() -> evictCache(epoch));
+ // If the policy is NEVER, do nothing.
+ if (cleanupPolicy == CacheCleanupPolicy.AFTERFLUSH) {
+ executorService.submit(() -> evictCache(epoch));
+ }
}
@Override
@@ -103,4 +113,13 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
}
}
}
+
+ /**
+ * Cleanup policies for table cache.
+ */
+ public enum CacheCleanupPolicy {
+ NEVER, // Cache will not be cleaned up. This means the table maintains
+ // a full cache.
+ AFTERFLUSH // Cache will be cleaned up after a flush to DB completes.
+ }
}
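A quick illustration of the policy's effect on cleanup (a sketch; the epoch values are arbitrary, and the generic instantiation follows the test class below):

    // Sketch: cleanup() only schedules eviction under AFTERFLUSH; with
    // NEVER it is a no-op and the cache retains the whole table.
    TableCacheImpl<CacheKey<String>, CacheValue<String>> partial =
        new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.AFTERFLUSH);
    partial.cleanup(5);   // asynchronously evicts entries up to epoch 5

    TableCacheImpl<CacheKey<String>, CacheValue<String>> full =
        new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.NEVER);
    full.cleanup(5);      // no-op: entries are never evicted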
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestTableCacheImpl.java
similarity index 66%
rename from hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
rename to hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestTableCacheImpl.java
index 736ae1b..8aeb5f7 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestTableCacheImpl.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.utils.db.cache;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import com.google.common.base.Optional;
@@ -26,18 +28,40 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.junit.Assert.fail;
/**
* Class tests partial table cache.
*/
-public class TestPartialTableCache {
+@RunWith(value = Parameterized.class)
+public class TestTableCacheImpl {
private TableCache<CacheKey<String>, CacheValue<String>> tableCache;
+ private final TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy;
+
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> policy() {
+ Object[][] params = new Object[][] {
+ {TableCacheImpl.CacheCleanupPolicy.NEVER},
+ {TableCacheImpl.CacheCleanupPolicy.AFTERFLUSH}
+ };
+ return Arrays.asList(params);
+ }
+
+ public TestTableCacheImpl(
+ TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy) {
+ this.cacheCleanupPolicy = cacheCleanupPolicy;
+ }
+
+
@Before
public void create() {
- tableCache = new PartialTableCache<>();
+ tableCache =
+ new TableCacheImpl<>(cacheCleanupPolicy);
}
@Test
public void testPartialTableCache() {
@@ -98,33 +122,40 @@ public class TestPartialTableCache {
tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}
- int deleted = 5;
- // cleanup first 5 entires
- tableCache.cleanup(deleted);
value = future.get();
Assert.assertEquals(10, value);
totalCount += value;
- // We should totalCount - deleted entries in cache.
- final int tc = totalCount;
- GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
- 5000);
-
- // Check if we have remaining entries.
- for (int i=6; i <= totalCount; i++) {
- Assert.assertEquals(Integer.toString(i),
- tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
+ if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.AFTERFLUSH) {
+ int deleted = 5;
+
+ // cleanup the first 5 entries
+ tableCache.cleanup(deleted);
+
+ // We should have totalCount - deleted entries in the cache.
+ final int tc = totalCount;
+ GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
+ 5000);
+ // Check if we have remaining entries.
+ for (int i=6; i <= totalCount; i++) {
+ Assert.assertEquals(Integer.toString(i), tableCache.get(
+ new CacheKey<>(Integer.toString(i))).getCacheValue());
+ }
+ tableCache.cleanup(10);
+
+ tableCache.cleanup(totalCount);
+
+ // Cleaned up all entries, so cache size should be zero.
+ GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
+ 5000);
+ } else {
+ tableCache.cleanup(totalCount);
+ Assert.assertEquals(totalCount, tableCache.size());
}
- tableCache.cleanup(10);
-
- tableCache.cleanup(totalCount);
- // Cleaned up all entries, so cache size should be zero.
- GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
- 5000);
}
private int writeToCache(int count, int startVal, long sleep)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 88fd24c..c7d6bb4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -69,6 +69,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import org.apache.hadoop.utils.db.TypedTable;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,11 +270,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
this.store.getTable(USER_TABLE, String.class, VolumeList.class);
checkTableStatus(userTable, USER_TABLE);
volumeTable =
- this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
+ this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class,
+ TableCacheImpl.CacheCleanupPolicy.NEVER);
checkTableStatus(volumeTable, VOLUME_TABLE);
bucketTable =
- this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
+ this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class,
+ TableCacheImpl.CacheCleanupPolicy.NEVER);
checkTableStatus(bucketTable, BUCKET_TABLE);
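With the NEVER policy on the volume and bucket tables, reads of those tables are served entirely from memory once the cache is populated, and a miss is treated as absence. A minimal sketch of the resulting read path (the key below is hypothetical):

    // Sketch: under CacheCleanupPolicy.NEVER a cache miss is definitive,
    // so RocksDB is not consulted on volume lookups.
    OmVolumeArgs volumeArgs = volumeTable.get("/vol1");
    if (volumeArgs == null) {
      // The volume does not exist; no DB read was performed.
    }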