You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/02/22 21:56:18 UTC
[45/50] [abbrv] hbase git commit: Merge branch 'master' (2/11/15)
into hbase-11339
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
index 9a8b7d9,0000000..4bf1623
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
@@@ -1,652 -1,0 +1,652 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.LargeTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests PartitionedMobFileCompactor against a mini cluster, using a table with two
+ * mob-enabled families that is pre-split on the bytes of KEYS.
+ */
+@Category(LargeTests.class)
+public class TestMobFileCompactor {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private Configuration conf = null;
+ private String tableNameAsString;
+ private TableName tableName;
+ private static HTable hTable;
+ private static Admin admin;
+ private static HTableDescriptor desc;
+ private static HColumnDescriptor hcd1;
+ private static HColumnDescriptor hcd2;
+ private static FileSystem fs;
+ private final static String family1 = "family1";
+ private final static String family2 = "family2";
+ private final static String qf1 = "qualifier1";
+ private final static String qf2 = "qualifier2";
+ // Each byte is used as a row-key prefix in loadData; getSplitKeys splits the table on
+ // every byte except the first.
+ private static byte[] KEYS = Bytes.toBytes("012");
+ private static int regionNum = KEYS.length;
+ // Rows removed per key prefix by createDelFile: only row 2 is deleted as a whole row.
+ private static int delRowNum = 1;
+ // Cells removed per key prefix by createDelFile: 2 (family1 of row 0) + 3 (row 2)
+ // + 1 (family1:qualifier1 of row 4).
+ private static int delCellNum = 6;
+ // loadData writes three cells per row: family1:qualifier1, family1:qualifier2 and
+ // family2:qualifier1.
+ private static int cellNumPerRow = 3;
+ // loadData flushes the table after every rowNumPerFile rows.
+ private static int rowNumPerFile = 2;
+ private static ExecutorService pool;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ TEST_UTIL.startMiniCluster(1);
+ pool = createThreadPool(TEST_UTIL.getConfiguration());
+ }
+
+  /**
+   * Shuts down the compaction thread pool and then the mini cluster. The cluster is shut
+   * down even when the pool was never created or its shutdown fails.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      // pool is still null when setUpBeforeClass failed before creating it
+      if (pool != null) {
+        pool.shutdown();
+      }
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Creates a uniquely named, pre-split table with two mob-enabled families and opens a
+   * client table with auto flush turned off.
+   */
+  @Before
+  public void setUp() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getTestFileSystem();
+
+    // the current time keeps table names distinct between test methods
+    long now = System.currentTimeMillis();
+    tableNameAsString = "testMob" + now;
+    tableName = TableName.valueOf(tableNameAsString);
+
+    // a mob threshold of 0 is set on both families
+    hcd1 = new HColumnDescriptor(family1);
+    hcd1.setMobEnabled(true);
+    hcd1.setMobThreshold(0L);
+    hcd1.setMaxVersions(4);
+
+    hcd2 = new HColumnDescriptor(family2);
+    hcd2.setMobEnabled(true);
+    hcd2.setMobThreshold(0L);
+    hcd2.setMaxVersions(4);
+
+    desc = new HTableDescriptor(tableName);
+    desc.addFamily(hcd1);
+    desc.addFamily(hcd2);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc, getSplitKeys());
+
+    hTable = new HTable(conf, tableNameAsString);
+    hTable.setAutoFlush(false, false);
+  }
+
+ /**
+  * Drops the table created by setUp, closes the admin and table handles and removes the
+  * test data directory.
+  */
+ @After
+ public void tearDown() throws Exception {
+ // a table has to be disabled before it can be deleted
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ admin.close();
+ hTable.close();
+ fs.delete(TEST_UTIL.getDataTestDir(), true);
+ }
+
+ /**
+  * Compacts family1 when no del files exist and expects the mob files of each region to
+  * end up as a single file while the row count stays unchanged.
+  */
+ @Test
+ public void testCompactionWithoutDelFiles() throws Exception {
+ resetConf();
+ int count = 4;
+ // generate mob files
+ loadData(count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob file count", regionNum*count, countFiles(true, family1));
+ assertEquals("Before compaction: del file count", 0, countFiles(false, family1));
+
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("After compaction: mob file count", regionNum, countFiles(true, family1));
+ assertEquals("After compaction: del file count", 0, countFiles(false, family1));
+ }
+
+ /**
+  * Compacts family1 after del files were created and expects, for family1, one mob file
+  * per region and no del files, while the files of family2 stay untouched.
+  */
+ @Test
+ public void testCompactionWithDelFiles() throws Exception {
+ resetConf();
+ int count = 4;
+ // generate mob files
+ loadData(count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+ countMobCells(hTable));
+ assertEquals("Before deleting: family1 mob file count", regionNum*count,
+ countFiles(true, family1));
+ assertEquals("Before deleting: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(true, family1));
+ assertEquals("Before compaction: family2 file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+
+ // do the mob file compaction
+ // only hcd1 (family1) is passed to the compactor, so family2 is expected to be unchanged
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("After compaction: family1 mob file count", regionNum,
+ countFiles(true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+ // the references stored in the table must match the files left in the mob directory
+ assertRefFileNameEqual(family1);
+ }
+
+  /**
+   * Asserts that the mob files referenced by the cells of the given family are exactly the
+   * non-del files found in the mob directory of that family.
+   * @param familyName the family name
+   * @throws IOException if scanning the table or listing the mob directory fails
+   */
+  private void assertRefFileNameEqual(String familyName) throws IOException {
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(familyName));
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+      tableName), familyName);
+    List<Path> actualFilePaths = new ArrayList<>();
+    List<Path> expectFilePaths = new ArrayList<>();
+    ResultScanner results = hTable.getScanner(scan);
+    try {
+      for (Result res : results) {
+        for (Cell cell : res.listCells()) {
+          byte[] referenceValue = CellUtil.cloneValue(cell);
+          // skip the leading int of the value; the remainder is read as the mob file name
+          String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
+            referenceValue.length - Bytes.SIZEOF_INT);
+          Path targetPath = new Path(mobFamilyPath, fileName);
+          // several cells may point to the same file; keep each path once
+          if (!actualFilePaths.contains(targetPath)) {
+            actualFilePaths.add(targetPath);
+          }
+        }
+      }
+    } finally {
+      // release the scanner even when the iteration above fails
+      results.close();
+    }
+    if (fs.exists(mobFamilyPath)) {
+      for (FileStatus file : fs.listStatus(mobFamilyPath)) {
+        if (!StoreFileInfo.isDelFile(file.getPath())) {
+          expectFilePaths.add(file.getPath());
+        }
+      }
+    }
+    // sort both lists so the comparison does not depend on scan or listing order
+    Collections.sort(actualFilePaths);
+    Collections.sort(expectFilePaths);
+    assertEquals(expectFilePaths, actualFilePaths);
+  }
+
+ /**
+  * Compacts family1 with a mergeable threshold of 5000, so that files larger than the
+  * threshold are expected to be left out of the merge and the family1 del files are
+  * expected to remain.
+  */
+ @Test
+ public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
+ resetConf();
+ int mergeSize = 5000;
+ // change the mob compaction merge size
+ conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+
+ int count = 4;
+ // generate mob files
+ loadData(count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+ countMobCells(hTable));
+ assertEquals("Before deleting: mob file count", regionNum*count, countFiles(true, family1));
+
+ // remember how many family1 mob files exceed the merge size before anything is deleted
+ int largeFilesCount = countLargeFiles(mergeSize, family1);
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(true, family1));
+ assertEquals("Before compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+
+ // do the mob file compaction
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ // After the compaction, the files smaller than the mob compaction merge size
+ // are merged; the assertion below expects one merged file per region
+ assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
+ countFiles(true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ // unlike the full merge in testCompactionWithDelFiles, family1 del files are expected to stay
+ assertEquals("After compaction: family1 del file count", regionNum,
+ countFiles(false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+ }
+
+ /**
+  * Compacts family1 with a compaction batch size of 2 and expects count/batchSize mob
+  * files per region afterwards, with no family1 del files left.
+  */
+ @Test
+ public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
+ resetConf();
+ int batchSize = 2;
+ conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
+ int count = 4;
+ // generate mob files
+ loadData(count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: family1 mob file count", regionNum*count,
+ countFiles(true, family1));
+ assertEquals("Before deleting: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(true, family1));
+ assertEquals("Before compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+
+ // do the mob file compaction
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ // each batch of batchSize files is expected to produce one output file
+ assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize),
+ countFiles(true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+ }
+
+ /**
+  * Compacts family1, restores a snapshot taken before the deletes and compacts again,
+  * checking the row, cell, mob file, del file and HFileLink counts after each step.
+  */
+ @Test
+ public void testCompactionWithHFileLink() throws IOException, InterruptedException {
+ resetConf();
+ int count = 4;
+ // generate mob files
+ loadData(count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ long tid = System.currentTimeMillis();
+ byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
+ // take a snapshot
+ admin.snapshot(snapshotName1, tableName);
+
+ // the deletes happen after the snapshot, so the snapshot still holds all rows
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(true, family1));
+ assertEquals("Before compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+
+ // do the mob file compaction
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After first compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("After first compaction: family1 mob file count", regionNum,
+ countFiles(true, family1));
+ assertEquals("After first compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("After first compaction: family1 del file count", 0, countFiles(false, family1));
+ assertEquals("After first compaction: family2 del file count", regionNum,
+ countFiles(false, family2));
+ assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+ assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+
+ // the table is disabled before the restore and enabled again afterwards
+ admin.disableTable(tableName);
+ // Restore from snapshot, the hfilelink will exist in mob dir
+ admin.restoreSnapshot(snapshotName1);
+ admin.enableTable(tableName);
+
+ assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("After restoring snapshot: mob cells count",
+ regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+ assertEquals("After restoring snapshot: family1 mob file count", regionNum*count,
+ countFiles(true, family1));
+ assertEquals("After restoring snapshot: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("After restoring snapshot: family1 del file count", 0,
+ countFiles(false, family1));
+ assertEquals("After restoring snapshot: family2 del file count", 0,
+ countFiles(false, family2));
+ // only family1 was compacted, so only family1 is expected to hold HFileLinks
+ assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count,
+ countHFileLinks(family1));
+ assertEquals("After restoring snapshot: family2 hfilelink count", 0,
+ countHFileLinks(family2));
+
+ // the second compaction is expected to replace the HFileLinks with one mob file per region
+ compactor.compact();
+
+ assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("After second compaction: mob cells count",
+ regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+ assertEquals("After second compaction: family1 mob file count", regionNum,
+ countFiles(true, family1));
+ assertEquals("After second compaction: family2 mob file count", regionNum*count,
+ countFiles(true, family2));
+ assertEquals("After second compaction: family1 del file count", 0, countFiles(false, family1));
+ assertEquals("After second compaction: family2 del file count", 0, countFiles(false, family2));
+ assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+ assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+ }
+
+  /**
+   * Gets the number of rows in the given table.
+   * @param table to get the scanner
+   * @return the number of rows
+   * @throws IOException if the scan fails
+   */
+  private int countMobRows(final HTable table) throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = table.getScanner(scan);
+    try {
+      int count = 0;
+      // only the number of results matters, their content is not inspected
+      for (Result ignored : results) {
+        count++;
+      }
+      return count;
+    } finally {
+      // release the scanner even when the iteration above fails
+      results.close();
+    }
+  }
+
+  /**
+   * Gets the number of cells in the given table.
+   * @param table to get the scanner
+   * @return the number of cells
+   * @throws IOException if the scan fails
+   */
+  private int countMobCells(final HTable table) throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = table.getScanner(scan);
+    try {
+      int count = 0;
+      for (Result res : results) {
+        // add up the cells of each row instead of stepping through them one by one
+        count += res.listCells().size();
+      }
+      return count;
+    } finally {
+      // release the scanner even when the iteration above fails
+      results.close();
+    }
+  }
+
+  /**
+   * Counts either the mob files or the del files in the mob directory of a family.
+   * @param isMobFile true to count the files that are not del files, false to count the
+   *   del files
+   * @param familyName the family name
+   * @return the number of matching files, 0 if the directory does not exist
+   */
+  private int countFiles(boolean isMobFile, String familyName) throws IOException {
+    Path familyDir = MobUtils.getMobFamilyPath(
+      MobUtils.getMobRegionPath(conf, tableName), familyName);
+    if (!fs.exists(familyDir)) {
+      return 0;
+    }
+    int matched = 0;
+    for (FileStatus status : fs.listStatus(familyDir)) {
+      boolean isDel = StoreFileInfo.isDelFile(status.getPath());
+      // a file matches when it is a del file exactly if del files were asked for
+      if (isDel != isMobFile) {
+        matched++;
+      }
+    }
+    return matched;
+  }
+
+  /**
+   * Counts the HFileLinks in the mob directory of a family.
+   * @param familyName the family name
+   * @return the number of HFileLinks, 0 if the directory does not exist
+   */
+  private int countHFileLinks(String familyName) throws IOException {
+    Path familyDir = MobUtils.getMobFamilyPath(
+      MobUtils.getMobRegionPath(conf, tableName), familyName);
+    if (!fs.exists(familyDir)) {
+      return 0;
+    }
+    int links = 0;
+    for (FileStatus status : fs.listStatus(familyDir)) {
+      if (HFileLink.isHFileLink(status.getPath())) {
+        links++;
+      }
+    }
+    return links;
+  }
+
+  /**
+   * Counts the files in the mob directory of a family that are not del files and whose
+   * length is greater than the given size.
+   * @param size the size the file length is compared against
+   * @param familyName the family name
+   * @return the number of files larger than the size, 0 if the directory does not exist
+   */
+  private int countLargeFiles(int size, String familyName) throws IOException {
+    Path familyDir = MobUtils.getMobFamilyPath(
+      MobUtils.getMobRegionPath(conf, tableName), familyName);
+    if (!fs.exists(familyDir)) {
+      return 0;
+    }
+    int large = 0;
+    for (FileStatus status : fs.listStatus(familyDir)) {
+      // del files in the mob path are ignored, as are files that are not larger than size
+      if (StoreFileInfo.isDelFile(status.getPath()) || status.getLen() <= size) {
+        continue;
+      }
+      large++;
+    }
+    return large;
+  }
+
+  /**
+   * Loads some data to the table. For every key prefix in KEYS this writes
+   * fileNum * rowNumPerFile rows of three cells each and flushes the table after every
+   * rowNumPerFile rows.
+   * @param fileNum the number of flushes per key prefix, must be positive
+   * @param rowNumPerFile the number of rows written between two flushes, must be positive
+   * @throws IllegalArgumentException if fileNum or rowNumPerFile is not positive
+   */
+  private void loadData(int fileNum, int rowNumPerFile) throws IOException,
+      InterruptedException {
+    if (fileNum <= 0) {
+      throw new IllegalArgumentException("fileNum must be positive but was " + fileNum);
+    }
+    if (rowNumPerFile <= 0) {
+      throw new IllegalArgumentException(
+        "rowNumPerFile must be positive but was " + rowNumPerFile);
+    }
+    // the column coordinates do not change between rows, so convert them only once
+    byte[] f1 = Bytes.toBytes(family1);
+    byte[] f2 = Bytes.toBytes(family2);
+    byte[] q1 = Bytes.toBytes(qf1);
+    byte[] q2 = Bytes.toBytes(qf2);
+    int rowNum = fileNum * rowNumPerFile;
+    for (byte k0 : KEYS) {
+      byte[] k = new byte[] { k0 };
+      for (int i = 0; i < rowNum; i++) {
+        // row key = one prefix byte from KEYS followed by the row index
+        byte[] key = Bytes.add(k, Bytes.toBytes(i));
+        // the value size grows with the row index
+        byte[] mobVal = makeDummyData(10 * (i + 1));
+        Put put = new Put(key);
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(f1, q1, mobVal);
+        put.add(f1, q2, mobVal);
+        put.add(f2, q1, mobVal);
+        hTable.put(put);
+        if ((i + 1) % rowNumPerFile == 0) {
+          // auto flush is off: push the buffered puts first, then flush the table
+          hTable.flushCommits();
+          admin.flush(tableName);
+        }
+      }
+    }
+  }
+
+ /**
+  * delete the row, family and cell to create the del file
+  * <p>
+  * For every key prefix in KEYS this deletes family1 of row 0, the whole row 2 and the
+  * cell family1:qualifier1 of row 4, then flushes the table and runs compactStores(true)
+  * on every region of the table.
+  */
+ private void createDelFile() throws IOException, InterruptedException {
+ for (byte k0 : KEYS) {
+ byte[] k = new byte[] { k0 };
+ // delete a family
+ byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
+ Delete delete1 = new Delete(key1);
+ delete1.deleteFamily(Bytes.toBytes(family1));
+ hTable.delete(delete1);
+ // delete one row
+ byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
+ Delete delete2 = new Delete(key2);
+ hTable.delete(delete2);
+ // delete one cell
+ byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
+ Delete delete3 = new Delete(key3);
+ delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
+ hTable.delete(delete3);
+ // push the buffered deletes, then flush the table
+ hTable.flushCommits();
+ admin.flush(tableName);
+ List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(
+ Bytes.toBytes(tableNameAsString));
+ for (HRegion region : regions) {
+ // wait for running flushes and compactions before compacting the stores
+ region.waitForFlushesAndCompactions();
+ region.compactStores(true);
+ }
+ }
+ }
+  /**
+   * Creates a byte array of the requested length filled with random bytes.
+   * @param size the length of the array to create
+   * @return the randomly filled array
+   */
+  private byte[] makeDummyData(int size) {
+    byte[] payload = new byte[size];
+    Random generator = new Random();
+    generator.nextBytes(payload);
+    return payload;
+  }
+
+  /**
+   * Builds the split keys from KEYS: one single-byte key for every byte of KEYS except
+   * the first one.
+   * @return the split keys
+   */
+  public static byte[][] getSplitKeys() {
+    int splitCount = KEYS.length - 1;
+    byte[][] splits = new byte[splitCount][];
+    for (int keyIndex = 1; keyIndex <= splitCount; keyIndex++) {
+      splits[keyIndex - 1] = new byte[] { KEYS[keyIndex] };
+    }
+    return splits;
+  }
+
+  /**
+   * Creates the thread pool that is handed to the compactors: 1 core thread, at most 10
+   * threads, a 60 second keep-alive that also applies to the core thread, and a
+   * synchronous hand-off queue. A rejected task is not dropped; the submitter blocks
+   * until a worker takes it.
+   * @param conf the configuration (not read by this method)
+   * @return the thread pool
+   */
+  private static ExecutorService createThreadPool(Configuration conf) {
+    int maxThreads = 10;
+    long keepAliveTime = 60;
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    // named threadPool so that it does not shadow the static pool field
+    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, maxThreads,
+      keepAliveTime, TimeUnit.SECONDS, queue,
+      Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+      new RejectedExecutionHandler() {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+          try {
+            // waiting for a thread to pick up instead of throwing exceptions.
+            queue.put(r);
+          } catch (InterruptedException e) {
+            // restore the interrupt status so that the submitting thread can still see it
+            Thread.currentThread().interrupt();
+            throw new RejectedExecutionException(e);
+          }
+        }
+      });
+    threadPool.allowCoreThreadTimeOut(true);
+    return threadPool;
+  }
+
+  /**
+   * Puts the compaction batch size and the mergeable threshold back to their defaults, so
+   * that a value set by one test does not carry over into the next.
+   */
+  private void resetConf() {
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
index ac66d95,0000000..f9159aa
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
@@@ -1,60 -1,0 +1,60 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.SmallTests;
++import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the CompactionPartitionId and CompactionPartition classes nested in
+ * PartitionedMobFileCompactionRequest.
+ */
+@Category(SmallTests.class)
+public class TestPartitionedMobFileCompactionRequest {
+
+  /**
+   * Checks equals() and the getters of a partition id built from a start key and a date.
+   */
+  @Test
+  public void testCompactedPartitionId() {
+    String firstStartKey = "startKey1";
+    String secondStartKey = "startKey2";
+    String firstDate = "date1";
+    String secondDate = "date2";
+    CompactionPartitionId idKey1Date1 = new CompactionPartitionId(firstStartKey, firstDate);
+    CompactionPartitionId idKey2Date2 = new CompactionPartitionId(secondStartKey, secondDate);
+    CompactionPartitionId idKey1Date2 = new CompactionPartitionId(firstStartKey, secondDate);
+
+    // an id equals itself
+    Assert.assertTrue(idKey1Date1.equals(idKey1Date1));
+    // ids differing in the start key, the date or both are not equal
+    Assert.assertFalse(idKey1Date1.equals(idKey2Date2));
+    Assert.assertFalse(idKey1Date1.equals(idKey1Date2));
+    Assert.assertFalse(idKey2Date2.equals(idKey1Date2));
+
+    Assert.assertEquals(firstStartKey, idKey1Date1.getStartKey());
+    Assert.assertEquals(firstDate, idKey1Date1.getDate());
+  }
+
+  /**
+   * Checks that a file added to a partition is the first entry returned by listFiles().
+   */
+  @Test
+  public void testCompactedPartition() {
+    CompactionPartition partition =
+      new CompactionPartition(new CompactionPartitionId("startKey1", "date1"));
+    FileStatus added = new FileStatus(1, false, 1, 1024, 1, new Path("/test"));
+    partition.addFile(added);
+    FileStatus listed = partition.listFiles().get(0);
+    Assert.assertEquals(added, listed);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
index 1d64c0c,0000000..12c88b2
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
@@@ -1,423 -1,0 +1,423 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
- import org.apache.hadoop.hbase.LargeTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestPartitionedMobFileCompactor {
+ // Test harness whose mini cluster is started in setUpBeforeClass and stopped in
+ // tearDownAfterClass.
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ // Column family and qualifier names used for every store file the tests create.
+ private final static String family = "family";
+ private final static String qf = "qf";
+ private HColumnDescriptor hcd = new HColumnDescriptor(family);
+ // Configuration obtained from TEST_UTIL; individual tests override compaction settings on it
+ // and resetConf() restores the defaults.
+ private Configuration conf = TEST_UTIL.getConfiguration();
+ private CacheConfig cacheConf = new CacheConfig(conf);
+ // Assigned by init().
+ private FileSystem fs;
+ // Filled by listFiles(): files whose name does not end with "_del", files whose name does,
+ // and every file found under basePath, respectively.
+ private List<FileStatus> mobFiles = new ArrayList<>();
+ private List<FileStatus> delFiles = new ArrayList<>();
+ private List<FileStatus> allFiles = new ArrayList<>();
+ // Directory the store files are written to; derived from the table name in init().
+ private Path basePath;
+ // Random per-init() file name suffixes; delSuffix additionally ends with "_del".
+ private String mobSuffix;
+ private String delSuffix;
+ // Thread pool handed to every compactor; created once in setUpBeforeClass.
+ private static ExecutorService pool;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+ TEST_UTIL.startMiniCluster(1);
+ pool = createThreadPool(TEST_UTIL.getConfiguration());
+ }
+
+ /**
+  * Shuts down the shared thread pool and then the mini cluster.
+  * The cluster shutdown runs even when the pool was never created or its shutdown fails.
+  */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+   try {
+     // pool is still null when setUpBeforeClass failed before reaching createThreadPool
+     if (pool != null) {
+       pool.shutdown();
+     }
+   } finally {
+     TEST_UTIL.shutdownMiniCluster();
+   }
+ }
+
+ /**
+  * Prepares the per-test state: the file system handle, the directory
+  * {@code <root>/<mob dir>/<tableName>/<family>} that store files are written to, and fresh
+  * random suffixes for mob and del file names.
+  * @param tableName the table name used as a directory component
+  */
+ private void init(String tableName) throws Exception {
+   fs = FileSystem.get(conf);
+   Path rootDir = FSUtils.getRootDir(conf);
+   Path mobRoot = new Path(rootDir, MobConstants.MOB_DIR_NAME);
+   Path tableDir = new Path(mobRoot, tableName);
+   basePath = new Path(tableDir, family);
+   String mobUuid = UUID.randomUUID().toString();
+   mobSuffix = mobUuid.replaceAll("-", "");
+   String delUuid = UUID.randomUUID().toString();
+   delSuffix = delUuid.replaceAll("-", "") + "_del";
+ }
+
+ /**
+  * Selects files with the default mergeable threshold. Every mob file shorter than the
+  * threshold is expected to contribute its start key, and the selection is expected to be of
+  * type {@code ALL_FILES}.
+  */
+ @Test
+ public void testCompactionSelectWithAllFiles() throws Exception {
+   resetConf();
+   final String tableName = "testCompactionSelectWithAllFiles";
+   init(tableName);
+   final int fileCount = 10;
+   // 10 mob files, then 10 del files
+   createStoreFiles(basePath, family, qf, fileCount, Type.Put);
+   createStoreFiles(basePath, family, qf, fileCount, Type.Delete);
+   listFiles();
+   final long threshold = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
+   List<String> expectedStartKeys = new ArrayList<>();
+   for (FileStatus mobFile : mobFiles) {
+     if (mobFile.getLen() >= threshold) {
+       continue;
+     }
+     // the leading 32 characters of the file name are used as the start key
+     expectedStartKeys.add(mobFile.getPath().getName().substring(0, 32));
+   }
+   testSelectFiles(tableName, CompactionType.ALL_FILES, expectedStartKeys);
+ }
+
+ /**
+  * Selects files with a lowered mergeable threshold. Only the mob files shorter than the
+  * threshold are expected to contribute their start key, and the selection is expected to be
+  * of type {@code PART_FILES}.
+  */
+ @Test
+ public void testCompactionSelectWithPartFiles() throws Exception {
+   resetConf();
+   String tableName = "testCompactionSelectWithPartFiles";
+   init(tableName);
+   int count = 10;
+   // create 10 mob files.
+   createStoreFiles(basePath, family, qf, count, Type.Put);
+   // create 10 del files
+   createStoreFiles(basePath, family, qf, count, Type.Delete);
+   listFiles();
+   long mergeSize = 4000;
+   List<String> expectedStartKeys = new ArrayList<>();
+   for (FileStatus file : mobFiles) {
+     // filter with mergeSize itself so the expectation always matches the configured value
+     if (file.getLen() < mergeSize) {
+       String fileName = file.getPath().getName();
+       // the leading 32 characters of the file name are used as the start key
+       String startKey = fileName.substring(0, 32);
+       expectedStartKeys.add(startKey);
+     }
+   }
+   // set the mob file compaction mergeable threshold
+   conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+   testSelectFiles(tableName, CompactionType.PART_FILES, expectedStartKeys);
+ }
+
+ /**
+  * Compacts 13 del files with the default settings and expects a single resulting del file
+  * that holds 13 cells.
+  */
+ @Test
+ public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
+   resetConf();
+   final String tableName = "testCompactDelFilesWithDefaultBatchSize";
+   init(tableName);
+   final int mobFileCount = 20;
+   final int delFileCount = 13;
+   createStoreFiles(basePath, family, qf, mobFileCount, Type.Put);
+   createStoreFiles(basePath, family, qf, delFileCount, Type.Delete);
+   listFiles();
+   testCompactDelFiles(tableName, 1, delFileCount);
+ }
+
+ /**
+  * Compacts 13 del files with the compaction batch size lowered to 4 and expects a single
+  * resulting del file that holds 13 cells.
+  */
+ @Test
+ public void testCompactDelFilesWithSmallBatchSize() throws Exception {
+   resetConf();
+   final String tableName = "testCompactDelFilesWithSmallBatchSize";
+   init(tableName);
+   final int mobFileCount = 20;
+   final int delFileCount = 13;
+   createStoreFiles(basePath, family, qf, mobFileCount, Type.Put);
+   createStoreFiles(basePath, family, qf, delFileCount, Type.Delete);
+   listFiles();
+
+   // shrink the mob file compaction batch size
+   final int batchSize = 4;
+   conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
+   testCompactDelFiles(tableName, 1, delFileCount);
+ }
+
+ /**
+  * Compacts 13 del files with the maximum del file count set to 5 and the batch size set to 2,
+  * and expects 4 resulting del files that hold 13 cells in total.
+  */
+ @Test
+ public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
+   resetConf();
+   // Use a table name unique to this test: init() derives the working directory from it, and
+   // listFiles() picks up everything in that directory, so sharing a name with another test
+   // would mix the two tests' files.
+   String tableName = "testCompactDelFilesChangeMaxDelFileCount";
+   init(tableName);
+   // create 20 mob files.
+   createStoreFiles(basePath, family, qf, 20, Type.Put);
+   // create 13 del files
+   createStoreFiles(basePath, family, qf, 13, Type.Delete);
+   listFiles();
+
+   // set the max del file count
+   conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
+   // set the mob file compaction batch size
+   conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
+   testCompactDelFiles(tableName, 4, 13);
+ }
+
+ /**
+ * Tests the selectFiles
+ * @param tableName the table name
+ * @param type the expected compaction type
+ * @param expected the expected start keys
+ */
+ private void testSelectFiles(String tableName, final CompactionType type,
+ final List<String> expected) throws IOException {
+ PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
+ TableName.valueOf(tableName), hcd, pool) {
+ @Override
+ public List<Path> compact(List<FileStatus> files) throws IOException {
+ if (files == null || files.isEmpty()) {
+ return null;
+ }
+ PartitionedMobFileCompactionRequest request = select(files);
+ // assert the compaction type is ALL_FILES
+ Assert.assertEquals(type, request.type);
+ // assert get the right partitions
+ compareCompactedPartitions(expected, request.compactionPartitions);
+ // assert get the right del files
+ compareDelFiles(request.delFiles);
+ return null;
+ }
+ };
+ compactor.compact(allFiles);
+ }
+
+ /**
+  * Runs the compactor's del file compaction over the del files of the selected request and
+  * checks how many del files come out and how many cells they hold.
+  * The compactor's {@code performCompaction} is overridden so that only the del files are
+  * compacted; it always returns null.
+  * @param tableName the table name
+  * @param expectedFileCount the expected file count
+  * @param expectedCellCount the expected cell count
+  */
+ private void testCompactDelFiles(String tableName, final int expectedFileCount,
+     final int expectedCellCount) throws IOException {
+   PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
+       TableName.valueOf(tableName), hcd, pool) {
+     @Override
+     protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
+         throws IOException {
+       List<Path> sourceDelPaths = new ArrayList<>();
+       for (FileStatus sourceDelFile : request.delFiles) {
+         sourceDelPaths.add(sourceDelFile.getPath());
+       }
+       List<Path> mergedDelPaths = compactDelFiles(request, sourceDelPaths);
+       // the del files must have been merged into the expected number of files ...
+       int mergedFileCount = mergedDelPaths.size();
+       Assert.assertEquals(expectedFileCount, mergedFileCount);
+       // ... holding the expected number of cells
+       int mergedCellCount = countDelCellsInDelFiles(mergedDelPaths);
+       Assert.assertEquals(expectedCellCount, mergedCellCount);
+       return null;
+     }
+   };
+
+   compactor.compact(allFiles);
+ }
+
+ /**
+  * Lists the files directly under {@code basePath}. Each one is appended to {@code allFiles}
+  * and, depending on whether its name ends with "_del", to {@code delFiles} or
+  * {@code mobFiles}.
+  */
+ private void listFiles() throws IOException {
+   FileStatus[] children = fs.listStatus(basePath);
+   for (int i = 0; i < children.length; i++) {
+     FileStatus child = children[i];
+     allFiles.add(child);
+     boolean isDelFile = child.getPath().getName().endsWith("_del");
+     (isDelFile ? delFiles : mobFiles).add(child);
+   }
+ }
+
+ /**
+  * Asserts that the start keys of the given partitions are exactly the expected start keys,
+  * ignoring order. Note that {@code expected} is sorted in place.
+  * @param expected the expected start keys
+  * @param partitions the collection of CompactedPartitions
+  */
+ private void compareCompactedPartitions(List<String> expected,
+     Collection<CompactionPartition> partitions) {
+   List<String> selectedKeys = new ArrayList<>();
+   for (CompactionPartition selected : partitions) {
+     selectedKeys.add(selected.getPartitionId().getStartKey());
+   }
+   // order is irrelevant to the comparison, so sort both sides first
+   Collections.sort(expected);
+   Collections.sort(selectedKeys);
+   Assert.assertEquals(expected.size(), selectedKeys.size());
+   int position = 0;
+   for (String wantedKey : expected) {
+     Assert.assertEquals(wantedKey, selectedKeys.get(position));
+     position++;
+   }
+ }
+
+ /**
+  * Asserts that the given del files are the del files collected by {@code listFiles()}, in the
+  * same order.
+  * @param allDelFiles all the del files
+  */
+ private void compareDelFiles(Collection<FileStatus> allDelFiles) {
+   // Without the size check an empty or shorter collection would pass unnoticed, and a longer
+   // one would fail with an IndexOutOfBoundsException instead of an assertion error.
+   Assert.assertEquals(delFiles.size(), allDelFiles.size());
+   int i = 0;
+   for (FileStatus file : allDelFiles) {
+     Assert.assertEquals(delFiles.get(i), file);
+     i++;
+   }
+ }
+
+ /**
+ * Creates store files.
+ * @param basePath the path to create file
+ * @family the family name
+ * @qualifier the column qualifier
+ * @count the store file number
+ * @type the key type
+ */
+ private void createStoreFiles(Path basePath, String family, String qualifier, int count,
+ Type type) throws IOException {
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+ String startKey = "row_";
+ MobFileName mobFileName = null;
+ for (int i = 0; i < count; i++) {
+ byte[] startRow = Bytes.toBytes(startKey + i) ;
+ if(type.equals(Type.Delete)) {
+ mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
+ new Date()), delSuffix);
+ }
+ if(type.equals(Type.Put)){
+ mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
+ new Date()), mobSuffix);
+ }
+ StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+ .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+ writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
+ type, (i+1)*1000);
+ }
+ }
+
+ /**
+  * Appends a single cell with random value bytes to the store file and closes the writer.
+  * The writer is closed whether or not the append succeeds.
+  * @param writer the store file writer
+  * @param row the row key
+  * @param family the family name
+  * @param qualifier the column qualifier
+  * @param type the key type
+  * @param size the size of value
+  */
+ private static void writeStoreFile(final StoreFile.Writer writer, byte[] row, byte[] family,
+     byte[] qualifier, Type type, int size) throws IOException {
+   long timestamp = System.currentTimeMillis();
+   try {
+     byte[] value = new byte[size];
+     new Random().nextBytes(value);
+     KeyValue cell = new KeyValue(row, family, qualifier, timestamp, type, value);
+     writer.append(cell);
+   } finally {
+     writer.close();
+   }
+ }
+
+ /**
+  * Gets the number of cells in the del files by scanning them with a store scanner that
+  * retains deletes.
+  * @param paths the del file paths
+  * @return the cell size
+  */
+ private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
+   List<StoreFile> sfs = new ArrayList<StoreFile>();
+   for (Path path : paths) {
+     sfs.add(new StoreFile(fs, path, conf, cacheConf, BloomType.NONE));
+   }
+   // NOTE(review): the raw List is kept as in the original; presumably the StoreScanner
+   // constructor does not accept List<StoreFileScanner> - confirm before adding generics.
+   List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
+       false, null, HConstants.LATEST_TIMESTAMP);
+   Scan scan = new Scan();
+   scan.setMaxVersions(hcd.getMaxVersions());
+   long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
+   long ttl = HStore.determineTTLFromFamily(hcd);
+   ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
+   StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
+       scanners, 0L, HConstants.LATEST_TIMESTAMP);
+   int size = 0;
+   try {
+     List<Cell> results = new ArrayList<>();
+     boolean hasMore = true;
+     while (hasMore) {
+       hasMore = scanner.next(results);
+       size += results.size();
+       // the same list is reused for every batch
+       results.clear();
+     }
+   } finally {
+     // close the scanner even when scanning fails
+     scanner.close();
+   }
+   return size;
+ }
+
+ /**
+  * Creates the thread pool handed to the compactors: 1 core thread, at most 10 threads, a
+  * 60 second keep-alive that also applies to the core thread, and a synchronous hand-off
+  * queue. A rejected task is not dropped; the submitting thread blocks until a worker takes it.
+  * @param conf the configuration (not read by this method)
+  * @return the thread pool
+  */
+ private static ExecutorService createThreadPool(Configuration conf) {
+   int maxThreads = 10;
+   long keepAliveTime = 60;
+   final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+   // named threadPool so that it does not shadow the static field "pool"
+   ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+       TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+       new RejectedExecutionHandler() {
+         @Override
+         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+           try {
+             // waiting for a thread to pick up instead of throwing exceptions.
+             queue.put(r);
+           } catch (InterruptedException e) {
+             // restore the interrupt status so the submitting thread can still observe it
+             Thread.currentThread().interrupt();
+             throw new RejectedExecutionException(e);
+           }
+         }
+       });
+   threadPool.allowCoreThreadTimeOut(true);
+   return threadPool;
+ }
+
+ /**
+  * Restores the three settings the tests override (compaction batch size, maximum del file
+  * count and mergeable threshold) to their defaults from {@code MobConstants}.
+  */
+ private void resetConf() {
+   conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+       MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+   conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+   conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+       MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
index e0b9a83,0000000..49345e4
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@@ -1,168 -1,0 +1,168 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
- import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests {@code SweepJob.getUnusedFiles}: the names listed in the "all" file that appear in none
+ * of the "visited" files are expected to be reported as unused.
+ */
+@Category(MediumTests.class)
+public class TestMobSweepJob {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ // The names written to the "all" file in every test.
+ private final static String[] ALL_FILE_NAMES = { "1", "2", "3", "4", "5", "6" };
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+   TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+   TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+   // the sequence files below are written with String keys and values
+   TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+       JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+   TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+   TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Writes the given names as the keys of a sequence file; every value is the empty string.
+  * @param fs the file system
+  * @param conf the configuration
+  * @param path the sequence file to create
+  * @param filesNames the names to write
+  */
+ private void writeFileNames(FileSystem fs, Configuration conf, Path path,
+     String[] filesNames) throws IOException {
+   // write the names to a sequence file
+   SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
+       String.class, String.class);
+   try {
+     for (String fileName : filesNames) {
+       writer.append(fileName, MobConstants.EMPTY_STRING);
+     }
+   } finally {
+     IOUtils.closeStream(writer);
+   }
+ }
+
+ /**
+  * Sets up a sweep job working directory and asks the job for the unused files.
+  * The "all" file receives {@link #ALL_FILE_NAMES}; the i-th array of visited names is written
+  * to the file {@code r<i>} in the "visited" directory.
+  * @param workingDirIndex the path component that keeps each test's working directory apart
+  * @param visitedNames the visited names, one array per visited file
+  * @return the names the sweep job reports as unused
+  */
+ private List<String> findUnusedFiles(String workingDirIndex, String[]... visitedNames)
+     throws Exception {
+   FileSystem fs = TEST_UTIL.getTestFileSystem();
+   Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+   String workingDir = "/hbase/mobcompaction/SweepJob/working/names/" + workingDirIndex;
+   Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+       workingDir + "/visited");
+   Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+       workingDir + "/all");
+   configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+   configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+
+   writeFileNames(fs, configuration, allFileNamesPath, ALL_FILE_NAMES);
+   for (int i = 0; i < visitedNames.length; i++) {
+     writeFileNames(fs, configuration, new Path(visitedFileNamesPath, "r" + i),
+         visitedNames[i]);
+   }
+
+   SweepJob sweepJob = new SweepJob(configuration, fs);
+   return sweepJob.getUnusedFiles(configuration);
+ }
+
+ /**
+  * Every name in the "all" file is visited, so nothing is expected to be unused.
+  */
+ @Test
+ public void testSweeperJobWithOutUnusedFile() throws Exception {
+   List<String> toBeArchived = findUnusedFiles("0",
+       new String[] { "1", "2", "3" },
+       new String[] { "1", "4", "5" },
+       new String[] { "2", "3", "6" });
+
+   assertEquals(0, toBeArchived.size());
+ }
+
+ /**
+  * "4" and "6" are never visited, so exactly those two are expected to be unused.
+  */
+ @Test
+ public void testSweeperJobWithUnusedFile() throws Exception {
+   List<String> toBeArchived = findUnusedFiles("1",
+       new String[] { "1", "2", "3" },
+       new String[] { "1", "5" },
+       new String[] { "2", "3" });
+
+   assertEquals(2, toBeArchived.size());
+   // assertArrayEquals replaces the deprecated assertEquals(Object[], Object[])
+   org.junit.Assert.assertArrayEquals(new String[] { "4", "6" },
+       toBeArchived.toArray(new String[0]));
+ }
+
+ /**
+  * The visited files additionally contain "7", which is not in the "all" file; nothing is
+  * expected to be unused.
+  */
+ @Test
+ public void testSweeperJobWithRedundantFile() throws Exception {
+   List<String> toBeArchived = findUnusedFiles("2",
+       new String[] { "1", "2", "3" },
+       new String[] { "1", "5", "6", "7" },
+       new String[] { "2", "3", "4" });
+
+   assertEquals(0, toBeArchived.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
index 2aa3a4a,0000000..9e95a39
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@@ -1,120 -1,0 +1,120 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
- import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests that {@code SweepMapper.map} writes the expected key and cell to the mapper context.
+ */
+@Category(SmallTests.class)
+public class TestMobSweepMapper {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+   TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+   TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+   TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+   TEST_UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+   TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Feeds the mapper one cell whose value is a four-character prefix followed by a file name,
+  * while holding the table write lock. Each write to the context is expected to carry that
+  * file name as the key and the original cell as the value.
+  * NOTE(review): the assertions run only inside the stubbed {@code write}; nothing verifies
+  * that {@code write} is called at all.
+  */
+ @Test
+ public void TestMap() throws Exception {
+   String prefix = "0000";
+   final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
+   final String mobFilePath = prefix + fileName;
+
+   ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
+   final KeyValue[] kvList = new KeyValue[1];
+   kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
+       Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
+
+   Result columns = mock(Result.class);
- when(columns.raw()).thenReturn(kvList);
++ when(columns.rawCells()).thenReturn(kvList);
+
+   Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+   ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+   TableName tn = TableName.valueOf("testSweepMapper");
+   TableName lockName = MobUtils.getTableLockName(tn);
+   String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+   configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+   configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+   ServerName serverName = SweepJob.getCurrentServerName(configuration);
+   configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
+
+   TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+       serverName);
+   TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+   lock.acquire();
+   try {
+     Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
+         mock(Mapper.Context.class);
+     when(ctx.getConfiguration()).thenReturn(configuration);
+     SweepMapper map = new SweepMapper();
+     doAnswer(new Answer<Void>() {
+
+       @Override
+       public Void answer(InvocationOnMock invocation) throws Throwable {
+         Text text = (Text) invocation.getArguments()[0];
+         KeyValue kv = (KeyValue) invocation.getArguments()[1];
+
+         // expected value first, so that a failure message labels the two sides correctly
+         assertEquals(fileName, Bytes.toString(text.getBytes(), 0, text.getLength()));
+         assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
+
+         return null;
+       }
+     }).when(ctx).write(any(Text.class), any(KeyValue.class));
+
+     map.map(r, columns, ctx);
+   } finally {
+     lock.release();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
index 1a69d06,0000000..308b50e
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@@ -1,220 -1,0 +1,220 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+
+@Category(MediumTests.class)
+public class TestMobSweepReducer {
+
+ // Shared mini-cluster harness for every test in this class.
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ // Fixed table name; the table is created in setUp() and dropped in tearDown().
+ private final static String tableName = "testSweepReducer";
+ // Row key, column family and qualifier used for the mob cells written by the test.
+ private final static String row = "row";
+ private final static String family = "family";
+ private final static String qf = "qf";
+ // Client handles assigned in setUp() before each test.
+ private static HTable table;
+ private static Admin admin;
+
+  /**
+   * Configures the shared testing utility and starts a mini cluster
+   * (started with argument 1) once for the whole class.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration clusterConf = TEST_UTIL.getConfiguration();
+    // info-server ports: 0 / auto, presumably so parallel test runs do not collide
+    clusterConf.setInt("hbase.master.info.port", 0);
+    clusterConf.setBoolean("hbase.regionserver.info.port.auto", true);
+    // the test runs with HFile format version 3
+    clusterConf.setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+ /**
+ * Shuts down the mini cluster started in setUpBeforeClass() once all tests have run.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+  /**
+   * Creates the test table with a single mob-enabled column family
+   * (mob threshold 3, up to 4 versions) and opens the client handles
+   * used by the test.
+   */
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    HColumnDescriptor mobFamily = new HColumnDescriptor(family);
+    mobFamily.setMobEnabled(true);
+    mobFamily.setMobThreshold(3L);
+    mobFamily.setMaxVersions(4);
+
+    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+    tableDescriptor.addFamily(mobFamily);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(tableDescriptor);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
+
+  /**
+   * Releases the per-test client table handle, then disables and deletes the
+   * test table and closes the admin handle.
+   *
+   * The table handle opened in setUp() was previously never closed; it is now
+   * closed first, and the table is still dropped even if that close fails so
+   * the next setUp() can recreate it under the same name.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (table != null) {
+        table.close();
+      }
+    } finally {
+      TableName tn = TableName.valueOf(tableName);
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+      admin.close();
+    }
+  }
+
+  /**
+   * Reads every key of the sequence file at {@code path} and returns them in
+   * file order.
+   *
+   * @param fs   not used by this method; kept so existing callers compile
+   * @param path the sequence file to read
+   * @param conf configuration used to open the reader
+   * @return the keys read, cast to String
+   * @throws Exception if the file cannot be opened or read
+   */
+  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
+      Configuration conf) throws Exception {
+    List<String> keys = new ArrayList<String>();
+    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+    try {
+      // next(Object) returns null once the end of the file is reached
+      String key = (String) reader.next((String) null);
+      while (key != null) {
+        keys.add(key);
+        key = (String) reader.next((String) null);
+      }
+    } finally {
+      // close the reader even when a read fails, so the stream is not leaked
+      reader.close();
+    }
+    return keys;
+  }
+
+  /**
+   * Drives {@link SweepReducer#run} against a mocked reducer context while
+   * holding the sweep table write lock, then checks that the single mob file
+   * has been replaced by a differently named one and that the original file
+   * name was recorded in the visited-files working directory.
+   */
+  @Test
+  public void testRun() throws Exception {
+
+    TableName tn = TableName.valueOf(tableName);
+    byte[] mobValueBytes = new byte[100];
+
+    //get the path where mob files lie in
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
+
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
+    put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    table.put(put);
+    table.put(put2);
+    table.flushCommits();
+    admin.flush(tn);
+
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    //check the generation of a mob file
+    assertEquals(1, fileStatuses.length);
+
+    String mobFile1 = fileStatuses[0].getPath().getName();
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName());
+    // working directory whose sequence files are read back at the end of the test
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+        System.currentTimeMillis() + 24 * 3600 * 1000);
+
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+    TableName lockName = MobUtils.getTableLockName(tn);
+    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+    ServerName serverName = SweepJob.getCurrentServerName(configuration);
+    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
+
+    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+        serverName);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    lock.acquire();
+    try {
+      // use the same counter when mocking
+      Counter counter = new GenericCounter();
+      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+      // exactly one key (the mob file name) is fed to the reducer
+      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+      // value layout built here: 8-byte length prefix followed by the mob file name bytes
+      byte[] refBytes = Bytes.toBytes(mobFile1);
+      long valueLength = refBytes.length;
+      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
+          KeyValue.Type.Put, newValue);
+      List<KeyValue> list = new ArrayList<KeyValue>();
+      list.add(kv2);
+
+      when(ctx.getValues()).thenReturn(list);
+
+      SweepReducer reducer = new SweepReducer();
+      reducer.run(ctx);
+    } finally {
+      lock.release();
+    }
+    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    //new mob file is generated, old one has been archived
+    // assert the count before indexing so a wrong count fails as an assertion
+    assertEquals(1, fileStatuses2.length);
+    String mobFile2 = fileStatuses2[0].getPath().getName();
+    assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+    //test sequence file
+    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
+    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+    Set<String> files = new TreeSet<String>();
+    for (FileStatus st : statuses) {
+      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+          st.getPath(), configuration));
+    }
+    assertEquals(1, files.size());
+    assertEquals(true, files.contains(mobFile1));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
index c4817aa,0000000..1689c2a
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@@ -1,307 -1,0 +1,307 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
++import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweeper {
+ // Shared mini-cluster harness for every test in this class.
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ // Per-test table name; assigned in setUp() with a timestamp suffix.
+ private String tableName;
+ // Row key prefix, column family and qualifier used for the generated mob cells.
+ private final static String row = "row_";
+ private final static String family = "family";
+ private final static String column = "column";
+ // Client handles assigned in setUp() before each test.
+ private static HTable table;
+ private static Admin admin;
+
+ // Source of the random mob cell payloads written by generateMobTable().
+ private Random random = new Random();
+  /**
+   * Configures the shared testing utility, then starts the mini cluster
+   * followed by the mini MapReduce cluster, once for the whole class.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration clusterConf = TEST_UTIL.getConfiguration();
+    clusterConf.setInt("hbase.master.info.port", 0);
+    clusterConf.setBoolean("hbase.regionserver.info.port.auto", true);
+    // avoid major compactions
+    clusterConf.setInt("hbase.hstore.compaction.min", 15);
+    clusterConf.setInt("hbase.hstore.compaction.max", 30);
+    clusterConf.setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  /**
+   * Stops the clusters in the reverse order of their startup in
+   * setUpBeforeClass(): the mini MapReduce cluster first, then the mini
+   * cluster it was started on top of. The mini cluster is shut down even if
+   * stopping the MapReduce cluster fails.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      TEST_UTIL.shutdownMiniMapReduceCluster();
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Creates a table named "testSweeper" plus the current time in milliseconds,
+   * with one mob-enabled column family (mob threshold 3, up to 4 versions),
+   * and opens a client table handle with auto-flush turned off.
+   */
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    tableName = "testSweeper" + System.currentTimeMillis();
+
+    HColumnDescriptor mobFamily = new HColumnDescriptor(family);
+    mobFamily.setMobEnabled(true);
+    mobFamily.setMobThreshold(3L);
+    mobFamily.setMaxVersions(4);
+
+    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+    tableDescriptor.addFamily(mobFamily);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(tableDescriptor);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    // writes are buffered until flushCommits() is called explicitly
+    table.setAutoFlush(false);
+  }
+
+  /**
+   * Releases the per-test client table handle, then disables and deletes the
+   * test table and closes the admin handle.
+   *
+   * The table handle opened in setUp() was previously never closed; the table
+   * is still dropped even if that close fails.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (table != null) {
+        table.close();
+      }
+    } finally {
+      TableName tn = TableName.valueOf(tableName);
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+      admin.close();
+    }
+  }
+
+  /**
+   * Builds the path of a column family directory under the mob region path
+   * of the given table.
+   *
+   * @param conf         configuration passed to MobUtils.getMobRegionPath
+   * @param tableNameStr name of the table
+   * @param familyName   name of the column family
+   * @return the family directory path below the table's mob region path
+   */
+  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
+      String familyName) {
+    TableName tn = TableName.valueOf(tableNameStr);
+    Path mobRegionPath = MobUtils.getMobRegionPath(conf, tn);
+    return new Path(mobRegionPath, familyName);
+  }
+
+
+  /**
+   * Concatenates all elements of the set, in the set's iteration order and
+   * without any separator, so two sets can be compared as a single string.
+   *
+   * @param set the strings to concatenate
+   * @return the concatenation; empty when the set is empty
+   */
+  private String mergeString(Set<String> set) {
+    StringBuilder merged = new StringBuilder();
+    for (String element : set) {
+      merged.append(element);
+    }
+    return merged.toString();
+  }
+
+
+  /**
+   * Writes {@code count} rows, each carrying one random 101 KB value, into the
+   * test table. The client buffer and the table are flushed whenever the
+   * zero-based row index is a multiple of {@code flushStep}, and once more
+   * after the last row.
+   *
+   * @param count     number of rows to write; nothing happens when not positive
+   * @param flushStep flush interval in rows; nothing happens when not positive
+   */
+  private void generateMobTable(int count, int flushStep)
+      throws IOException, InterruptedException {
+    if (count <= 0 || flushStep <= 0) {
+      return;
+    }
+    for (int rowIndex = 0; rowIndex < count; rowIndex++) {
+      byte[] mobVal = new byte[101*1024];
+      random.nextBytes(mobVal);
+
+      Put put = new Put(Bytes.toBytes(row + rowIndex));
+      put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
+      table.put(put);
+
+      boolean flushNow = rowIndex % flushStep == 0;
+      if (flushNow) {
+        table.flushCommits();
+        admin.flush(TableName.valueOf(tableName));
+      }
+    }
+    // final flush so rows written since the last step are persisted too
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+  }
+
+  /**
+   * Runs the {@link Sweeper} tool with the sweep job delay set to one day
+   * (in milliseconds) and checks that the 10 generated mob files, and the 10
+   * references to them stored in the table, are still present afterwards.
+   */
+  @Test
+  public void testSweeper() throws Exception {
+
+    int count = 10;
+    //create table and generate 10 mob files
+    generateMobTable(count, 1);
+
+    //get mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob files
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    //scan the table, retrieve the references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    try {
+      for (Result res : rs) {
+        byte[] valueBytes = res.getValue(Bytes.toBytes(family),
+            Bytes.toBytes(column));
+        // skip the leading int-sized prefix; the rest is read as the mob file name
+        mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+            valueBytes.length - Bytes.SIZEOF_INT));
+      }
+    } finally {
+      // release the scanner even if an assertion or read fails
+      rs.close();
+    }
+
+    //there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    //check if we store the correct reference of mob files
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000);
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    try {
+      for (Result res : rs) {
+        byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
+            column));
+        mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+            valueBytes.length - Bytes.SIZEOF_INT));
+      }
+    } finally {
+      rs.close();
+    }
+
+    assertEquals(10, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(10, mobFilesSet.size());
+    // both sets are sorted, so their first elements are compared
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+        .equalsIgnoreCase(mobFilesSet.iterator().next()));
+
+  }
+
+  /**
+   * Runs the {@link Sweeper} tool with the sweep job delay set to 0 and checks
+   * that the 10 generated mob files end up as a single mob file, and that the
+   * references stored in the table all name that one file.
+   */
+  @Test
+  public void testCompactionDelaySweeper() throws Exception {
+
+    int count = 10;
+    //create table and generate 10 mob files
+    generateMobTable(count, 1);
+
+    //get mob files
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    // mobFilesSet stores the original mob files
+    TreeSet<String> mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    //scan the table, retrieve the references
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScanned = new TreeSet<String>();
+    try {
+      for (Result res : rs) {
+        byte[] valueBytes = res.getValue(Bytes.toBytes(family),
+            Bytes.toBytes(column));
+        // skip the leading int-sized prefix; the rest is read as the mob file name
+        mobFilesScanned.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+            valueBytes.length - Bytes.SIZEOF_INT));
+      }
+    } finally {
+      // release the scanner even if an assertion or read fails
+      rs.close();
+    }
+
+    //there should be 10 mob files
+    assertEquals(10, mobFilesScanned.size());
+    //check if we store the correct reference of mob files
+    assertEquals(mergeString(mobFilesSet), mergeString(mobFilesScanned));
+
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0);
+
+    String[] args = new String[2];
+    args[0] = tableName;
+    args[1] = family;
+    ToolRunner.run(conf, new Sweeper(), args);
+
+
+    mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+
+
+    scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    rs = table.getScanner(scan);
+    TreeSet<String> mobFilesScannedAfterJob = new TreeSet<String>();
+    try {
+      for (Result res : rs) {
+        byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(
+            column));
+        mobFilesScannedAfterJob.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+            valueBytes.length - Bytes.SIZEOF_INT));
+      }
+    } finally {
+      rs.close();
+    }
+
+    assertEquals(1, mobFilesScannedAfterJob.size());
+
+    fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    mobFilesSet = new TreeSet<String>();
+    for (FileStatus status : fileStatuses) {
+      mobFilesSet.add(status.getPath().getName());
+    }
+
+    assertEquals(1, mobFilesSet.size());
+    // the only referenced file name must match the only file on disk
+    assertEquals(true, mobFilesScannedAfterJob.iterator().next()
+        .equalsIgnoreCase(mobFilesSet.iterator().next()));
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index c7d146b,852d319..0d28e54
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@@ -242,73 -242,22 +242,92 @@@ public class MetricsRegionServerWrapper
}
 @Override
