You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/25 01:58:46 UTC
[04/11] hbase git commit: HBASE-18825 Use HStoreFile instead of
StoreFile in our own code base and remove unnecessary methods in StoreFile
interface
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
new file mode 100644
index 0000000..b685115
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -0,0 +1,1743 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.util.Progressable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+/**
+ * Test class for the Store
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestHStore {
+ private static final Log LOG = LogFactory.getLog(TestHStore.class);
+ @Rule
+ public TestName name = new TestName();
+
+ HStore store;
+ byte [] table = Bytes.toBytes("table");
+ byte [] family = Bytes.toBytes("family");
+
+ byte [] row = Bytes.toBytes("row");
+ byte [] row2 = Bytes.toBytes("row2");
+ byte [] qf1 = Bytes.toBytes("qf1");
+ byte [] qf2 = Bytes.toBytes("qf2");
+ byte [] qf3 = Bytes.toBytes("qf3");
+ byte [] qf4 = Bytes.toBytes("qf4");
+ byte [] qf5 = Bytes.toBytes("qf5");
+ byte [] qf6 = Bytes.toBytes("qf6");
+
+ NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
+
+ List<Cell> expected = new ArrayList<>();
+ List<Cell> result = new ArrayList<>();
+
+ long id = System.currentTimeMillis();
+ Get get = new Get(row);
+
+ private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
+
+
+ /**
+ * Setup
+ * @throws IOException
+ */
+ @Before
+ public void setUp() throws IOException {
+ qualifiers.add(qf1);
+ qualifiers.add(qf3);
+ qualifiers.add(qf5);
+
+ Iterator<byte[]> iter = qualifiers.iterator();
+ while(iter.hasNext()){
+ byte [] next = iter.next();
+ expected.add(new KeyValue(row, family, next, 1, (byte[])null));
+ get.addColumn(family, next);
+ }
+ }
+
+ private void init(String methodName) throws IOException {
+ init(methodName, TEST_UTIL.getConfiguration());
+ }
+
+ private Store init(String methodName, Configuration conf) throws IOException {
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ // some of the tests write 4 versions and then flush
+ // (with HBASE-4241, lower versions are collected on flush)
+ hcd.setMaxVersions(4);
+ return init(methodName, conf, hcd);
+ }
+
+ private HStore init(String methodName, Configuration conf, HColumnDescriptor hcd)
+ throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+ return init(methodName, conf, htd, hcd);
+ }
+
+ private HStore init(String methodName, Configuration conf, HTableDescriptor htd,
+ HColumnDescriptor hcd) throws IOException {
+ return init(methodName, conf, htd, hcd, null);
+ }
+
+ @SuppressWarnings("deprecation")
+ private HStore init(String methodName, Configuration conf, HTableDescriptor htd,
+ HColumnDescriptor hcd, MyStoreHook hook) throws IOException {
+ return init(methodName, conf, htd, hcd, hook, false);
+ }
+ @SuppressWarnings("deprecation")
+ private HStore init(String methodName, Configuration conf, HTableDescriptor htd,
+ HColumnDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
+ //Setting up a Store
+ Path basedir = new Path(DIR+methodName);
+ Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
+ final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
+
+ FileSystem fs = FileSystem.get(conf);
+
+ fs.delete(logdir, true);
+
+ if (htd.hasFamily(hcd.getName())) {
+ htd.modifyFamily(hcd);
+ } else {
+ htd.addFamily(hcd);
+ }
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+ MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
+ HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+ final Configuration walConf = new Configuration(conf);
+ FSUtils.setRootDir(walConf, basedir);
+ final WALFactory wals = new WALFactory(walConf, null, methodName);
+ HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
+ info.getTable().getNamespace()), fs, conf, info, htd, null);
+ if (hook == null) {
+ store = new HStore(region, hcd, conf);
+ } else {
+ store = new MyStore(region, hcd, conf, hook, switchToPread);
+ }
+ return store;
+ }
+
+ /**
+ * Test we do not lose data if we fail a flush and then close.
+ * Part of HBase-10466
+ * @throws Exception
+ */
+ @Test
+ public void testFlushSizeAccounting() throws Exception {
+ LOG.info("Setting up a faulty file system that cannot write in " +
+ this.name.getMethodName());
+ final Configuration conf = HBaseConfiguration.create();
+ // Only retry once.
+ conf.setInt("hbase.hstore.flush.retries.number", 1);
+ User user = User.createUserForTesting(conf, this.name.getMethodName(),
+ new String[]{"foo"});
+ // Inject our faulty LocalFileSystem
+ conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
+ user.runAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ // Make sure it worked (above is sensitive to caching details in hadoop core)
+ FileSystem fs = FileSystem.get(conf);
+ assertEquals(FaultyFileSystem.class, fs.getClass());
+ FaultyFileSystem ffs = (FaultyFileSystem)fs;
+
+ // Initialize region
+ init(name.getMethodName(), conf);
+
+ MemstoreSize size = store.memstore.getFlushableSize();
+ assertEquals(0, size.getDataSize());
+ LOG.info("Adding some data");
+ MemstoreSize kvSize = new MemstoreSize();
+ store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
+ // add the heap size of active (mutable) segment
+ kvSize.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD);
+ size = store.memstore.getFlushableSize();
+ assertEquals(kvSize, size);
+ // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
+ try {
+ LOG.info("Flushing");
+ flushStore(store, id++);
+ fail("Didn't bubble up IOE!");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("Fault injected"));
+ }
+ // due to snapshot, change mutable to immutable segment
+ kvSize.incMemstoreSize(0,
+ CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD);
+ size = store.memstore.getFlushableSize();
+ assertEquals(kvSize, size);
+ MemstoreSize kvSize2 = new MemstoreSize();
+ store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
+ kvSize2.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD);
+ // Even though we add a new kv, we expect the flushable size to be 'same' since we have
+ // not yet cleared the snapshot -- the above flush failed.
+ assertEquals(kvSize, size);
+ ffs.fault.set(false);
+ flushStore(store, id++);
+ size = store.memstore.getFlushableSize();
+ // Size should be the foreground kv size.
+ assertEquals(kvSize2, size);
+ flushStore(store, id++);
+ size = store.memstore.getFlushableSize();
+ assertEquals(0, size.getDataSize());
+ assertEquals(MutableSegment.DEEP_OVERHEAD, size.getHeapSize());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Verify that compression and data block encoding are respected by the
+ * Store.createWriterInTmp() method, used on store flush.
+ */
+ @Test
+ public void testCreateWriter() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ FileSystem fs = FileSystem.get(conf);
+
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setCompressionType(Compression.Algorithm.GZ);
+ hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
+ init(name.getMethodName(), conf, hcd);
+
+ // Test createWriterInTmp()
+ StoreFileWriter writer = store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false);
+ Path path = writer.getPath();
+ writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
+ writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
+ writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
+ writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
+ writer.close();
+
+ // Verify that compression and encoding settings are respected
+ HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
+ assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
+ assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
+ reader.close();
+ }
+
+ @Test
+ public void testDeleteExpiredStoreFiles() throws Exception {
+ testDeleteExpiredStoreFiles(0);
+ testDeleteExpiredStoreFiles(1);
+ }
+
+ /*
+ * @param minVersions the MIN_VERSIONS for the column family
+ */
+ public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
+ int storeFileNum = 4;
+ int ttl = 4;
+ IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+ EnvironmentEdgeManagerTestHelper.injectEdge(edge);
+
+ Configuration conf = HBaseConfiguration.create();
+ // Enable the expired store file deletion
+ conf.setBoolean("hbase.store.delete.expired.storefile", true);
+ // Set the compaction threshold higher to avoid normal compactions.
+ conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
+
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setMinVersions(minVersions);
+ hcd.setTimeToLive(ttl);
+ init(name.getMethodName() + "-" + minVersions, conf, hcd);
+
+ long storeTtl = this.store.getScanInfo().getTtl();
+ long sleepTime = storeTtl / storeFileNum;
+ long timeStamp;
+ // There are 4 store files and the max time stamp difference among these
+ // store files will be (this.store.ttl / storeFileNum)
+ for (int i = 1; i <= storeFileNum; i++) {
+ LOG.info("Adding some data for the store file #" + i);
+ timeStamp = EnvironmentEdgeManager.currentTime();
+ this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
+ this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
+ this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
+ flush(i);
+ edge.incrementTime(sleepTime);
+ }
+
+ // Verify the total number of store files
+ assertEquals(storeFileNum, this.store.getStorefiles().size());
+
+ // Each call will find one expired store file and delete it before compaction happens.
+ // There will be no compaction due to threshold above. Last file will not be replaced.
+ for (int i = 1; i <= storeFileNum - 1; i++) {
+ // verify the expired store file.
+ assertFalse(this.store.requestCompaction().isPresent());
+ Collection<HStoreFile> sfs = this.store.getStorefiles();
+ // Ensure i files are gone.
+ if (minVersions == 0) {
+ assertEquals(storeFileNum - i, sfs.size());
+ // Ensure only non-expired files remain.
+ for (HStoreFile sf : sfs) {
+ assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
+ }
+ } else {
+ assertEquals(storeFileNum, sfs.size());
+ }
+ // Let the next store file expired.
+ edge.incrementTime(sleepTime);
+ }
+ assertFalse(this.store.requestCompaction().isPresent());
+
+ Collection<HStoreFile> sfs = this.store.getStorefiles();
+ // Assert the last expired file is not removed.
+ if (minVersions == 0) {
+ assertEquals(1, sfs.size());
+ }
+ long ts = sfs.iterator().next().getReader().getMaxTimestamp();
+ assertTrue(ts < (edge.currentTime() - storeTtl));
+
+ for (HStoreFile sf : sfs) {
+ sf.closeStoreFile(true);
+ }
+ }
+
+ @Test
+ public void testLowestModificationTime() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ FileSystem fs = FileSystem.get(conf);
+ // Initialize region
+ init(name.getMethodName(), conf);
+
+ int storeFileNum = 4;
+ for (int i = 1; i <= storeFileNum; i++) {
+ LOG.info("Adding some data for the store file #"+i);
+ this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
+ flush(i);
+ }
+ // after flush; check the lowest time stamp
+ long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
+ long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
+ assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
+
+ // after compact; check the lowest time stamp
+ store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
+ lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
+ lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
+ assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
+ }
+
+ private static long getLowestTimeStampFromFS(FileSystem fs,
+ final Collection<HStoreFile> candidates) throws IOException {
+ long minTs = Long.MAX_VALUE;
+ if (candidates.isEmpty()) {
+ return minTs;
+ }
+ Path[] p = new Path[candidates.size()];
+ int i = 0;
+ for (HStoreFile sf : candidates) {
+ p[i] = sf.getPath();
+ ++i;
+ }
+
+ FileStatus[] stats = fs.listStatus(p);
+ if (stats == null || stats.length == 0) {
+ return minTs;
+ }
+ for (FileStatus s : stats) {
+ minTs = Math.min(minTs, s.getModificationTime());
+ }
+ return minTs;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Get tests
+ //////////////////////////////////////////////////////////////////////////////
+
+ private static final int BLOCKSIZE_SMALL = 8192;
+ /**
+ * Test for hbase-1686.
+ * @throws IOException
+ */
+ @Test
+ public void testEmptyStoreFile() throws IOException {
+ init(this.name.getMethodName());
+ // Write a store file.
+ this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
+ flush(1);
+ // Now put in place an empty store file. Its a little tricky. Have to
+ // do manually with hacked in sequence id.
+ HStoreFile f = this.store.getStorefiles().iterator().next();
+ Path storedir = f.getPath().getParent();
+ long seqid = f.getMaxSequenceId();
+ Configuration c = HBaseConfiguration.create();
+ FileSystem fs = FileSystem.get(c);
+ HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
+ StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
+ fs)
+ .withOutputDir(storedir)
+ .withFileContext(meta)
+ .build();
+ w.appendMetadata(seqid + 1, false);
+ w.close();
+ this.store.close();
+ // Reopen it... should pick up two files
+ this.store = new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c);
+ assertEquals(2, this.store.getStorefilesCount());
+
+ result = HBaseTestingUtility.getFromStoreFile(store,
+ get.getRow(),
+ qualifiers);
+ assertEquals(1, result.size());
+ }
+
+ /**
+ * Getting data from memstore only
+ * @throws IOException
+ */
+ @Test
+ public void testGet_FromMemStoreOnly() throws IOException {
+ init(this.name.getMethodName());
+
+ //Put data in memstore
+ this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
+
+ //Get
+ result = HBaseTestingUtility.getFromStoreFile(store,
+ get.getRow(), qualifiers);
+
+ //Compare
+ assertCheck();
+ }
+
+ /**
+ * Getting data from files only
+ * @throws IOException
+ */
+ @Test
+ public void testGet_FromFilesOnly() throws IOException {
+ init(this.name.getMethodName());
+
+ //Put data in memstore
+ this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
+ //flush
+ flush(1);
+
+ //Add more data
+ this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
+ //flush
+ flush(2);
+
+ //Add more data
+ this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
+ //flush
+ flush(3);
+
+ //Get
+ result = HBaseTestingUtility.getFromStoreFile(store,
+ get.getRow(),
+ qualifiers);
+ //this.store.get(get, qualifiers, result);
+
+ //Need to sort the result since multiple files
+ Collections.sort(result, CellComparator.COMPARATOR);
+
+ //Compare
+ assertCheck();
+ }
+
+ /**
+ * Getting data from memstore and files
+ * @throws IOException
+ */
+ @Test
+ public void testGet_FromMemStoreAndFiles() throws IOException {
+ init(this.name.getMethodName());
+
+ //Put data in memstore
+ this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
+ //flush
+ flush(1);
+
+ //Add more data
+ this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
+ //flush
+ flush(2);
+
+ //Add more data
+ this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
+ this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
+
+ //Get
+ result = HBaseTestingUtility.getFromStoreFile(store,
+ get.getRow(), qualifiers);
+
+ //Need to sort the result since multiple files
+ Collections.sort(result, CellComparator.COMPARATOR);
+
+ //Compare
+ assertCheck();
+ }
+
+ private void flush(int storeFilessize) throws IOException{
+ this.store.snapshot();
+ flushStore(store, id++);
+ assertEquals(storeFilessize, this.store.getStorefiles().size());
+ assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
+ }
+
+ private void assertCheck() {
+ assertEquals(expected.size(), result.size());
+ for(int i=0; i<expected.size(); i++) {
+ assertEquals(expected.get(i), result.get(i));
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentEdgeManagerTestHelper.reset();
+ }
+
+ @Test
+ public void testHandleErrorsInFlush() throws Exception {
+ LOG.info("Setting up a faulty file system that cannot write");
+
+ final Configuration conf = HBaseConfiguration.create();
+ User user = User.createUserForTesting(conf,
+ "testhandleerrorsinflush", new String[]{"foo"});
+ // Inject our faulty LocalFileSystem
+ conf.setClass("fs.file.impl", FaultyFileSystem.class,
+ FileSystem.class);
+ user.runAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ // Make sure it worked (above is sensitive to caching details in hadoop core)
+ FileSystem fs = FileSystem.get(conf);
+ assertEquals(FaultyFileSystem.class, fs.getClass());
+
+ // Initialize region
+ init(name.getMethodName(), conf);
+
+ LOG.info("Adding some data");
+ store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+ store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
+ store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
+
+ LOG.info("Before flush, we should have no files");
+
+ Collection<StoreFileInfo> files =
+ store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
+ assertEquals(0, files != null ? files.size() : 0);
+
+ //flush
+ try {
+ LOG.info("Flushing");
+ flush(1);
+ fail("Didn't bubble up IOE!");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("Fault injected"));
+ }
+
+ LOG.info("After failed flush, we should still have no files!");
+ files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
+ assertEquals(0, files != null ? files.size() : 0);
+ store.getHRegion().getWAL().close();
+ return null;
+ }
+ });
+ FileSystem.closeAllForUGI(user.getUGI());
+ }
+
+ /**
+ * Faulty file system that will fail if you write past its fault position the FIRST TIME
+ * only; thereafter it will succeed. Used by {@link TestHRegion} too.
+ */
+ static class FaultyFileSystem extends FilterFileSystem {
+ List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
+ private long faultPos = 200;
+ AtomicBoolean fault = new AtomicBoolean(true);
+
+ public FaultyFileSystem() {
+ super(new LocalFileSystem());
+ System.err.println("Creating faulty!");
+ }
+
+ @Override
+ public FSDataOutputStream create(Path p) throws IOException {
+ return new FaultyOutputStream(super.create(p), faultPos, fault);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return new FaultyOutputStream(super.create(f, permission,
+ overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
+ }
+
+ @Override
+ public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
+ int bufferSize, short replication, long blockSize, Progressable progress)
+ throws IOException {
+ // Fake it. Call create instead. The default implementation throws an IOE
+ // that this is not supported.
+ return create(f, overwrite, bufferSize, replication, blockSize, progress);
+ }
+ }
+
+ static class FaultyOutputStream extends FSDataOutputStream {
+ volatile long faultPos = Long.MAX_VALUE;
+ private final AtomicBoolean fault;
+
+ public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
+ throws IOException {
+ super(out, null);
+ this.faultPos = faultPos;
+ this.fault = fault;
+ }
+
+ @Override
+ public void write(byte[] buf, int offset, int length) throws IOException {
+ System.err.println("faulty stream write at pos " + getPos());
+ injectFault();
+ super.write(buf, offset, length);
+ }
+
+ private void injectFault() throws IOException {
+ if (this.fault.get() && getPos() >= faultPos) {
+ throw new IOException("Fault injected");
+ }
+ }
+ }
+
+ private static void flushStore(HStore store, long id) throws IOException {
+ StoreFlushContext storeFlushCtx = store.createFlushContext(id);
+ storeFlushCtx.prepare();
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+ }
+
+ /**
+ * Generate a list of KeyValues for testing based on given parameters
+ * @param timestamps
+ * @param numRows
+ * @param qualifier
+ * @param family
+ * @return
+ */
+ List<Cell> getKeyValueSet(long[] timestamps, int numRows,
+ byte[] qualifier, byte[] family) {
+ List<Cell> kvList = new ArrayList<>();
+ for (int i=1;i<=numRows;i++) {
+ byte[] b = Bytes.toBytes(i);
+ for (long timestamp: timestamps) {
+ kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
+ }
+ }
+ return kvList;
+ }
+
+ /**
+ * Test to ensure correctness when using Stores with multiple timestamps
+ * @throws IOException
+ */
+ @Test
+ public void testMultipleTimestamps() throws IOException {
+ int numRows = 1;
+ long[] timestamps1 = new long[] {1,5,10,20};
+ long[] timestamps2 = new long[] {30,80};
+
+ init(this.name.getMethodName());
+
+ List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
+ for (Cell kv : kvList1) {
+ this.store.add(kv, null);
+ }
+
+ this.store.snapshot();
+ flushStore(store, id++);
+
+ List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
+ for(Cell kv : kvList2) {
+ this.store.add(kv, null);
+ }
+
+ List<Cell> result;
+ Get get = new Get(Bytes.toBytes(1));
+ get.addColumn(family,qf1);
+
+ get.setTimeRange(0,15);
+ result = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertTrue(result.size()>0);
+
+ get.setTimeRange(40,90);
+ result = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertTrue(result.size()>0);
+
+ get.setTimeRange(10,45);
+ result = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertTrue(result.size()>0);
+
+ get.setTimeRange(80,145);
+ result = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertTrue(result.size()>0);
+
+ get.setTimeRange(1,2);
+ result = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertTrue(result.size()>0);
+
+ get.setTimeRange(90,200);
+ result = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertTrue(result.size()==0);
+ }
+
+ /**
+ * Test for HBASE-3492 - Test split on empty colfam (no store files).
+ *
+ * @throws IOException When the IO operations fail.
+ */
+ @Test
+ public void testSplitWithEmptyColFam() throws IOException {
+ init(this.name.getMethodName());
+ assertFalse(store.getSplitPoint().isPresent());
+ store.getHRegion().forceSplit(null);
+ assertFalse(store.getSplitPoint().isPresent());
+ store.getHRegion().clearSplit();
+ }
+
+ @Test
+ public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
+ final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
+ long anyValue = 10;
+
+ // We'll check that it uses correct config and propagates it appropriately by going thru
+ // the simplest "real" path I can find - "throttleCompaction", which just checks whether
+ // a number we pass in is higher than some config value, inside compactionPolicy.
+ Configuration conf = HBaseConfiguration.create();
+ conf.setLong(CONFIG_KEY, anyValue);
+ init(name.getMethodName() + "-xml", conf);
+ assertTrue(store.throttleCompaction(anyValue + 1));
+ assertFalse(store.throttleCompaction(anyValue));
+
+ // HTD overrides XML.
+ --anyValue;
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
+ init(name.getMethodName() + "-htd", conf, htd, hcd);
+ assertTrue(store.throttleCompaction(anyValue + 1));
+ assertFalse(store.throttleCompaction(anyValue));
+
+ // HCD overrides them both.
+ --anyValue;
+ hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
+ init(name.getMethodName() + "-hcd", conf, htd, hcd);
+ assertTrue(store.throttleCompaction(anyValue + 1));
+ assertFalse(store.throttleCompaction(anyValue));
+ }
+
+ public static class DummyStoreEngine extends DefaultStoreEngine {
+ public static DefaultCompactor lastCreatedCompactor = null;
+
+ @Override
+ protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
+ throws IOException {
+ super.createComponents(conf, store, comparator);
+ lastCreatedCompactor = this.compactor;
+ }
+ }
+
+ @Test
+ public void testStoreUsesSearchEngineOverride() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
+ init(this.name.getMethodName(), conf);
+ assertEquals(DummyStoreEngine.lastCreatedCompactor,
+ this.store.storeEngine.getCompactor());
+ }
+
+ private void addStoreFile() throws IOException {
+ HStoreFile f = this.store.getStorefiles().iterator().next();
+ Path storedir = f.getPath().getParent();
+ long seqid = this.store.getMaxSequenceId();
+ Configuration c = TEST_UTIL.getConfiguration();
+ FileSystem fs = FileSystem.get(c);
+ HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
+ StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
+ fs)
+ .withOutputDir(storedir)
+ .withFileContext(fileContext)
+ .build();
+ w.appendMetadata(seqid + 1, false);
+ w.close();
+ LOG.info("Added store file:" + w.getPath());
+ }
+
+ private void archiveStoreFile(int index) throws IOException {
+ Collection<HStoreFile> files = this.store.getStorefiles();
+ HStoreFile sf = null;
+ Iterator<HStoreFile> it = files.iterator();
+ for (int i = 0; i <= index; i++) {
+ sf = it.next();
+ }
+ store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
+ }
+
+ private void closeCompactedFile(int index) throws IOException {
+ Collection<HStoreFile> files =
+ this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
+ HStoreFile sf = null;
+ Iterator<HStoreFile> it = files.iterator();
+ for (int i = 0; i <= index; i++) {
+ sf = it.next();
+ }
+ sf.closeStoreFile(true);
+ store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
+ }
+
+ @Test
+ public void testRefreshStoreFiles() throws Exception {
+ init(name.getMethodName());
+
+ assertEquals(0, this.store.getStorefilesCount());
+
+ // Test refreshing store files when no store files are there
+ store.refreshStoreFiles();
+ assertEquals(0, this.store.getStorefilesCount());
+
+ // add some data, flush
+ this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+ flush(1);
+ assertEquals(1, this.store.getStorefilesCount());
+
+ // add one more file
+ addStoreFile();
+
+ assertEquals(1, this.store.getStorefilesCount());
+ store.refreshStoreFiles();
+ assertEquals(2, this.store.getStorefilesCount());
+
+ // add three more files
+ addStoreFile();
+ addStoreFile();
+ addStoreFile();
+
+ assertEquals(2, this.store.getStorefilesCount());
+ store.refreshStoreFiles();
+ assertEquals(5, this.store.getStorefilesCount());
+
+ closeCompactedFile(0);
+ archiveStoreFile(0);
+
+ assertEquals(5, this.store.getStorefilesCount());
+ store.refreshStoreFiles();
+ assertEquals(4, this.store.getStorefilesCount());
+
+ archiveStoreFile(0);
+ archiveStoreFile(1);
+ archiveStoreFile(2);
+
+ assertEquals(4, this.store.getStorefilesCount());
+ store.refreshStoreFiles();
+ assertEquals(1, this.store.getStorefilesCount());
+
+ archiveStoreFile(0);
+ store.refreshStoreFiles();
+ assertEquals(0, this.store.getStorefilesCount());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRefreshStoreFilesNotChanged() throws IOException {
+ init(name.getMethodName());
+
+ assertEquals(0, this.store.getStorefilesCount());
+
+ // add some data, flush
+ this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+ flush(1);
+ // add one more file
+ addStoreFile();
+
+ HStore spiedStore = spy(store);
+
+ // call first time after files changed
+ spiedStore.refreshStoreFiles();
+ assertEquals(2, this.store.getStorefilesCount());
+ verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class));
+
+ // call second time
+ spiedStore.refreshStoreFiles();
+
+ //ensure that replaceStoreFiles is not called if files are not refreshed
+ verify(spiedStore, times(0)).replaceStoreFiles(null, null);
+ }
+
+ private long countMemStoreScanner(StoreScanner scanner) {
+ if (scanner.currentScanners == null) {
+ return 0;
+ }
+ return scanner.currentScanners.stream()
+ .filter(s -> !s.isFileScanner())
+ .count();
+ }
+
+ @Test
+ public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
+ long seqId = 100;
+ long timestamp = System.currentTimeMillis();
+ Cell cell0 = CellUtil.createCell(row, family, qf1, timestamp,
+ KeyValue.Type.Put.getCode(), qf1);
+ CellUtil.setSequenceId(cell0, seqId);
+ testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.EMPTY_LIST);
+
+ Cell cell1 = CellUtil.createCell(row, family, qf2, timestamp,
+ KeyValue.Type.Put.getCode(), qf1);
+ CellUtil.setSequenceId(cell1, seqId);
+ testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
+
+ seqId = 101;
+ timestamp = System.currentTimeMillis();
+ Cell cell2 = CellUtil.createCell(row2, family, qf2, timestamp,
+ KeyValue.Type.Put.getCode(), qf1);
+ CellUtil.setSequenceId(cell2, seqId);
+ testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
+ }
+
+ private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
+ List<Cell> inputCellsAfterSnapshot) throws IOException {
+ init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ long seqId = Long.MIN_VALUE;
+ for (Cell c : inputCellsBeforeSnapshot) {
+ quals.add(CellUtil.cloneQualifier(c));
+ seqId = Math.max(seqId, c.getSequenceId());
+ }
+ for (Cell c : inputCellsAfterSnapshot) {
+ quals.add(CellUtil.cloneQualifier(c));
+ seqId = Math.max(seqId, c.getSequenceId());
+ }
+ inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
+ StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+ storeFlushCtx.prepare();
+ inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
+ int numberOfMemScannersWhenScaning = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
+ try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
+ // snaptshot + active (if it isn't empty)
+ assertEquals(numberOfMemScannersWhenScaning, countMemStoreScanner(s));
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+ boolean more;
+ int cellCount = 0;
+ do {
+ List<Cell> cells = new ArrayList<>();
+ more = s.next(cells);
+ cellCount += cells.size();
+ assertEquals(more ? numberOfMemScannersWhenScaning : 0, countMemStoreScanner(s));
+ } while (more);
+ assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
+ + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
+ inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
+ // the current scanners is cleared
+ assertEquals(0, countMemStoreScanner(s));
+ }
+ }
+
+ private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
+ Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+ CellUtil.setSequenceId(c, sequenceId);
+ return c;
+ }
+
+ private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
+ throws IOException {
+ Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+ CellUtil.setSequenceId(c, sequenceId);
+ return c;
+ }
+
+ @Test
+ public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
+ final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+ final int expectedSize = 3;
+ testFlushBeforeCompletingScan(new MyListHook() {
+ @Override
+ public void hook(int currentSize) {
+ if (currentSize == expectedSize - 1) {
+ try {
+ flushStore(store, id++);
+ timeToGoNextRow.set(true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }, new FilterBase() {
+ @Override
+ public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+ return ReturnCode.INCLUDE;
+ }
+ }, expectedSize);
+ }
+
+ @Test
+ public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
+ final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+ final int expectedSize = 2;
+ testFlushBeforeCompletingScan(new MyListHook() {
+ @Override
+ public void hook(int currentSize) {
+ if (currentSize == expectedSize - 1) {
+ try {
+ flushStore(store, id++);
+ timeToGoNextRow.set(true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }, new FilterBase() {
+ @Override
+ public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (timeToGoNextRow.get()) {
+ timeToGoNextRow.set(false);
+ return ReturnCode.NEXT_ROW;
+ } else {
+ return ReturnCode.INCLUDE;
+ }
+ }
+ }, expectedSize);
+ }
+
+ @Test
+ public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
+ final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
+ final int expectedSize = 2;
+ testFlushBeforeCompletingScan(new MyListHook() {
+ @Override
+ public void hook(int currentSize) {
+ if (currentSize == expectedSize - 1) {
+ try {
+ flushStore(store, id++);
+ timeToGetHint.set(true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }, new FilterBase() {
+ @Override
+ public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (timeToGetHint.get()) {
+ timeToGetHint.set(false);
+ return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
+ } else {
+ return Filter.ReturnCode.INCLUDE;
+ }
+ }
+ @Override
+ public Cell getNextCellHint(Cell currentCell) throws IOException {
+ return currentCell;
+ }
+ }, expectedSize);
+ }
+
+ private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
+ throws IOException, InterruptedException {
+ Configuration conf = HBaseConfiguration.create();
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setMaxVersions(1);
+ byte[] r0 = Bytes.toBytes("row0");
+ byte[] r1 = Bytes.toBytes("row1");
+ byte[] r2 = Bytes.toBytes("row2");
+ byte[] value0 = Bytes.toBytes("value0");
+ byte[] value1 = Bytes.toBytes("value1");
+ byte[] value2 = Bytes.toBytes("value2");
+ MemstoreSize memStoreSize = new MemstoreSize();
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() {
+ @Override
+ public long getSmallestReadPoint(HStore store) {
+ return seqId + 3;
+ }
+ });
+ // The cells having the value0 won't be flushed to disk because the value of max version is 1
+ store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize);
+ store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize);
+ store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize);
+ store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize);
+ store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize);
+ store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize);
+ store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize);
+ store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize);
+ store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize);
+ store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize);
+ store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize);
+ store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize);
+ List<Cell> myList = new MyList<>(hook);
+ Scan scan = new Scan()
+ .withStartRow(r1)
+ .setFilter(filter);
+ try (InternalScanner scanner = (InternalScanner) store.getScanner(
+ scan, null, seqId + 3)){
+ // r1
+ scanner.next(myList);
+ assertEquals(expectedSize, myList.size());
+ for (Cell c : myList) {
+ byte[] actualValue = CellUtil.cloneValue(c);
+ assertTrue("expected:" + Bytes.toStringBinary(value1)
+ + ", actual:" + Bytes.toStringBinary(actualValue)
+ , Bytes.equals(actualValue, value1));
+ }
+ List<Cell> normalList = new ArrayList<>(3);
+ // r2
+ scanner.next(normalList);
+ assertEquals(3, normalList.size());
+ for (Cell c : normalList) {
+ byte[] actualValue = CellUtil.cloneValue(c);
+ assertTrue("expected:" + Bytes.toStringBinary(value2)
+ + ", actual:" + Bytes.toStringBinary(actualValue)
+ , Bytes.equals(actualValue, value2));
+ }
+ }
+ }
+
+ @Test
+ public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+ init(name.getMethodName(), conf, hcd);
+ byte[] value = Bytes.toBytes("value");
+ MemstoreSize memStoreSize = new MemstoreSize();
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ // older data whihc shouldn't be "seen" by client
+ store.add(createCell(qf1, ts, seqId, value), memStoreSize);
+ store.add(createCell(qf2, ts, seqId, value), memStoreSize);
+ store.add(createCell(qf3, ts, seqId, value), memStoreSize);
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ quals.add(qf1);
+ quals.add(qf2);
+ quals.add(qf3);
+ StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+ MyCompactingMemStore.START_TEST.set(true);
+ Runnable flush = () -> {
+ // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
+ // recreate the active memstore -- phase (4/5)
+ storeFlushCtx.prepare();
+ };
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit(flush);
+ // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
+ // this is blocked until we recreate the active memstore -- phase (3/5)
+ // we get scanner from active memstore but it is empty -- phase (5/5)
+ InternalScanner scanner = (InternalScanner) store.getScanner(
+ new Scan(new Get(row)), quals, seqId + 1);
+ service.shutdown();
+ service.awaitTermination(20, TimeUnit.SECONDS);
+ try {
+ try {
+ List<Cell> results = new ArrayList<>();
+ scanner.next(results);
+ assertEquals(3, results.size());
+ for (Cell c : results) {
+ byte[] actualValue = CellUtil.cloneValue(c);
+ assertTrue("expected:" + Bytes.toStringBinary(value)
+ + ", actual:" + Bytes.toStringBinary(actualValue)
+ , Bytes.equals(actualValue, value));
+ }
+ } finally {
+ scanner.close();
+ }
+ } finally {
+ MyCompactingMemStore.START_TEST.set(false);
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+ }
+ }
+
+ @Test
+ public void testScanWithDoubleFlush() throws IOException {
+ Configuration conf = HBaseConfiguration.create();
+ // Initialize region
+ MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
+ @Override
+ public void getScanners(MyStore store) throws IOException {
+ final long tmpId = id++;
+ ExecutorService s = Executors.newSingleThreadExecutor();
+ s.submit(() -> {
+ try {
+ // flush the store before storescanner updates the scanners from store.
+ // The current data will be flushed into files, and the memstore will
+ // be clear.
+ // -- phase (4/4)
+ flushStore(store, tmpId);
+ }catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ s.shutdown();
+ try {
+ // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
+ s.awaitTermination(3, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ }
+ }
+ });
+ byte[] oldValue = Bytes.toBytes("oldValue");
+ byte[] currentValue = Bytes.toBytes("currentValue");
+ MemstoreSize memStoreSize = new MemstoreSize();
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ // older data whihc shouldn't be "seen" by client
+ myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize);
+ myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize);
+ myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize);
+ long snapshotId = id++;
+ // push older data into snapshot -- phase (1/4)
+ StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
+ storeFlushCtx.prepare();
+
+ // insert current data into active -- phase (2/4)
+ myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize);
+ myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize);
+ myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize);
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ quals.add(qf1);
+ quals.add(qf2);
+ quals.add(qf3);
+ try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
+ new Scan(new Get(row)), quals, seqId + 1)) {
+ // complete the flush -- phase (3/4)
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+
+ List<Cell> results = new ArrayList<>();
+ scanner.next(results);
+ assertEquals(3, results.size());
+ for (Cell c : results) {
+ byte[] actualValue = CellUtil.cloneValue(c);
+ assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+ + ", actual:" + Bytes.toStringBinary(actualValue)
+ , Bytes.equals(actualValue, currentValue));
+ }
+ }
+ }
+
+ @Test
+ public void testReclaimChunkWhenScaning() throws IOException {
+ init("testReclaimChunkWhenScaning");
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ byte[] value = Bytes.toBytes("value");
+ // older data whihc shouldn't be "seen" by client
+ store.add(createCell(qf1, ts, seqId, value), null);
+ store.add(createCell(qf2, ts, seqId, value), null);
+ store.add(createCell(qf3, ts, seqId, value), null);
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ quals.add(qf1);
+ quals.add(qf2);
+ quals.add(qf3);
+ try (InternalScanner scanner = (InternalScanner) store.getScanner(
+ new Scan(new Get(row)), quals, seqId)) {
+ List<Cell> results = new MyList<>(size -> {
+ switch (size) {
+ // 1) we get the first cell (qf1)
+ // 2) flush the data to have StoreScanner update inner scanners
+ // 3) the chunk will be reclaimed after updaing
+ case 1:
+ try {
+ flushStore(store, id++);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ // 1) we get the second cell (qf2)
+ // 2) add some cell to fill some byte into the chunk (we have only one chunk)
+ case 2:
+ try {
+ byte[] newValue = Bytes.toBytes("newValue");
+ // older data whihc shouldn't be "seen" by client
+ store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
+ store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
+ store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ default:
+ break;
+ }
+ });
+ scanner.next(results);
+ assertEquals(3, results.size());
+ for (Cell c : results) {
+ byte[] actualValue = CellUtil.cloneValue(c);
+ assertTrue("expected:" + Bytes.toStringBinary(value)
+ + ", actual:" + Bytes.toStringBinary(actualValue)
+ , Bytes.equals(actualValue, value));
+ }
+ }
+ }
+
+ /**
+ * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
+ * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
+ * versionedList to remove the corresponding segments.
+ * In short, there will be some segements which isn't in merge are removed.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=30000)
+ public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
+ int flushSize = 500;
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
+ conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
+ // Set the lower threshold to invoke the "MERGE" policy
+ conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+ init(name.getMethodName(), conf, hcd);
+ byte[] value = Bytes.toBytes("thisisavarylargevalue");
+ MemstoreSize memStoreSize = new MemstoreSize();
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ // older data whihc shouldn't be "seen" by client
+ store.add(createCell(qf1, ts, seqId, value), memStoreSize);
+ store.add(createCell(qf2, ts, seqId, value), memStoreSize);
+ store.add(createCell(qf3, ts, seqId, value), memStoreSize);
+ assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
+ StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+ storeFlushCtx.prepare();
+ // This shouldn't invoke another in-memory flush because the first compactor thread
+ // hasn't accomplished the in-memory compaction.
+ store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
+ store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
+ store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
+ assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
+ //okay. Let the compaction be completed
+ MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
+ CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
+ while (mem.isMemStoreFlushingInMemory()) {
+ TimeUnit.SECONDS.sleep(1);
+ }
+ // This should invoke another in-memory flush.
+ store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
+ store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
+ store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
+ assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
+ conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+ String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+ }
+
+ private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
+ throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setMaxVersions(5);
+ return (MyStore) init(methodName, conf, htd, hcd, hook);
+ }
+
+ class MyStore extends HStore {
+ private final MyStoreHook hook;
+
+ MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
+ MyStoreHook hook, boolean switchToPread) throws IOException {
+ super(region, family, confParam);
+ this.hook = hook;
+ }
+
+ @Override
+ public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
+ boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+ boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+ boolean includeMemstoreScanner) throws IOException {
+ hook.getScanners(this);
+ return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
+ stopRow, false, readPt, includeMemstoreScanner);
+ }
+
+ @Override
+ public long getSmallestReadPoint() {
+ return hook.getSmallestReadPoint(this);
+ }
+ }
+
+ private abstract class MyStoreHook {
+ void getScanners(MyStore store) throws IOException {
+ }
+ long getSmallestReadPoint(HStore store) {
+ return store.getHRegion().getSmallestReadPoint();
+ }
+ }
+
+ @Test
+ public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
+ int flushSize = 500;
+ Configuration conf = HBaseConfiguration.create();
+ conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
+ conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
+ // Set the lower threshold to invoke the "MERGE" policy
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+ MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
+ MemstoreSize memStoreSize = new MemstoreSize();
+ long ts = System.currentTimeMillis();
+ long seqID = 1l;
+ // Add some data to the region and do some flushes
+ for (int i = 1; i < 10; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+ for (int i = 11; i < 20; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+ for (int i = 21; i < 30; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+
+ assertEquals(3, store.getStorefilesCount());
+ ScanInfo scanInfo = store.getScanInfo();
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ Collection<HStoreFile> storefiles2 = store.getStorefiles();
+ ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
+ StoreScanner storeScanner =
+ (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
+ // get the current heap
+ KeyValueHeap heap = storeScanner.heap;
+ // create more store files
+ for (int i = 31; i < 40; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+
+ for (int i = 41; i < 50; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+ storefiles2 = store.getStorefiles();
+ ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
+ actualStorefiles1.removeAll(actualStorefiles);
+ // Do compaction
+ List<Exception> exceptions = new ArrayList<Exception>();
+ MyThread thread = new MyThread(storeScanner);
+ thread.start();
+ store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+ thread.join();
+ KeyValueHeap heap2 = thread.getHeap();
+ assertFalse(heap.equals(heap2));
+ }
+
+ private static class MyThread extends Thread {
+ private StoreScanner scanner;
+ private KeyValueHeap heap;
+
+ public MyThread(StoreScanner scanner) {
+ this.scanner = scanner;
+ }
+
+ public KeyValueHeap getHeap() {
+ return this.heap;
+ }
+
+ public void run() {
+ scanner.trySwitchToStreamRead();
+ heap = scanner.heap;
+ }
+ }
+
+ private static class MyMemStoreCompactor extends MemStoreCompactor {
+ private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
+ private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
+ public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) {
+ super(compactingMemStore, compactionPolicy);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
+ boolean rval = super.start();
+ if (isFirst) {
+ try {
+ START_COMPACTOR_LATCH.await();
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ return rval;
+ }
+ }
+
+ public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
+ private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
+ public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparator c,
+ HStore store, RegionServicesForStores regionServices,
+ MemoryCompactionPolicy compactionPolicy) throws IOException {
+ super(conf, c, store, regionServices, compactionPolicy);
+ }
+
+ @Override
+ protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
+ return new MyMemStoreCompactor(this, compactionPolicy);
+ }
+
+ @Override
+ protected boolean shouldFlushInMemory() {
+ boolean rval = super.shouldFlushInMemory();
+ if (rval) {
+ RUNNER_COUNT.incrementAndGet();
+ }
+ return rval;
+ }
+ }
+
+ public static class MyCompactingMemStore extends CompactingMemStore {
+ private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
+ private final CountDownLatch getScannerLatch = new CountDownLatch(1);
+ private final CountDownLatch snapshotLatch = new CountDownLatch(1);
+ public MyCompactingMemStore(Configuration conf, CellComparator c,
+ HStore store, RegionServicesForStores regionServices,
+ MemoryCompactionPolicy compactionPolicy) throws IOException {
+ super(conf, c, store, regionServices, compactionPolicy);
+ }
+
+ @Override
+ protected List<KeyValueScanner> createList(int capacity) {
+ if (START_TEST.get()) {
+ try {
+ getScannerLatch.countDown();
+ snapshotLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return new ArrayList<>(capacity);
+ }
+ @Override
+ protected void pushActiveToPipeline(MutableSegment active) {
+ if (START_TEST.get()) {
+ try {
+ getScannerLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ super.pushActiveToPipeline(active);
+ if (START_TEST.get()) {
+ snapshotLatch.countDown();
+ }
+ }
+ }
+
+ interface MyListHook {
+ void hook(int currentSize);
+ }
+
+ private static class MyList<T> implements List<T> {
+ private final List<T> delegatee = new ArrayList<>();
+ private final MyListHook hookAtAdd;
+ MyList(final MyListHook hookAtAdd) {
+ this.hookAtAdd = hookAtAdd;
+ }
+ @Override
+ public int size() {return delegatee.size();}
+
+ @Override
+ public boolean isEmpty() {return delegatee.isEmpty();}
+
+ @Override
+ public boolean contains(Object o) {return delegatee.contains(o);}
+
+ @Override
+ public Iterator<T> iterator() {return delegatee.iterator();}
+
+ @Override
+ public Object[] toArray() {return delegatee.toArray();}
+
+ @Override
+ public <T> T[] toArray(T[] a) {return delegatee.toArray(a);}
+
+ @Override
+ public boolean add(T e) {
+ hookAtAdd.hook(size());
+ return delegatee.add(e);
+ }
+
+ @Override
+ public boolean remove(Object o) {return delegatee.remove(o);}
+
+ @Override
+ public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
+
+ @Override
+ public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
+
+ @Override
+ public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
+
+ @Override
+ public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
+
+ @Override
+ public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
+
+ @Override
+ public void clear() {delegatee.clear();}
+
+ @Override
+ public T get(int index) {return delegatee.get(index);}
+
+ @Override
+ public T set(int index, T element) {return delegatee.set(index, element);}
+
+ @Override
+ public void add(int index, T element) {delegatee.add(index, element);}
+
+ @Override
+ public T remove(int index) {return delegatee.remove(index);}
+
+ @Override
+ public int indexOf(Object o) {return delegatee.indexOf(o);}
+
+ @Override
+ public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
+
+ @Override
+ public ListIterator<T> listIterator() {return delegatee.listIterator();}
+
+ @Override
+ public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
+
+ @Override
+ public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
index e74e939..b20cae8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
@@ -176,11 +176,9 @@ public class TestHStoreFile extends HBaseTestCase {
// Split on a row, not in middle of row. Midkey returned by reader
// may be in middle of row. Create new one with empty column and
// timestamp.
- Cell kv = reader.midkey();
- byte [] midRow = CellUtil.cloneRow(kv);
- kv = reader.getLastKey();
- byte [] finalRow = CellUtil.cloneRow(kv);
- hsf.closeReader(true);
+ byte [] midRow = CellUtil.cloneRow(reader.midKey().get());
+ byte [] finalRow = CellUtil.cloneRow(reader.getLastKey().get());
+ hsf.closeStoreFile(true);
// Make a reference
HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow);
@@ -190,7 +188,8 @@ public class TestHStoreFile extends HBaseTestCase {
// Now confirm that I can read from the reference and that it only gets
// keys from top half of the file.
HFileScanner s = refHsf.getReader().getScanner(false, false);
- for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
+ Cell kv = null;
+ for (boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
ByteBuffer bb = ByteBuffer.wrap(((KeyValue) s.getKey()).getKey());
kv = KeyValueUtil.createKeyValueFromKey(bb);
if (first) {
@@ -301,7 +300,7 @@ public class TestHStoreFile extends HBaseTestCase {
f.initReader();
Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom
- f.closeReader(true);
+ f.closeStoreFile(true);
// OK test the thing
FSUtils.logFileSystemState(fs, testDir, LOG);
@@ -342,7 +341,7 @@ public class TestHStoreFile extends HBaseTestCase {
private void checkHalfHFile(final HRegionFileSystem regionFs, final HStoreFile f)
throws IOException {
f.initReader();
- Cell midkey = f.getReader().midkey();
+ Cell midkey = f.getReader().midKey().get();
KeyValue midKV = (KeyValue)midkey;
byte [] midRow = CellUtil.cloneRow(midKV);
// Create top split.
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 707540a..0c33bdb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -84,7 +84,7 @@ public class TestMajorCompaction {
private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
protected Configuration conf = UTIL.getConfiguration();
- private Region r = null;
+ private HRegion r = null;
private HTableDescriptor htd = null;
private static final byte [] COLUMN_FAMILY = fam1;
private final byte [] STARTROW = Bytes.toBytes(START_KEY);
@@ -328,7 +328,7 @@ public class TestMajorCompaction {
// ensure that major compaction time is deterministic
RatioBasedCompactionPolicy
c = (RatioBasedCompactionPolicy)s.storeEngine.getCompactionPolicy();
- Collection<StoreFile> storeFiles = s.getStorefiles();
+ Collection<HStoreFile> storeFiles = s.getStorefiles();
long mcTime = c.getNextMajorCompactTime(storeFiles);
for (int i = 0; i < 10; ++i) {
assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
@@ -358,7 +358,7 @@ public class TestMajorCompaction {
private void verifyCounts(int countRow1, int countRow2) throws Exception {
int count1 = 0;
int count2 = 0;
- for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
+ for (HStoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
scanner.seekTo();
do {
@@ -377,7 +377,7 @@ public class TestMajorCompaction {
private int count() throws IOException {
int count = 0;
- for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
+ for (HStoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
if (!scanner.seekTo()) {
continue;
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
index 9ab1440..c08bd71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -318,10 +320,10 @@ public class TestMobStoreCompaction {
if (fs.exists(mobDirPath)) {
FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
for (FileStatus file : files) {
- StoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true);
+ HStoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true);
sf.initReader();
Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
- byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT);
+ byte[] count = fileInfo.get(MOB_CELLS_COUNT);
assertTrue(count != null);
mobCellsCount += Bytes.toLong(count);
}
@@ -349,7 +351,7 @@ public class TestMobStoreCompaction {
Bytes.toBytes("colX"), now, dummyData);
writer.append(kv);
} finally {
- writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+ writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
writer.close();
}
}
@@ -428,20 +430,20 @@ public class TestMobStoreCompaction {
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
CacheConfig cacheConfig = new CacheConfig(copyOfConf);
Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString());
- List<StoreFile> sfs = new ArrayList<>();
+ List<HStoreFile> sfs = new ArrayList<>();
int numDelfiles = 0;
int size = 0;
if (fs.exists(mobDirPath)) {
for (FileStatus f : fs.listStatus(mobDirPath)) {
- StoreFile sf = new HStoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE, true);
+ HStoreFile sf = new HStoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE, true);
sfs.add(sf);
if (StoreFileInfo.isDelFile(sf.getPath())) {
numDelfiles++;
}
}
- List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false,
- HConstants.LATEST_TIMESTAMP);
+ List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
+ false, false, HConstants.LATEST_TIMESTAMP);
long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
long ttl = HStore.determineTTLFromFamily(hcd);
ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes,
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 72a968c..86fe5af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -45,9 +45,6 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -55,9 +52,17 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
/**
* Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
* cluster. See {@link TestRegionServerNoMaster}.
@@ -472,7 +477,7 @@ public class TestRegionReplicas {
// should be able to deal with it giving us all the result we expect.
int keys = 0;
int sum = 0;
- for (StoreFile sf: secondaryRegion.getStore(f).getStorefiles()) {
+ for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
// Our file does not exist anymore. was moved by the compaction above.
LOG.debug(getRS().getFileSystem().exists(sf.getPath()));
Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java
index cad060e..06c0bfd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java
@@ -25,13 +25,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -246,7 +247,7 @@ public class TestRegionSplitPolicy {
HStore mockStore = Mockito.mock(HStore.class);
Mockito.doReturn(2000L).when(mockStore).getSize();
Mockito.doReturn(true).when(mockStore).canSplit();
- Mockito.doReturn(Bytes.toBytes("abcd")).when(mockStore).getSplitPoint();
+ Mockito.doReturn(Optional.of(Bytes.toBytes("abcd"))).when(mockStore).getSplitPoint();
stores.add(mockStore);
KeyPrefixRegionSplitPolicy policy = (KeyPrefixRegionSplitPolicy) RegionSplitPolicy
@@ -322,8 +323,7 @@ public class TestRegionSplitPolicy {
HStore mockStore = Mockito.mock(HStore.class);
Mockito.doReturn(2000L).when(mockStore).getSize();
Mockito.doReturn(true).when(mockStore).canSplit();
- Mockito.doReturn(Bytes.toBytes("store 1 split"))
- .when(mockStore).getSplitPoint();
+ Mockito.doReturn(Optional.of(Bytes.toBytes("store 1 split"))).when(mockStore).getSplitPoint();
stores.add(mockStore);
assertEquals("store 1 split",
@@ -333,8 +333,7 @@ public class TestRegionSplitPolicy {
HStore mockStore2 = Mockito.mock(HStore.class);
Mockito.doReturn(4000L).when(mockStore2).getSize();
Mockito.doReturn(true).when(mockStore2).canSplit();
- Mockito.doReturn(Bytes.toBytes("store 2 split"))
- .when(mockStore2).getSplitPoint();
+ Mockito.doReturn(Optional.of(Bytes.toBytes("store 2 split"))).when(mockStore2).getSplitPoint();
stores.add(mockStore2);
assertEquals("store 2 split",
@@ -355,7 +354,7 @@ public class TestRegionSplitPolicy {
HStore mockStore = Mockito.mock(HStore.class);
Mockito.doReturn(2000L).when(mockStore).getSize();
Mockito.doReturn(true).when(mockStore).canSplit();
- Mockito.doReturn(Bytes.toBytes("ab,cd")).when(mockStore).getSplitPoint();
+ Mockito.doReturn(Optional.of(Bytes.toBytes("ab,cd"))).when(mockStore).getSplitPoint();
stores.add(mockStore);
DelimitedKeyPrefixRegionSplitPolicy policy = (DelimitedKeyPrefixRegionSplitPolicy) RegionSplitPolicy
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 8b34a2f..dbf3be0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -113,7 +113,7 @@ public class TestReversibleScanners {
.withFileContext(hFileContext).build();
writeStoreFile(writer);
- StoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+ HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
BloomType.NONE, true);
List<StoreFileScanner> scanners = StoreFileScanner
@@ -167,10 +167,10 @@ public class TestReversibleScanners {
writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
writer2 });
- StoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+ HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
BloomType.NONE, true);
- StoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+ HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
BloomType.NONE, true);
/**
* Test without MVCC
@@ -257,10 +257,10 @@ public class TestReversibleScanners {
writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
writer2 });
- StoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+ HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
BloomType.NONE, true);
- StoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+ HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
BloomType.NONE, true);
ScanInfo scanInfo =
@@ -418,19 +418,15 @@ public class TestReversibleScanners {
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
}
- private StoreScanner getReversibleStoreScanner(MemStore memstore,
- StoreFile sf1, StoreFile sf2, Scan scan,
- ScanInfo scanInfo, int readPoint) throws IOException {
- List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
- false, readPoint);
+ private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
+ Scan scan, ScanInfo scanInfo, int readPoint) throws IOException {
+ List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint);
NavigableSet<byte[]> columns = null;
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
- .entrySet()) {
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
// Should only one family
columns = entry.getValue();
}
- StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
- columns, scanners);
+ StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
return storeScanner;
}
@@ -487,22 +483,17 @@ public class TestReversibleScanners {
assertEquals(null, kvHeap.peek());
}
- private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore,
- StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint)
- throws IOException {
- List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow,
- true, readPoint);
- ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners,
- CellComparator.COMPARATOR);
+ private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1,
+ HStoreFile sf2, byte[] startRow, int readPoint) throws IOException {
+ List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint);
+ ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparator.COMPARATOR);
return kvHeap;
}
- private List<KeyValueScanner> getScanners(MemStore memstore, StoreFile sf1,
- StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint)
- throws IOException {
- List<StoreFileScanner> fileScanners = StoreFileScanner
- .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
- false, false, readPoint);
+ private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
+ byte[] startRow, boolean doSeek, int readPoint) throws IOException {
+ List<StoreFileScanner> fileScanners = StoreFileScanner.getScannersForStoreFiles(
+ Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint);
List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1);
scanners.addAll(fileScanners);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index 5dce4ad..0c014fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
+
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -171,10 +174,10 @@ public class TestScannerWithBulkload {
// Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
// Scan should only look at the seq id appended at the bulk load time, and not skip its
// kv.
- writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
+ writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
}
else {
- writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+ writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
}
writer.close();
return hfilePath;