You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/05/01 17:28:32 UTC
[46/50] [abbrv] hbase git commit: Merge branch 'apache/master'
(4/16/15) into hbase-11339
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
index e4cad6f,0000000..ba0b620
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
@@@ -1,827 -1,0 +1,822 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
++import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.Delete;
- import org.apache.hadoop.hbase.client.Durability;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestMobFileCompactor {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private Configuration conf = null;
+ private String tableNameAsString;
+ private TableName tableName;
- private HTable hTable;
++ private static Connection conn;
++ private BufferedMutator bufMut;
++ private Table hTable;
+ private Admin admin;
+ private HTableDescriptor desc;
+ private HColumnDescriptor hcd1;
+ private HColumnDescriptor hcd2;
+ private FileSystem fs;
- private final String family1 = "family1";
- private final String family2 = "family2";
- private final String qf1 = "qualifier1";
- private final String qf2 = "qualifier2";
- private byte[] KEYS = Bytes.toBytes("012");
- private int regionNum = KEYS.length;
- private int delRowNum = 1;
- private int delCellNum = 6;
- private int cellNumPerRow = 3;
- private int rowNumPerFile = 2;
++ private static final String family1 = "family1";
++ private static final String family2 = "family2";
++ private static final String qf1 = "qualifier1";
++ private static final String qf2 = "qualifier2";
++ private static byte[] KEYS = Bytes.toBytes("012");
++ private static int regionNum = KEYS.length;
++ private static int delRowNum = 1;
++ private static int delCellNum = 6;
++ private static int cellNumPerRow = 3;
++ private static int rowNumPerFile = 2;
+ private static ExecutorService pool;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000);
+ TEST_UTIL.startMiniCluster(1);
+ pool = createThreadPool(TEST_UTIL.getConfiguration());
++ conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ pool.shutdown();
++ conn.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ fs = TEST_UTIL.getTestFileSystem();
+ conf = TEST_UTIL.getConfiguration();
+ long tid = System.currentTimeMillis();
+ tableNameAsString = "testMob" + tid;
+ tableName = TableName.valueOf(tableNameAsString);
+ hcd1 = new HColumnDescriptor(family1);
+ hcd1.setMobEnabled(true);
+ hcd1.setMobThreshold(0L);
+ hcd1.setMaxVersions(4);
+ hcd2 = new HColumnDescriptor(family2);
+ hcd2.setMobEnabled(true);
+ hcd2.setMobThreshold(0L);
+ hcd2.setMaxVersions(4);
+ desc = new HTableDescriptor(tableName);
+ desc.addFamily(hcd1);
+ desc.addFamily(hcd2);
+ admin = TEST_UTIL.getHBaseAdmin();
+ admin.createTable(desc, getSplitKeys());
- hTable = new HTable(conf, tableNameAsString);
- hTable.setAutoFlush(false, false);
++ hTable = conn.getTable(tableName);
++ bufMut = conn.getBufferedMutator(tableName);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ admin.close();
+ hTable.close();
+ fs.delete(TEST_UTIL.getDataTestDir(), true);
+ }
+
+ @Test
+ public void testCompactionWithoutDelFilesWithNamespace() throws Exception {
+ resetConf();
+ // create a table with namespace
+ NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build();
+ String tableNameAsString = "ns:testCompactionWithoutDelFilesWithNamespace";
+ admin.createNamespace(namespaceDescriptor);
+ TableName tableName = TableName.valueOf(tableNameAsString);
+ HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
+ hcd1.setMobEnabled(true);
+ hcd1.setMobThreshold(0L);
+ hcd1.setMaxVersions(4);
+ HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
+ hcd2.setMobEnabled(true);
+ hcd2.setMobThreshold(0L);
+ hcd2.setMaxVersions(4);
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(hcd1);
+ desc.addFamily(hcd2);
+ admin.createTable(desc, getSplitKeys());
- HTable table = new HTable(conf, tableName);
- table.setAutoFlush(false, false);
++ BufferedMutator bufMut= conn.getBufferedMutator(tableName);
++ Table table = conn.getTable(tableName);
+
+ int count = 4;
+ // generate mob files
- loadData(admin, table, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count * rowNumPerFile;
+
+ assertEquals("Before compaction: mob rows count", regionNum * rowNumPerRegion,
+ countMobRows(table));
+ assertEquals("Before compaction: mob file count", regionNum * count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
+
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum * rowNumPerRegion,
+ countMobRows(table));
+ assertEquals("After compaction: mob file count", regionNum,
+ countFiles(tableName, true, family1));
+ assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1));
+
+ table.close();
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ admin.deleteNamespace("ns");
+ }
+
+ @Test
+ public void testCompactionWithoutDelFiles() throws Exception {
+ resetConf();
+ int count = 4;
+ // generate mob files
- loadData(admin, hTable, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob file count", regionNum * count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
+
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("After compaction: mob file count", regionNum,
+ countFiles(tableName, true, family1));
+ assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1));
+ }
+
+ @Test
+ public void testCompactionWithDelFiles() throws Exception {
+ resetConf();
+ int count = 4;
+ // generate mob files
- loadData(admin, hTable, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+ countMobCells(hTable));
+ assertEquals("Before deleting: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before deleting: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: family2 file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+
+ // do the mob file compaction
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("After compaction: family1 mob file count", regionNum,
+ countFiles(tableName, true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("After compaction: family1 del file count", 0,
+ countFiles(tableName, false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+ assertRefFileNameEqual(family1);
+ }
+
+ @Test
+ public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
+ resetConf();
+ int mergeSize = 5000;
+ // change the mob compaction merge size
+ conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+
+ int count = 4;
+ // generate mob files
- loadData(admin, hTable, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+ countMobCells(hTable));
+ assertEquals("Before deleting: mob file count", regionNum * count,
+ countFiles(tableName, true, family1));
+
+ int largeFilesCount = countLargeFiles(mergeSize, family1);
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+
+ // do the mob file compaction
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ // After the compaction, the files smaller than the mob compaction merge size
+ // are merged into one file
+ assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
+ countFiles(tableName, true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("After compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+ }
+
+ @Test
+ public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
+ resetConf();
+ int batchSize = 2;
+ conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
+ int count = 4;
+ // generate mob files
- loadData(admin, hTable, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before deleting: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+
+ // do the mob file compaction
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize),
+ countFiles(tableName, true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("After compaction: family1 del file count", 0,
+ countFiles(tableName, false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+ }
+
+ @Test
+ public void testCompactionWithHFileLink() throws IOException, InterruptedException {
+ resetConf();
+ int count = 4;
+ // generate mob files
- loadData(admin, hTable, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ long tid = System.currentTimeMillis();
+ byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
+ // take a snapshot
+ admin.snapshot(snapshotName1, tableName);
+
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+
+ // do the mob file compaction
+ MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+ compactor.compact();
+
+ assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After first compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("After first compaction: family1 mob file count", regionNum,
+ countFiles(tableName, true, family1));
+ assertEquals("After first compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("After first compaction: family1 del file count", 0,
+ countFiles(tableName, false, family1));
+ assertEquals("After first compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+ assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+ assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+
+ admin.disableTable(tableName);
+ // Restore from snapshot, the hfilelink will exist in mob dir
+ admin.restoreSnapshot(snapshotName1);
+ admin.enableTable(tableName);
+
+ assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("After restoring snapshot: mob cells count",
+ regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+ assertEquals("After restoring snapshot: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("After restoring snapshot: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("After restoring snapshot: family1 del file count", 0,
+ countFiles(tableName, false, family1));
+ assertEquals("After restoring snapshot: family2 del file count", 0,
+ countFiles(tableName, false, family2));
+ assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count,
+ countHFileLinks(family1));
+ assertEquals("After restoring snapshot: family2 hfilelink count", 0,
+ countHFileLinks(family2));
+
+ compactor.compact();
+
+ assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("After second compaction: mob cells count",
+ regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+ assertEquals("After second compaction: family1 mob file count", regionNum,
+ countFiles(tableName, true, family1));
+ assertEquals("After second compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("After second compaction: family1 del file count", 0,
+ countFiles(tableName, false, family1));
+ assertEquals("After second compaction: family2 del file count", 0,
+ countFiles(tableName, false, family2));
+ assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+ assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+ assertRefFileNameEqual(family1);
+ }
+
+ @Test
+ public void testCompactionFromAdmin() throws Exception {
+ int count = 4;
+ // generate mob files
- loadData(admin, hTable, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+ countMobCells(hTable));
+ assertEquals("Before deleting: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before deleting: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: family2 file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+
+ int largeFilesCount = countLargeFiles(5000, family1);
+ // do the mob file compaction
+ admin.compactMob(tableName, hcd1.getName());
+
+ waitUntilCompactionFinished(tableName);
+ assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count", regionNum
+ * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(hTable));
+ assertEquals("After compaction: family1 mob file count", regionNum + largeFilesCount,
+ countFiles(tableName, true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum * count,
+ countFiles(tableName, true, family2));
+ assertEquals("After compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+ assertRefFileNameEqual(family1);
+ }
+
+ @Test
+ public void testMajorCompactionFromAdmin() throws Exception {
+ int count = 4;
+ // generate mob files
- loadData(admin, hTable, tableName, count, rowNumPerFile);
++ loadData(admin, bufMut, tableName, count, rowNumPerFile);
+ int rowNumPerRegion = count*rowNumPerFile;
+
+ assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+ countMobRows(hTable));
+ assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+ countMobCells(hTable));
+ assertEquals("Before deleting: mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+
+ createDelFile();
+
+ assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("Before compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("Before compaction: family1 mob file count", regionNum*count,
+ countFiles(tableName, true, family1));
+ assertEquals("Before compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("Before compaction: family1 del file count", regionNum,
+ countFiles(tableName, false, family1));
+ assertEquals("Before compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+
+ // do the major mob file compaction; it forces all files to be compacted
+ admin.majorCompactMob(tableName, hcd1.getName());
+
+ waitUntilCompactionFinished(tableName);
+ assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+ countMobRows(hTable));
+ assertEquals("After compaction: mob cells count",
+ regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+ assertEquals("After compaction: family1 mob file count", regionNum,
+ countFiles(tableName, true, family1));
+ assertEquals("After compaction: family2 mob file count", regionNum*count,
+ countFiles(tableName, true, family2));
+ assertEquals("After compaction: family1 del file count", 0,
+ countFiles(tableName, false, family1));
+ assertEquals("After compaction: family2 del file count", regionNum,
+ countFiles(tableName, false, family2));
+ }
+
+ private void waitUntilCompactionFinished(TableName tableName) throws IOException,
+ InterruptedException {
+ long finished = EnvironmentEdgeManager.currentTime() + 60000;
+ CompactionState state = admin.getMobCompactionState(tableName);
+ while (EnvironmentEdgeManager.currentTime() < finished) {
+ if (state == CompactionState.NONE) {
+ break;
+ }
+ state = admin.getMobCompactionState(tableName);
+ Thread.sleep(10);
+ }
+ assertEquals(CompactionState.NONE, state);
+ }
+
+ /**
+ * Gets the number of rows in the given table.
+ * @param table the table to scan
+ * @return the number of rows
+ */
- private int countMobRows(final HTable table) throws IOException {
++ private int countMobRows(final Table table) throws IOException {
+ Scan scan = new Scan();
+ // Do not retrieve the mob data when scanning
+ scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ count++;
+ }
+ results.close();
+ return count;
+ }
+
+ /**
+ * Gets the number of cells in the given table.
+ * @param table the table to scan
+ * @return the number of cells
+ */
- private int countMobCells(final HTable table) throws IOException {
++ private int countMobCells(final Table table) throws IOException {
+ Scan scan = new Scan();
+ // Do not retrieve the mob data when scanning
+ scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ for (Cell cell : res.listCells()) {
+ count++;
+ }
+ }
+ results.close();
+ return count;
+ }
+
+ /**
+ * Gets the number of files in the mob path.
+ * @param isMobFile true to count the mob files, false to count the del files
+ * @param familyName the family name
+ * @return the number of the files
+ */
+ private int countFiles(TableName tableName, boolean isMobFile, String familyName)
+ throws IOException {
+ Path mobDirPath = MobUtils.getMobFamilyPath(
+ MobUtils.getMobRegionPath(conf, tableName), familyName);
+ int count = 0;
+ if (fs.exists(mobDirPath)) {
+ FileStatus[] files = fs.listStatus(mobDirPath);
+ for (FileStatus file : files) {
+ if (isMobFile == true) {
+ if (!StoreFileInfo.isDelFile(file.getPath())) {
+ count++;
+ }
+ } else {
+ if (StoreFileInfo.isDelFile(file.getPath())) {
+ count++;
+ }
+ }
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Gets the number of HFileLink in the mob path.
+ * @param familyName the family name
+ * @return the number of the HFileLink
+ */
+ private int countHFileLinks(String familyName) throws IOException {
+ Path mobDirPath = MobUtils.getMobFamilyPath(
+ MobUtils.getMobRegionPath(conf, tableName), familyName);
+ int count = 0;
+ if (fs.exists(mobDirPath)) {
+ FileStatus[] files = fs.listStatus(mobDirPath);
+ for (FileStatus file : files) {
+ if (HFileLink.isHFileLink(file.getPath())) {
+ count++;
+ }
+ }
+ }
+ return count;
+ }
+
+ /**
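+ * Gets the number of mob files (del files excluded) that are larger than the given size.
+ * @param size the size threshold; only files strictly larger than it are counted
+ * @param familyName the family name
+ * @return the number of files larger than the size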
+ */
+ private int countLargeFiles(int size, String familyName) throws IOException {
+ Path mobDirPath = MobUtils.getMobFamilyPath(
+ MobUtils.getMobRegionPath(conf, tableName), familyName);
+ int count = 0;
+ if (fs.exists(mobDirPath)) {
+ FileStatus[] files = fs.listStatus(mobDirPath);
+ for (FileStatus file : files) {
+ // ignore the del files in the mob path
+ if ((!StoreFileInfo.isDelFile(file.getPath()))
+ && (file.getLen() > size)) {
+ count++;
+ }
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Loads some data into the table, flushing after every rowNumPerFile rows.
- * @param count the mob file number
+ */
- private void loadData(Admin admin, HTable table, TableName tableName, int fileNum,
++ private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum,
+ int rowNumPerFile) throws IOException, InterruptedException {
+ if (fileNum <= 0) {
+ throw new IllegalArgumentException();
+ }
+ for (byte k0 : KEYS) {
+ byte[] k = new byte[] { k0 };
+ for (int i = 0; i < fileNum * rowNumPerFile; i++) {
+ byte[] key = Bytes.add(k, Bytes.toBytes(i));
+ byte[] mobVal = makeDummyData(10 * (i + 1));
+ Put put = new Put(key);
+ put.setDurability(Durability.SKIP_WAL);
- put.add(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
- put.add(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
- put.add(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
- table.put(put);
++ put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
++ put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
++ put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
++ table.mutate(put);
+ if ((i + 1) % rowNumPerFile == 0) {
- table.flushCommits();
++ table.flush();
+ admin.flush(tableName);
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes a family, a row and a cell for each key prefix to create the del files.
+ */
+ private void createDelFile() throws IOException, InterruptedException {
+ for (byte k0 : KEYS) {
+ byte[] k = new byte[] { k0 };
+ // delete a family
+ byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
+ Delete delete1 = new Delete(key1);
- delete1.deleteFamily(Bytes.toBytes(family1));
++ delete1.addFamily(Bytes.toBytes(family1));
+ hTable.delete(delete1);
+ // delete one row
+ byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
+ Delete delete2 = new Delete(key2);
+ hTable.delete(delete2);
+ // delete one cell
+ byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
+ Delete delete3 = new Delete(key3);
- delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
++ delete3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
+ hTable.delete(delete3);
- hTable.flushCommits();
+ admin.flush(tableName);
+ List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(
+ Bytes.toBytes(tableNameAsString));
+ for (HRegion region : regions) {
+ region.waitForFlushesAndCompactions();
- region.compactStores(true);
++ region.compact(true);
+ }
+ }
+ }
+ /**
+ * Creates the dummy data with a specific size.
- * @param the size of data
++ * @param size the size of value
+ * @return the dummy data
+ */
+ private byte[] makeDummyData(int size) {
+ byte[] dummyData = new byte[size];
+ new Random().nextBytes(dummyData);
+ return dummyData;
+ }
+
+ /**
+ * Gets the split keys
+ */
+ private byte[][] getSplitKeys() {
+ byte[][] splitKeys = new byte[KEYS.length - 1][];
+ for (int i = 0; i < splitKeys.length; ++i) {
+ splitKeys[i] = new byte[] { KEYS[i + 1] };
+ }
+ return splitKeys;
+ }
+
+ private static ExecutorService createThreadPool(Configuration conf) {
+ int maxThreads = 10;
+ long keepAliveTime = 60;
+ final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
+ keepAliveTime, TimeUnit.SECONDS, queue,
+ Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+ new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ // waiting for a thread to pick up instead of throwing exceptions.
+ queue.put(r);
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException(e);
+ }
+ }
+ });
+ ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
+ private void assertRefFileNameEqual(String familyName) throws IOException {
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes(familyName));
+ // Do not retrieve the mob data when scanning
+ scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+ ResultScanner results = hTable.getScanner(scan);
+ Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+ tableName), familyName);
+ List<Path> actualFilePaths = new ArrayList<>();
+ List<Path> expectFilePaths = new ArrayList<>();
+ for (Result res : results) {
+ for (Cell cell : res.listCells()) {
+ byte[] referenceValue = CellUtil.cloneValue(cell);
+ String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
+ referenceValue.length - Bytes.SIZEOF_INT);
+ Path targetPath = new Path(mobFamilyPath, fileName);
+ if(!actualFilePaths.contains(targetPath)) {
+ actualFilePaths.add(targetPath);
+ }
+ }
+ }
+ results.close();
+ if (fs.exists(mobFamilyPath)) {
+ FileStatus[] files = fs.listStatus(mobFamilyPath);
+ for (FileStatus file : files) {
+ if (!StoreFileInfo.isDelFile(file.getPath())) {
+ expectFilePaths.add(file.getPath());
+ }
+ }
+ }
+ Collections.sort(actualFilePaths);
+ Collections.sort(expectFilePaths);
+ assertEquals(expectFilePaths, actualFilePaths);
+ }
+
+ /**
+ * Resets the configuration.
+ */
+ private void resetConf() {
+ conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+ MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+ conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+ MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
index 3c73d52,0000000..ed3853e
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
@@@ -1,446 -1,0 +1,441 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
++import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
- import org.apache.hadoop.hbase.regionserver.BloomType;
- import org.apache.hadoop.hbase.regionserver.HStore;
- import org.apache.hadoop.hbase.regionserver.ScanInfo;
- import org.apache.hadoop.hbase.regionserver.ScanType;
- import org.apache.hadoop.hbase.regionserver.StoreFile;
- import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
- import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestPartitionedMobFileCompactor {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); // mini cluster shared by all tests
+ private final static String family = "family"; // column family used for every generated file
+ private final static String qf = "qf"; // column qualifier used for every generated cell
+ private HColumnDescriptor hcd = new HColumnDescriptor(family); // handed to each compactor under test
+ private Configuration conf = TEST_UTIL.getConfiguration(); // tests override compaction settings on it
+ private CacheConfig cacheConf = new CacheConfig(conf);
+ private FileSystem fs; // resolved in init()
+ private List<FileStatus> mobFiles = new ArrayList<>(); // filled by listFiles(): names not ending in "_del"
+ private List<FileStatus> delFiles = new ArrayList<>(); // filled by listFiles(): names ending in "_del"
+ private List<FileStatus> allFiles = new ArrayList<>(); // filled by listFiles(): every file under basePath
+ private Path basePath; // set in init(): <root dir>/<mob dir>/<table name>/<family>
+ private String mobSuffix; // random UUID without dashes, set in init()
+ private String delSuffix; // random UUID without dashes plus "_del", set in init()
+ private static ExecutorService pool; // created in setUpBeforeClass(), shut down in tearDownAfterClass()
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception { // one-time setup: configure and start the mini cluster, then create the shared pool
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+ TEST_UTIL.startMiniCluster(1);
- pool = createThreadPool(TEST_UTIL.getConfiguration());
++ pool = createThreadPool(); // executor passed to every PartitionedMobFileCompactor built by the tests
+ }
+
+   /**
+    * Releases the class-wide resources: the compaction thread pool and the mini cluster.
+    * The cluster shutdown is always attempted, even when the pool was never created or
+    * stopping it fails.
+    */
+   @AfterClass
+   public static void tearDownAfterClass() throws Exception {
+     try {
+       // pool is still null when setUpBeforeClass failed before creating it
+       if (pool != null) {
+         pool.shutdown();
+       }
+     } finally {
+       TEST_UTIL.shutdownMiniCluster();
+     }
+   }
+
+   /**
+    * Prepares the per-test state: resolves the file system, points {@code basePath} at the
+    * family directory of the given table under the MOB directory of the root dir, and
+    * generates fresh random suffixes for the mob and del file names.
+    * @param tableName the table name used to build the base path
+    */
+   private void init(String tableName) throws Exception {
+     fs = FileSystem.get(conf);
+     Path mobRoot = new Path(FSUtils.getRootDir(conf), MobConstants.MOB_DIR_NAME);
+     Path tableDir = new Path(mobRoot, tableName);
+     basePath = new Path(tableDir, family);
+     String mobId = UUID.randomUUID().toString();
+     mobSuffix = mobId.replaceAll("-", "");
+     String delId = UUID.randomUUID().toString();
+     delSuffix = delId.replaceAll("-", "") + "_del";
+   }
+
+   /**
+    * Creates 10 mob files and 10 del files and checks that the selection, with the default
+    * mergeable threshold, reports ALL_FILES and the partitions of every mob file smaller
+    * than that threshold.
+    */
+   @Test
+   public void testCompactionSelectWithAllFiles() throws Exception {
+     resetConf();
+     final String tableName = "testCompactionSelectWithAllFiles";
+     init(tableName);
+     final int fileCount = 10;
+     // 10 mob files first, then 10 del files
+     createStoreFiles(basePath, family, qf, fileCount, Type.Put);
+     createStoreFiles(basePath, family, qf, fileCount, Type.Delete);
+     listFiles();
+     final long threshold = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
+     List<String> expectedStartKeys = new ArrayList<>();
+     for (FileStatus mobFile : mobFiles) {
+       if (mobFile.getLen() >= threshold) {
+         continue;
+       }
+       // the first 32 characters of the file name are used as the expected start key
+       expectedStartKeys.add(mobFile.getPath().getName().substring(0, 32));
+     }
+     testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys);
+   }
+
+   /**
+    * Creates 10 mob files and 10 del files, lowers the mergeable threshold to 4000 and checks
+    * that the selection reports PART_FILES and only the partitions of the mob files smaller
+    * than that threshold.
+    */
+   @Test
+   public void testCompactionSelectWithPartFiles() throws Exception {
+     resetConf();
+     String tableName = "testCompactionSelectWithPartFiles";
+     init(tableName);
+     int count = 10;
+     // create 10 mob files.
+     createStoreFiles(basePath, family, qf, count, Type.Put);
+     // create 10 del files
+     createStoreFiles(basePath, family, qf, count, Type.Delete);
+     listFiles();
+     long mergeSize = 4000;
+     List<String> expectedStartKeys = new ArrayList<>();
+     for (FileStatus file : mobFiles) {
+       // compare against mergeSize itself so the expectation cannot drift from the
+       // threshold written into the configuration below
+       if (file.getLen() < mergeSize) {
+         String fileName = file.getPath().getName();
+         String startKey = fileName.substring(0, 32);
+         expectedStartKeys.add(startKey);
+       }
+     }
+     // set the mob file compaction mergeable threshold
+     conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+     testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys);
+   }
+
+   /**
+    * Creates 10 mob files and 10 del files, lowers the mergeable threshold to 4000 and checks
+    * that a forced selection reports ALL_FILES and the partitions of every mob file,
+    * whatever its size.
+    */
+   @Test
+   public void testCompactionSelectWithForceAllFiles() throws Exception {
+     resetConf();
+     final String tableName = "testCompactionSelectWithForceAllFiles";
+     init(tableName);
+     final int fileCount = 10;
+     // 10 mob files first, then 10 del files
+     createStoreFiles(basePath, family, qf, fileCount, Type.Put);
+     createStoreFiles(basePath, family, qf, fileCount, Type.Delete);
+     listFiles();
+     final long threshold = 4000;
+     // every mob file is expected, no size filter applies here
+     List<String> expectedStartKeys = new ArrayList<>();
+     for (FileStatus mobFile : mobFiles) {
+       expectedStartKeys.add(mobFile.getPath().getName().substring(0, 32));
+     }
+     // set the mob file compaction mergeable threshold
+     conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, threshold);
+     testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys);
+   }
+
+   /**
+    * Creates 20 mob files and 13 del files and expects the del files to be merged into a
+    * single file holding 13 cells when the default settings are used.
+    */
+   @Test
+   public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
+     resetConf();
+     final String tableName = "testCompactDelFilesWithDefaultBatchSize";
+     init(tableName);
+     final int mobFileCount = 20;
+     final int delFileCount = 13;
+     createStoreFiles(basePath, family, qf, mobFileCount, Type.Put);
+     createStoreFiles(basePath, family, qf, delFileCount, Type.Delete);
+     listFiles();
+     testCompactDelFiles(tableName, 1, delFileCount, false);
+   }
+
+   /**
+    * Creates 20 mob files and 13 del files, sets the compaction batch size to 4 and expects
+    * the del files to be merged into a single file holding 13 cells.
+    */
+   @Test
+   public void testCompactDelFilesWithSmallBatchSize() throws Exception {
+     resetConf();
+     final String tableName = "testCompactDelFilesWithSmallBatchSize";
+     init(tableName);
+     final int mobFileCount = 20;
+     final int delFileCount = 13;
+     createStoreFiles(basePath, family, qf, mobFileCount, Type.Put);
+     createStoreFiles(basePath, family, qf, delFileCount, Type.Delete);
+     listFiles();
+
+     // set the mob file compaction batch size
+     final int batchSize = 4;
+     conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
+     testCompactDelFiles(tableName, 1, delFileCount, false);
+   }
+
+   /**
+    * Creates 20 mob files and 13 del files, limits the del file count to 5 with a batch size
+    * of 2 and expects the del files to be merged into 4 files holding 13 cells in total.
+    */
+   @Test
+   public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
+     resetConf();
+     // Use a table name of its own: init() derives basePath from it, and sharing the name of
+     // testCompactDelFilesWithSmallBatchSize would make both tests list each other's files.
+     String tableName = "testCompactDelFilesChangeMaxDelFileCount";
+     init(tableName);
+     // create 20 mob files.
+     createStoreFiles(basePath, family, qf, 20, Type.Put);
+     // create 13 del files
+     createStoreFiles(basePath, family, qf, 13, Type.Delete);
+     listFiles();
+
+     // set the max del file count
+     conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
+     // set the mob file compaction batch size
+     conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
+     testCompactDelFiles(tableName, 4, 13, false);
+   }
+
+ /**
+ * Tests the selectFiles
+ * @param tableName the table name
+ * @param type the expected compaction type
+ * @param expected the expected start keys
+ */
+ private void testSelectFiles(String tableName, final CompactionType type,
+ final boolean isForceAllFiles, final List<String> expected) throws IOException {
+ PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
+ TableName.valueOf(tableName), hcd, pool) {
+ @Override
+ public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
+ throws IOException {
+ if (files == null || files.isEmpty()) {
+ return null;
+ }
+ PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
+ // assert the compaction type
+ Assert.assertEquals(type, request.type);
+ // assert get the right partitions
+ compareCompactedPartitions(expected, request.compactionPartitions);
+ // assert get the right del files
+ compareDelFiles(request.delFiles);
+ return null;
+ }
+ };
+ compactor.compact(allFiles, isForceAllFiles);
+ }
+
+   /**
+    * Tests compactDelFiles through a compactor whose {@code performCompaction} is overridden
+    * to merge only the del files of the request and verify the outcome.
+    * @param tableName the table name
+    * @param expectedFileCount the expected file count
+    * @param expectedCellCount the expected cell count
+    * @param isForceAllFiles the flag handed to {@code compact}
+    */
+   private void testCompactDelFiles(String tableName, final int expectedFileCount,
+       final int expectedCellCount, boolean isForceAllFiles) throws IOException {
+     TableName table = TableName.valueOf(tableName);
+     PartitionedMobFileCompactor compactor =
+       new PartitionedMobFileCompactor(conf, fs, table, hcd, pool) {
+         @Override
+         protected List<Path> performCompaction(PartitionedMobFileCompactionRequest req)
+             throws IOException {
+           List<Path> sourcePaths = new ArrayList<>();
+           for (FileStatus status : req.delFiles) {
+             sourcePaths.add(status.getPath());
+           }
+           List<Path> mergedPaths = compactDelFiles(req, sourcePaths);
+           // assert the del files are merged.
+           Assert.assertEquals(expectedFileCount, mergedPaths.size());
+           Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(mergedPaths));
+           return null;
+         }
+       };
+     compactor.compact(allFiles, isForceAllFiles);
+   }
+
+   /**
+    * Lists the files under {@code basePath}, adding each one to {@code allFiles} and to
+    * either {@code delFiles} (name ends with "_del") or {@code mobFiles} (everything else).
+    */
+   private void listFiles() throws IOException {
+     FileStatus[] statuses = fs.listStatus(basePath);
+     for (FileStatus status : statuses) {
+       allFiles.add(status);
+       boolean isDelFile = status.getPath().getName().endsWith("_del");
+       List<FileStatus> bucket = isDelFile ? delFiles : mobFiles;
+       bucket.add(status);
+     }
+   }
+
+   /**
+    * Asserts that the start keys of the given partitions equal the expected start keys,
+    * ignoring order.
+    * @param expected the expected start keys; this list is sorted in place
+    * @param partitions the collection of CompactedPartitions
+    */
+   private void compareCompactedPartitions(List<String> expected,
+       Collection<CompactionPartition> partitions) {
+     List<String> startKeys = new ArrayList<>();
+     for (CompactionPartition p : partitions) {
+       startKeys.add(p.getPartitionId().getStartKey());
+     }
+     Collections.sort(expected);
+     Collections.sort(startKeys);
+     Assert.assertEquals(expected.size(), startKeys.size());
+     int index = 0;
+     for (String expectedKey : expected) {
+       Assert.assertEquals(expectedKey, startKeys.get(index));
+       index++;
+     }
+   }
+
+   /**
+    * Compares the del files of a compaction request with the del files found by
+    * {@code listFiles()}: same number of files, equal element by element in iteration order.
+    * @param allDelFiles all the del files
+    */
+   private void compareDelFiles(Collection<FileStatus> allDelFiles) {
+     // without the size check a request carrying fewer (or no) del files would pass silently
+     Assert.assertEquals(delFiles.size(), allDelFiles.size());
+     int i = 0;
+     for (FileStatus file : allDelFiles) {
+       Assert.assertEquals(delFiles.get(i), file);
+       i++;
+     }
+   }
+
+   /**
+    * Creates store files, one per row key {@code "row_" + i} for {@code i} in
+    * {@code [0, count)}. Every file receives one cell whose value is {@code (i + 1) * 1000}
+    * random bytes. Put files are named with the mob suffix, Delete files with the del suffix.
+    * @param basePath the path to create file
+    * @param family the family name
+    * @param qualifier the column qualifier
+    * @param count the store file number
+    * @param type the key type
+    */
+   private void createStoreFiles(Path basePath, String family, String qualifier, int count,
+       Type type) throws IOException {
+     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+     String startKey = "row_";
+     MobFileName mobFileName = null;
+     for (int i = 0; i < count; i++) {
+       byte[] startRow = Bytes.toBytes(startKey + i);
+       if (type.equals(Type.Delete)) {
+         mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), delSuffix);
+       }
+       if (type.equals(Type.Put)) {
+         // reuse startRow instead of encoding the same row key a second time
+         mobFileName = MobFileName.create(startRow, MobUtils.formatDate(new Date()), mobSuffix);
+       }
+       StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+         .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName()))
+         .build();
+       // writeStoreFile closes the writer once the cell is appended
+       writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
+         type, (i + 1) * 1000);
+     }
+   }
+
+ /**
+ * Writes data to store file.
+ * @param writer the store file writer
+ * @param row the row key
+ * @param family the family name
+ * @param qualifier the column qualifier
+ * @param type the key type
+ * @param size the size of value
+ */
+ private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family,
+ byte[] qualifier, Type type, int size) throws IOException {
+ long now = System.currentTimeMillis();
+ try {
+ byte[] dummyData = new byte[size];
+ new Random().nextBytes(dummyData);
+ writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
+ } finally {
+ writer.close();
+ }
+ }
+
+   /**
+    * Gets the number of del cell in the del files by scanning them with a
+    * {@code COMPACT_RETAIN_DELETES} store scanner and counting every returned cell.
+    * @param paths the del file paths
+    * @return the cell size
+    */
+   private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
+     List<StoreFile> sfs = new ArrayList<StoreFile>();
+     int size = 0;
+     for (Path path : paths) {
+       StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+       sfs.add(sf);
+     }
+     // NOTE(review): raw List kept on purpose; presumably the StoreScanner constructor expects
+     // a list of the scanner supertype - confirm before adding type arguments here.
+     List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
+       false, null, HConstants.LATEST_TIMESTAMP);
+     Scan scan = new Scan();
+     scan.setMaxVersions(hcd.getMaxVersions());
+     long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
+     long ttl = HStore.determineTTLFromFamily(hcd);
+     ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
+     StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
+       scanners, 0L, HConstants.LATEST_TIMESTAMP);
+     try {
+       List<Cell> results = new ArrayList<>();
+       boolean hasMore = true;
+       while (hasMore) {
+         hasMore = scanner.next(results);
+         size += results.size();
+         results.clear();
+       }
+     } finally {
+       // close the scanner even when next() throws so the store file readers are released
+       scanner.close();
+     }
+     return size;
+   }
+
+   /**
+    * Creates the executor used by the compactors under test: 1 core thread, at most 10
+    * threads, a 60 second keep-alive that also applies to the core thread, daemon threads,
+    * and a synchronous hand-off queue. A rejected task is put on the queue, blocking the
+    * submitter until a worker takes it.
+    * @return the thread pool
+    */
-   private static ExecutorService createThreadPool(Configuration conf) {
++  private static ExecutorService createThreadPool() {
+     int maxThreads = 10;
+     long keepAliveTime = 60;
+     final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+       TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+       new RejectedExecutionHandler() {
+         @Override
+         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+           try {
+             // waiting for a thread to pick up instead of throwing exceptions.
+             queue.put(r);
+           } catch (InterruptedException e) {
+             // restore the interrupt status so the submitting thread can still observe it
+             Thread.currentThread().interrupt();
+             throw new RejectedExecutionException(e);
+           }
+         }
+       });
+     // pool is already declared as ThreadPoolExecutor, no cast needed
+     pool.allowCoreThreadTimeOut(true);
+     return pool;
+   }
+
+   /**
+    * Puts the settings that the tests override back to their defaults: the mergeable
+    * threshold, the maximum del file count and the compaction batch size.
+    */
+   private void resetConf() {
+     conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+       MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+     conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+     conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+       MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+   }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
index 49345e4,0000000..3023849
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@@ -1,168 -1,0 +1,169 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
++import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweepJob {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); // mini cluster shared by all tests in this class
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+ JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+ TEST_UTIL.startMiniCluster();
+ }
+
+   /**
+    * Stops the mini cluster started in {@code setUpBeforeClass}.
+    */
+   @AfterClass
+   public static void tearDownAfterClass() throws Exception {
+     TEST_UTIL.shutdownMiniCluster();
+   }
+
+   /**
+    * Writes the given names to a sequence file at {@code path}, one record per name, with
+    * the name as key and an empty string as value.
+    * @param fs the file system to write to
+    * @param conf the configuration used to create the writer
+    * @param path the sequence file to create
+    * @param filesNames the names to write
+    */
+   private void writeFileNames(FileSystem fs, Configuration conf, Path path,
+       String[] filesNames) throws IOException {
+     SequenceFile.Writer out =
+       SequenceFile.createWriter(fs, conf, path, String.class, String.class);
+     try {
+       for (int i = 0; i < filesNames.length; i++) {
+         out.append(filesNames[i], MobConstants.EMPTY_STRING);
+       }
+     } finally {
+       IOUtils.closeStream(out);
+     }
+   }
+
+ @Test
+ public void testSweeperJobWithOutUnusedFile() throws Exception {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Configuration configuration = new Configuration(
+ TEST_UTIL.getConfiguration());
+ Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+ "/hbase/mobcompaction/SweepJob/working/names/0/visited");
+ Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+ "/hbase/mobcompaction/SweepJob/working/names/0/all");
+ configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+ vistiedFileNamesPath.toString());
+ configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+ allFileNamesPath.toString());
+
+ writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+ "2", "3", "4", "5", "6"});
+
+ Path r0 = new Path(vistiedFileNamesPath, "r0");
+ writeFileNames(fs, configuration, r0, new String[] { "1",
+ "2", "3"});
+ Path r1 = new Path(vistiedFileNamesPath, "r1");
+ writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5"});
+ Path r2 = new Path(vistiedFileNamesPath, "r2");
+ writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6"});
+
+ SweepJob sweepJob = new SweepJob(configuration, fs);
+ List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+ assertEquals(0, toBeArchived.size());
+ }
+
+ @Test
+ public void testSweeperJobWithUnusedFile() throws Exception {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Configuration configuration = new Configuration(
+ TEST_UTIL.getConfiguration());
+ Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+ "/hbase/mobcompaction/SweepJob/working/names/1/visited");
+ Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+ "/hbase/mobcompaction/SweepJob/working/names/1/all");
+ configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+ vistiedFileNamesPath.toString());
+ configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+ allFileNamesPath.toString());
+
+ writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+ "2", "3", "4", "5", "6"});
+
+ Path r0 = new Path(vistiedFileNamesPath, "r0");
+ writeFileNames(fs, configuration, r0, new String[] { "1",
+ "2", "3"});
+ Path r1 = new Path(vistiedFileNamesPath, "r1");
+ writeFileNames(fs, configuration, r1, new String[] { "1", "5"});
+ Path r2 = new Path(vistiedFileNamesPath, "r2");
+ writeFileNames(fs, configuration, r2, new String[] { "2", "3"});
+
+ SweepJob sweepJob = new SweepJob(configuration, fs);
+ List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+ assertEquals(2, toBeArchived.size());
- assertEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
++ assertArrayEquals(new String[]{"4", "6"}, toBeArchived.toArray(new String[0]));
+ }
+
+ @Test
+ public void testSweeperJobWithRedundantFile() throws Exception {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Configuration configuration = new Configuration(
+ TEST_UTIL.getConfiguration());
+ Path vistiedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+ "/hbase/mobcompaction/SweepJob/working/names/2/visited");
+ Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+ "/hbase/mobcompaction/SweepJob/working/names/2/all");
+ configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
+ vistiedFileNamesPath.toString());
+ configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
+ allFileNamesPath.toString());
+
+ writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
+ "2", "3", "4", "5", "6"});
+
+ Path r0 = new Path(vistiedFileNamesPath, "r0");
+ writeFileNames(fs, configuration, r0, new String[] { "1",
+ "2", "3"});
+ Path r1 = new Path(vistiedFileNamesPath, "r1");
+ writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7"});
+ Path r2 = new Path(vistiedFileNamesPath, "r2");
+ writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4"});
+
+ SweepJob sweepJob = new SweepJob(configuration, fs);
+ List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
+
+ assertEquals(0, toBeArchived.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
index 308b50e,0000000..8c24123
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@@ -1,220 -1,0 +1,219 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Admin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
++import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+
+@Category(MediumTests.class)
+public class TestMobSweepReducer {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); // mini cluster shared by the tests
+ private final static String tableName = "testSweepReducer"; // table created in setUp and dropped in tearDown
+ private final static String row = "row"; // row key (and row key prefix) written by testRun
+ private final static String family = "family"; // MOB-enabled column family configured in setUp
+ private final static String qf = "qf"; // column qualifier written by testRun
- private static HTable table;
++ private static BufferedMutator table; // write handle for the test table, assigned in setUp
+ private static Admin admin; // assigned in setUp, closed in tearDown
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+   /**
+    * Stops the mini cluster started in {@code setUpBeforeClass}.
+    */
+   @AfterClass
+   public static void tearDownAfterClass() throws Exception {
+     TEST_UTIL.shutdownMiniCluster();
+   }
+
+   // Connection backing the BufferedMutator; kept in a field so that tearDown can close it.
+   private static Connection connection;
+
+   /**
+    * Creates the test table with a MOB-enabled family (MOB threshold 3, max versions 4) and
+    * opens a connection plus a BufferedMutator for writing to it.
+    */
+   @SuppressWarnings("deprecation")
+   @Before
+   public void setUp() throws Exception {
+     HTableDescriptor desc = new HTableDescriptor(tableName);
+     HColumnDescriptor hcd = new HColumnDescriptor(family);
+     hcd.setMobEnabled(true);
+     hcd.setMobThreshold(3L);
+     hcd.setMaxVersions(4);
+     desc.addFamily(hcd);
+
+     admin = TEST_UTIL.getHBaseAdmin();
+     admin.createTable(desc);
-     table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++    // keep the connection instead of chaining the calls, otherwise it can never be closed
++    connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
++    table = connection.getBufferedMutator(TableName.valueOf(tableName));
+   }
+
+   /**
+    * Closes the BufferedMutator, drops the test table, and closes the admin and the
+    * connection opened in {@code setUp}. Each step is attempted even if an earlier one fails.
+    */
+   @After
+   public void tearDown() throws Exception {
+     try {
+       if (table != null) {
+         table.close();
+       }
+     } finally {
+       try {
+         admin.disableTable(TableName.valueOf(tableName));
+         admin.deleteTable(TableName.valueOf(tableName));
+         admin.close();
+       } finally {
+         if (connection != null) {
+           connection.close();
+         }
+       }
+     }
+   }
+
+   /**
+    * Reads all keys of the sequence file at {@code path}.
+    * @param fs the file system (not used by this method; the reader is opened from
+    *   {@code conf} and {@code path})
+    * @param path the sequence file to read
+    * @param conf the configuration used to open the reader
+    * @return the keys in the order they are read
+    */
+   private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
+       Configuration conf) throws Exception {
+     List<String> list = new ArrayList<String>();
+     SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+     try {
+       String next = (String) reader.next((String) null);
+       while (next != null) {
+         list.add(next);
+         next = (String) reader.next((String) null);
+       }
+     } finally {
+       // close the reader even when reading a key fails
+       reader.close();
+     }
+     return list;
+   }
+
+ @Test
+ public void testRun() throws Exception {
+
+ TableName tn = TableName.valueOf(tableName);
+ byte[] mobValueBytes = new byte[100];
+
+ //get the path where mob files lie in
+ Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
+
+ Put put = new Put(Bytes.toBytes(row));
- put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
++ put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+ Put put2 = new Put(Bytes.toBytes(row + "ignore"));
- put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
- table.put(put);
- table.put(put2);
- table.flushCommits();
++ put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
++ table.mutate(put);
++ table.mutate(put2);
++ table.flush();
+ admin.flush(tn);
+
+ FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+ //check the generation of a mob file
+ assertEquals(1, fileStatuses.length);
+
+ String mobFile1 = fileStatuses[0].getPath().getName();
+
+ Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+ configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+ configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+ configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+ configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
+ configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+ configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+ JavaSerialization.class.getName());
+ configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+ configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+ System.currentTimeMillis() + 24 * 3600 * 1000);
+
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+ TableName lockName = MobUtils.getTableLockName(tn);
+ String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+ configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+ configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+ ServerName serverName = SweepJob.getCurrentServerName(configuration);
+ configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
+
+ TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+ serverName);
+ TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+ lock.acquire();
+ try {
+ // use the same counter when mocking
+ Counter counter = new GenericCounter();
+ Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
+ when(ctx.getConfiguration()).thenReturn(configuration);
+ when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+ when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+ when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+ byte[] refBytes = Bytes.toBytes(mobFile1);
+ long valueLength = refBytes.length;
+ byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+ KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
+ KeyValue.Type.Put, newValue);
+ List<KeyValue> list = new ArrayList<KeyValue>();
+ list.add(kv2);
+
+ when(ctx.getValues()).thenReturn(list);
+
+ SweepReducer reducer = new SweepReducer();
+ reducer.run(ctx);
+ } finally {
+ lock.release();
+ }
+ FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+ String mobFile2 = filsStatuses2[0].getPath().getName();
+ //new mob file is generated, old one has been archived
+ assertEquals(1, filsStatuses2.length);
+ assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+ //test sequence file
+ String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
+ FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+ Set<String> files = new TreeSet<String>();
+ for (FileStatus st : statuses) {
+ files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+ st.getPath(), configuration));
+ }
+ assertEquals(1, files.size());
+ assertEquals(true, files.contains(mobFile1));
+ }
+}