+ public long getHedgedReadOps() {
+ // Stub: always reports 100 hedged read operations.
+ return 100;
+ }
+
+ @Override
+ public long getHedgedReadWins() {
+ // Stub: always reports 10 hedged read wins.
+ return 10;
+ }
+
+ @Override
+ public long getBlockedRequestsCount() {
+ // Stub: always reports zero blocked requests.
+ return 0;
+ }
+
+ @Override
+ public int getSplitQueueSize() {
+ // Stub: always reports an empty split queue (size 0).
+ return 0;
+ }
++
++ @Override
+ public long getMobCompactedIntoMobCellsCount() {
+ // Stub: fixed count of 20 for cells compacted into mob cells.
+ return 20;
+ }
+
+ @Override
+ public long getMobCompactedFromMobCellsCount() {
+ // Stub: fixed count of 10 for cells compacted from mob cells.
+ return 10;
+ }
+
+ @Override
+ public long getMobCompactedIntoMobCellsSize() {
+ // Stub: fixed size of 200 for cells compacted into mob cells.
+ return 200;
+ }
+
+ @Override
+ public long getMobCompactedFromMobCellsSize() {
+ // Stub: fixed size of 100 for cells compacted from mob cells.
+ return 100;
+ }
+
+ @Override
+ public long getMobFlushCount() {
+ // Stub: always reports a single mob flush.
+ return 1;
+ }
+
+ @Override
+ public long getMobFlushedCellsCount() {
+ // Stub: fixed count of 10 flushed mob cells.
+ return 10;
+ }
+
+ @Override
+ public long getMobFlushedCellsSize() {
+ // Stub: fixed size of 1000 for flushed mob cells.
+ return 1000;
+ }
+
+ @Override
+ public long getMobScanCellsCount() {
+ // Stub: fixed count of 10 scanned mob cells.
+ return 10;
+ }
+
+ @Override
+ public long getMobScanCellsSize() {
+ // Stub: fixed size of 1000 for scanned mob cells.
+ return 1000;
+ }
+
+ @Override
+ public long getMobFileCacheAccessCount() {
+ // Stub: fixed count of 100 mob file cache accesses.
+ return 100;
+ }
+
+ @Override
+ public long getMobFileCacheMissCount() {
+ // Stub: fixed count of 50 mob file cache misses.
+ return 50;
+ }
+
+ @Override
+ public long getMobFileCacheEvictedCount() {
+ // Stub: always reports zero mob file cache evictions.
+ return 0;
+ }
+
+ @Override
+ public long getMobFileCacheCount() {
+ // Stub: fixed mob file cache count of 100.
+ return 100;
+ }
+
+ @Override
+ public int getMobFileCacheHitPercent() {
+ // Stub: fixed mob file cache hit percentage of 50.
+ return 50;
+ }
-
- }
+ }