You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2011/08/03 22:28:19 UTC
svn commit: r1153646 - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java
main/java/org/apache/hadoop/hbase/util/IdLock.java
test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
Author: tedyu
Date: Wed Aug 3 20:28:18 2011
New Revision: 1153646
URL: http://svn.apache.org/viewvc?rev=1153646&view=rev
Log:
HBASE-3857 New files
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
Removed:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/IdLock.java?rev=1153646&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/IdLock.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/IdLock.java Wed Aug 3 20:28:18 2011
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Allows multiple concurrent clients to lock on a numeric id with a minimal
+ * memory overhead. The intended usage is as follows:
+ *
+ * <pre>
+ * IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ * try {
+ * // User code.
+ * } finally {
+ * idLock.releaseLockEntry(lockEntry);
+ * }</pre>
+ */
+public class IdLock {
+
+ /** An entry returned to the client as a lock object */
+ public static class Entry {
+ private final long id;
+ private int numWaiters;
+ private boolean isLocked = true;
+
+ private Entry(long id) {
+ this.id = id;
+ }
+
+ public String toString() {
+ return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
+ + isLocked;
+ }
+ }
+
+ private ConcurrentMap<Long, Entry> map =
+ new ConcurrentHashMap<Long, Entry>();
+
+ /**
+ * Blocks until the lock corresponding to the given id is acquired.
+ *
+ * @param id an arbitrary number to lock on
+ * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
+ * the lock
+ * @throws IOException if interrupted
+ */
+ public Entry getLockEntry(long id) throws IOException {
+ Entry entry = new Entry(id);
+ Entry existing;
+ while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
+ synchronized (existing) {
+ if (existing.isLocked) {
+ ++existing.numWaiters; // Add ourselves to waiters.
+ while (existing.isLocked) {
+ try {
+ existing.wait();
+ } catch (InterruptedException e) {
+ --existing.numWaiters; // Remove ourselves from waiters.
+ throw new InterruptedIOException(
+ "Interrupted waiting to acquire sparse lock");
+ }
+ }
+
+ --existing.numWaiters; // Remove ourselves from waiters.
+ existing.isLocked = true;
+ return existing;
+ }
+ // If the entry is not locked, it might already be deleted from the
+ // map, so we cannot return it. We need to get our entry into the map
+ // or get someone else's locked entry.
+ }
+ }
+ return entry;
+ }
+
+ /**
+ * Must be called in a finally block to decrease the internal counter and
+ * remove the monitor object for the given id if the caller is the last
+ * client.
+ *
+ * @param entry the return value of {@link #getLockEntry(long)}
+ */
+ public void releaseLockEntry(Entry entry) {
+ synchronized (entry) {
+ entry.isLocked = false;
+ if (entry.numWaiters > 0) {
+ entry.notify();
+ } else {
+ map.remove(entry.id);
+ }
+ }
+ }
+
+ /** For testing */
+ void assertMapEmpty() {
+ assert map.size() == 0;
+ }
+
+}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1153646&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Wed Aug 3 20:28:18 2011
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import static org.junit.Assert.*;
+
+/**
+ * Tests {@link HFile} cache-on-write functionality for the following block
+ * types: data blocks, non-root index blocks, and Bloom filter blocks.
+ */
+@RunWith(Parameterized.class)
+public class TestCacheOnWrite {
+
+ private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);
+
+ private static final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private Configuration conf;
+ private FileSystem fs;
+ private Random rand = new Random(12983177L);
+ private Path storeFilePath;
+ private Compression.Algorithm compress;
+ private CacheOnWriteType cowType;
+ private BlockCache blockCache;
+ private String testName;
+
+ private static final int DATA_BLOCK_SIZE = 2048;
+ private static final int NUM_KV = 25000;
+ private static final int INDEX_BLOCK_SIZE = 512;
+ private static final int BLOOM_BLOCK_SIZE = 4096;
+
+ /** The number of valid key types possible in a store file */
+ private static final int NUM_VALID_KEY_TYPES =
+ KeyValue.Type.values().length - 2;
+
+ private static enum CacheOnWriteType {
+ DATA_BLOCKS(BlockType.DATA, HFile.CACHE_BLOCKS_ON_WRITE_KEY),
+ BLOOM_BLOCKS(BlockType.BLOOM_CHUNK,
+ BloomFilterFactory.IO_STOREFILE_BLOOM_CACHE_ON_WRITE),
+ INDEX_BLOCKS(BlockType.LEAF_INDEX,
+ HFileBlockIndex.CACHE_INDEX_BLOCKS_ON_WRITE_KEY);
+
+ private final String confKey;
+ private final BlockType inlineBlockType;
+
+ private CacheOnWriteType(BlockType inlineBlockType, String confKey) {
+ this.inlineBlockType = inlineBlockType;
+ this.confKey = confKey;
+ }
+
+ public boolean shouldBeCached(BlockType blockType) {
+ return blockType == inlineBlockType
+ || blockType == BlockType.INTERMEDIATE_INDEX
+ && inlineBlockType == BlockType.LEAF_INDEX;
+ }
+
+ public void modifyConf(Configuration conf) {
+ for (CacheOnWriteType cowType : CacheOnWriteType.values())
+ conf.setBoolean(cowType.confKey, cowType == this);
+ }
+
+ }
+
+ public TestCacheOnWrite(CacheOnWriteType cowType,
+ Compression.Algorithm compress) {
+ this.cowType = cowType;
+ this.compress = compress;
+ testName = "[cacheOnWrite=" + cowType + ", compress=" + compress + "]";
+ }
+
+ @Parameters
+ public static Collection<Object[]> getParameters() {
+ List<Object[]> cowTypes = new ArrayList<Object[]>();
+ for (CacheOnWriteType cowType : CacheOnWriteType.values())
+ for (Compression.Algorithm compress :
+ HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
+ cowTypes.add(new Object[] { cowType, compress });
+ }
+ return cowTypes;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ conf = TEST_UTIL.getConfiguration();
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+ conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
+ conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
+ BLOOM_BLOCK_SIZE);
+ cowType.modifyConf(conf);
+ fs = FileSystem.get(conf);
+ blockCache = StoreFile.getBlockCache(conf);
+ }
+
+ @After
+ public void tearDown() {
+ blockCache.evictBlocksByPrefix("");
+ }
+
+ @Test
+ public void testCacheOnWrite() throws IOException {
+ writeStoreFile();
+ readStoreFile();
+ }
+
+ private void readStoreFile() throws IOException {
+ HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
+ storeFilePath, null, false, false);
+ LOG.info("HFile information: " + reader);
+ HFileScanner scanner = reader.getScanner(false, false);
+ assertTrue(testName, scanner.seekTo());
+
+ long offset = 0;
+ HFileBlock prevBlock = null;
+ EnumMap<BlockType, Integer> blockCountByType =
+ new EnumMap<BlockType, Integer>(BlockType.class);
+
+ while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+ HFileBlock block = reader.readBlockData(offset, prevBlock == null ? -1
+ : prevBlock.getNextBlockOnDiskSizeWithHeader(), -1, false);
+ String blockCacheKey = HFile.getBlockCacheKey(reader.getName(), offset);
+ boolean isCached = blockCache.getBlock(blockCacheKey, true) != null;
+ boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
+ assertEquals(testName + " " + block, shouldBeCached, isCached);
+ prevBlock = block;
+ offset += block.getOnDiskSizeWithHeader();
+ BlockType bt = block.getBlockType();
+ Integer count = blockCountByType.get(bt);
+ blockCountByType.put(bt, (count == null ? 0 : count) + 1);
+ }
+
+ LOG.info("Block count by type: " + blockCountByType);
+ assertEquals(
+ "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+ blockCountByType.toString());
+
+ reader.close();
+ }
+
+ public static KeyValue.Type generateKeyType(Random rand) {
+ if (rand.nextBoolean()) {
+ // Let's make half of KVs puts.
+ return KeyValue.Type.Put;
+ } else {
+ KeyValue.Type keyType =
+ KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
+ if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum)
+ {
+ throw new RuntimeException("Generated an invalid key type: " + keyType
+ + ". " + "Probably the layout of KeyValue.Type has changed.");
+ }
+ return keyType;
+ }
+ }
+
+ public void writeStoreFile() throws IOException {
+ Path storeFileParentDir = new Path(HBaseTestingUtility.getTestDir(),
+ "test_cache_on_write");
+ StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
+ DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
+ StoreFile.BloomType.ROWCOL, NUM_KV);
+
+ final int rowLen = 32;
+ for (int i = 0; i < NUM_KV; ++i) {
+ byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
+ byte[] v = TestHFileWriterV2.randomValue(rand);
+ int cfLen = rand.nextInt(k.length - rowLen + 1);
+ KeyValue kv = new KeyValue(
+ k, 0, rowLen,
+ k, rowLen, cfLen,
+ k, rowLen + cfLen, k.length - rowLen - cfLen,
+ rand.nextLong(),
+ generateKeyType(rand),
+ v, 0, v.length);
+ sfw.append(kv);
+ }
+
+ sfw.close();
+ storeFilePath = sfw.getPath();
+ }
+
+}