You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/19 12:11:01 UTC
[1/2] HBASE-11644 External MOB compaction tools (Jingcheng Du)
Repository: hbase
Updated Branches:
refs/heads/hbase-11339 9cf46dcbe -> 84e957c87
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
new file mode 100644
index 0000000..04fe359
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@ -0,0 +1,506 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The reducer of a sweep job.
+ * This reducer merges the small mob files into bigger ones, and writes the names of the
+ * visited mob files to a sequence file which is later used by the sweep job to delete
+ * the unused mob files.
+ * The key of the input is a mob file name; the value is a collection of KeyValues where
+ * each KeyValue is the actual reference cell in HBase (its value format is
+ * valueLength + fileName).
+ * From these cells the reducer learns how many bytes of a mob file are still referenced
+ * from HBase. If existingCellSize/mobFileSize < compactionRatio, the mob file is rewritten;
+ * small mob files of the same partition are merged together.
+ */
+@InterfaceAudience.Private
+public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
+
+ private static final Log LOG = LogFactory.getLog(SweepReducer.class);
+
+ // Sequence file receiving the name of every visited mob file (created in run()).
+ private SequenceFile.Writer writer = null;
+ // Buffers the cells copied out of rewritten mob files until they are flushed.
+ private MemStoreWrapper memstore;
+ private Configuration conf;
+ private FileSystem fs;
+
+ // Directory holding the mob files of the swept column family.
+ private Path familyDir;
+ // Cache config built from a copy of the job conf with a near-zero block cache size.
+ private CacheConfig cacheConfig;
+ // Start time of the sweep tool (MOB_SWEEP_TOOL_COMPACTION_START_DATE, default 0).
+ private long compactionBegin;
+ // Table handed to the MemStoreWrapper; auto-flush is disabled in setup().
+ private HTable table;
+ // Descriptor of the swept column family.
+ private HColumnDescriptor family;
+ // Mob files modified within this delay before compactionBegin are skipped.
+ private long mobCompactionDelay;
+ // Mob table directory, used when archiving the rewritten mob files.
+ private Path mobTableDir;
+
+  /**
+   * Initializes the reducer. Resolves the swept table and its mob column family, builds a cache
+   * configuration whose block cache is effectively disabled, opens the table used by the
+   * memstore wrapper and reads the sweep timing settings from the job configuration.
+   *
+   * @param context The reducer context.
+   * @throws IOException If the column family does not exist or HBase/HDFS access fails.
+   * @throws InterruptedException As declared by {@link Reducer#setup}.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    this.fs = FileSystem.get(this.conf);
+    // MOB_COMPACTION_DELAY defaults to ONE_DAY; only tests change it.
+    this.mobCompactionDelay = this.conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY);
+
+    final String tableNameAsString = this.conf.get(TableInputFormat.INPUT_TABLE);
+    final String familyNameAsString = this.conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
+    final TableName sweptTable = TableName.valueOf(tableNameAsString);
+    this.familyDir = MobUtils.getMobFamilyPath(this.conf, sweptTable, familyNameAsString);
+    this.family = lookupFamily(sweptTable, familyNameAsString);
+
+    // A near-zero block cache size effectively disables the block cache for the readers
+    // created with this cache config.
+    final Configuration noBlockCacheConf = new Configuration(this.conf);
+    noBlockCacheConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f);
+    this.cacheConfig = new CacheConfig(noBlockCacheConf);
+
+    this.table = new HTable(this.conf, Bytes.toBytes(tableNameAsString));
+    this.table.setAutoFlush(false, false);
+    final long oneMegabyte = 1L * 1024 * 1024;
+    this.table.setWriteBufferSize(oneMegabyte);
+    this.memstore = new MemStoreWrapper(context, this.fs, this.table, this.family,
+        new DefaultMemStore(), this.cacheConfig);
+
+    // Start time of the sweep tool. Only mob files modified more than mobCompactionDelay before
+    // this time are handled by the reducer, since handling the latest mob files could bring
+    // inconsistency.
+    this.compactionBegin = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
+    this.mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(this.conf), sweptTable);
+  }
+
+  /**
+   * Looks up the descriptor of a column family through a short-lived HBaseAdmin.
+   *
+   * @param tableName The table owning the family.
+   * @param familyName The column family name.
+   * @return The column family descriptor, never null.
+   * @throws InvalidFamilyOperationException If the table has no such column family.
+   * @throws IOException If the table descriptor cannot be fetched.
+   */
+  private HColumnDescriptor lookupFamily(TableName tableName, String familyName)
+      throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(this.conf);
+    try {
+      HColumnDescriptor found =
+          admin.getTableDescriptor(tableName).getFamily(Bytes.toBytes(familyName));
+      if (found != null) {
+        return found;
+      }
+      // The column family might have been removed; the sweep cannot continue.
+      throw new InvalidFamilyOperationException("Column family '" + familyName
+          + "' does not exist. It might be removed.");
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        LOG.warn("Fail to close the HBaseAdmin", e);
+      }
+    }
+  }
+
+  /**
+   * Creates the partition handling the mob files that share the given partition id.
+   *
+   * @param id The partition id (start key and date).
+   * @param context The reducer context.
+   * @return A newly initialized partition.
+   * @throws IOException If listing the mob files of the partition fails.
+   */
+  private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
+    final SweepPartition created = new SweepPartition(id, context);
+    return created;
+  }
+
+  /**
+   * Runs the reducer.
+   * Starts a tracker on the sweeper znode, sets the reducer up, and creates a sequence file
+   * under {@link SweepJob#WORKING_VISITED_DIR_KEY} that records the names of all the visited
+   * mob files. It then feeds every input key to the {@link SweepPartition} it belongs to.
+   * Mob files sharing the same start key and date form one partition. A partition is closed as
+   * soon as a key of a different partition arrives, and the last one is closed at the end.
+   * The ZooKeeper watcher, the sequence file writer and the table are released even when
+   * {@link #cleanup(Context)} fails.
+   *
+   * @param context The reducer context.
+   * @throws IOException If ZooKeeper, HDFS or HBase access fails.
+   * @throws InterruptedException If the task is interrupted.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
+        new DummyMobAbortable());
+    try {
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
+      tracker.start();
+      setup(context);
+      // create a sequence file that contains all the visited file names in this reducer.
+      String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
+      Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
+          .replace("-", MobConstants.EMPTY_STRING));
+      // SequenceFile.createWriter creates (and overwrites) the file itself. Pre-creating it with
+      // fs.create() leaked the returned output stream, which was never closed.
+      writer = SequenceFile.createWriter(fs, context.getConfiguration(), nameFilePath,
+          String.class, String.class);
+      SweepPartition partition = null;
+      // the mob files which have the same start key and date are in the same partition.
+      while (context.nextKey()) {
+        Text key = context.getCurrentKey();
+        SweepPartitionId id = SweepPartitionId.create(key.toString());
+        if (partition == null || !id.equals(partition.getId())) {
+          // This mob file is the first one of a new partition: close the previous partition
+          // (if any) and start a new one.
+          if (partition != null) {
+            partition.close();
+          }
+          partition = createPartition(id, context);
+        }
+        partition.execute(key, context.getValues());
+      }
+      if (partition != null) {
+        partition.close();
+      }
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      try {
+        cleanup(context);
+      } finally {
+        // Release the resources even if cleanup() throws.
+        zkw.close();
+        if (writer != null) {
+          IOUtils.closeStream(writer);
+        }
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException e) {
+            LOG.warn(e);
+          }
+        }
+      }
+    }
+  }
+
+ /**
+ * The mob files which have the same start key and date are in the same partition.
+ * The files in the same partition are merged together into bigger ones.
+ * This is an inner (non-static) class: it uses the enclosing reducer's conf, fs, memstore,
+ * writer, table, family and cache config.
+ */
+ public class SweepPartition {
+
+ private final SweepPartitionId id;
+ private final Context context;
+ // Set once a cell has been added to the memstore; close() flushes only in that case.
+ private boolean memstoreUpdated = false;
+ // True when init() found at least two small (mergeable) files.
+ private boolean mergeSmall = false;
+ // Mob file name -> status. Only plain hfiles are tracked; HFileLinks are skipped.
+ private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
+ // Mob files rewritten by execute(); they are archived in close().
+ private final List<Path> toBeDeleted = new ArrayList<Path>();
+
+ /**
+ * Creates a partition, points the shared memstore wrapper at it and loads the statuses
+ * of the candidate mob files.
+ * @param id The partition id.
+ * @param context The reducer context, used for the counters.
+ * @throws IOException If listing the mob files fails.
+ */
+ public SweepPartition(SweepPartitionId id, Context context) throws IOException {
+ this.id = id;
+ this.context = context;
+ memstore.setPartitionId(id);
+ init();
+ }
+
+ /**
+ * @return The id of this partition.
+ */
+ public SweepPartitionId getId() {
+ return this.id;
+ }
+
+ /**
+ * Prepares the map of files.
+ * Lists the mob files in the family directory whose names start with the partition's
+ * start key, and decides whether small files should be merged.
+ * NOTE(review): the listing filters on the start key only, not on the date, so files of
+ * other dates are also counted towards smallFileCount - confirm this is intended.
+ *
+ * @throws IOException If listing the family directory fails.
+ */
+ private void init() throws IOException {
+ FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
+ if (null == fileStats) {
+ return;
+ }
+
+ int smallFileCount = 0;
+ float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
+ MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
+ long compactionMergeableSize = conf.getLong(
+ MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
+ MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
+ // list the files. Just merge the hfiles, don't merge the hfile links.
+ // prepare the map of mob files. The key is the file name, the value is the file status.
+ for (FileStatus fileStat : fileStats) {
+ MobFileStatus mobFileStatus = null;
+ if (!HFileLink.isHFileLink(fileStat.getPath())) {
+ mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
+ if (mobFileStatus.needMerge()) {
+ smallFileCount++;
+ }
+ // key is file name (not hfile name), value is hfile status.
+ fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
+ }
+ }
+ if (smallFileCount >= 2) {
+ // merge the small files only when there is more than one small file in the partition.
+ this.mergeSmall = true;
+ }
+ }
+
+ /**
+ * Flushes the data into mob files and store files, and archives the small
+ * files after they're merged.
+ * Archiving failures are logged and not rethrown.
+ * @throws IOException If flushing the memstore or opening the store files fails.
+ */
+ public void close() throws IOException {
+ if (null == id) {
+ return;
+ }
+ // flush the remaining key values into mob files
+ if (memstoreUpdated) {
+ memstore.flushMemStore();
+ }
+ List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
+ // delete the small files after compaction
+ for (Path path : toBeDeleted) {
+ LOG.info("[In Partition close] Delete the file " + path + " in partition close");
+ storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
+ }
+ if (!storeFiles.isEmpty()) {
+ try {
+ MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
+ storeFiles);
+ context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
+ } catch (IOException e) {
+ LOG.error("Fail to archive the store files " + storeFiles, e);
+ }
+ storeFiles.clear();
+ }
+ fileStatusMap.clear();
+ }
+
+ /**
+ * Merges the small mob files into bigger ones.
+ * Records the file name in the visited-files sequence file and sums the value lengths
+ * stored in the reference cells. If the file needs cleaning, or it is small and merging is
+ * enabled for this partition, the cells of the file that are still referenced are copied
+ * into the memstore and the file is scheduled for archiving in close().
+ * Files unknown to this partition, or modified within mobCompactionDelay before the sweep
+ * started, are ignored.
+ * @param fileName The current mob file name.
+ * @param values The collection of KeyValues in this mob file.
+ * @throws IOException If writing the file name or reading the mob file fails.
+ */
+ public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
+ if (null == values) {
+ return;
+ }
+ MobFileName mobFileName = MobFileName.create(fileName.toString());
+ LOG.info("[In reducer] The file name: " + fileName.toString());
+ MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
+ if (null == mobFileStat) {
+ LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
+ return;
+ }
+ // only handle the files that were modified more than mobCompactionDelay (one day by
+ // default) before the sweep started.
+ if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
+ <= mobCompactionDelay) {
+ return;
+ }
+ // write the hfile name
+ writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
+ Set<KeyValue> kvs = new HashSet<KeyValue>();
+ for (KeyValue kv : values) {
+ // The first int of a reference cell's value is the length of the referenced value.
+ if (kv.getValueLength() > Bytes.SIZEOF_INT) {
+ mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
+ Bytes.SIZEOF_INT));
+ }
+ kvs.add(kv.createKeyOnly(false));
+ }
+ // If the mob file is an invalid one or a small one, merge it into new/bigger ones.
+ if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
+ context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
+ MobFile file = MobFile.create(fs,
+ new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
+ StoreFileScanner scanner = null;
+ try {
+ scanner = file.getScanner();
+ scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
+ Cell cell;
+ while (null != (cell = scanner.next())) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ KeyValue keyOnly = kv.createKeyOnly(false);
+ if (kvs.contains(keyOnly)) {
+ // write the KeyValue existing in HBase to the memstore.
+ memstore.addToMemstore(kv);
+ memstoreUpdated = true;
+ }
+ }
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ toBeDeleted.add(mobFileStat.getFileStatus().getPath());
+ }
+ }
+
+ /**
+ * Lists the files with the same prefix.
+ * @param p The file path.
+ * @param prefix The prefix.
+ * @return The files with the same prefix.
+ * @throws IOException
+ */
+ private FileStatus[] listStatus(Path p, String prefix) throws IOException {
+ return fs.listStatus(p, new PathPrefixFilter(prefix));
+ }
+ }
+
+  /**
+   * A {@link PathFilter} accepting the paths whose file name starts with a given prefix.
+   */
+  static class PathPrefixFilter implements PathFilter {
+
+    private final String prefix;
+
+    /**
+     * @param prefix The prefix the accepted file names must start with.
+     */
+    public PathPrefixFilter(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(prefix);
+    }
+  }
+
+  /**
+   * The sweep partition id.
+   * It consists of the start key and date.
+   * The start key is a hex string of the checksum of a region start key.
+   * The date is the latest timestamp of cells in a mob file.
+   * Two ids are equal when both their dates and their start keys are equal (null-safe), and
+   * {@link #hashCode()} is consistent with {@link #equals(Object)}. The setters mutate the
+   * fields the hash is derived from, so do not mutate an id while it is used as a hash key.
+   */
+  public static class SweepPartitionId {
+    private String date;
+    private String startKey;
+
+    public SweepPartitionId(MobFileName fileName) {
+      this.date = fileName.getDate();
+      this.startKey = fileName.getStartKey();
+    }
+
+    public SweepPartitionId(String date, String startKey) {
+      this.date = date;
+      this.startKey = startKey;
+    }
+
+    public static SweepPartitionId create(String key) {
+      return new SweepPartitionId(MobFileName.create(key));
+    }
+
+    @Override
+    public boolean equals(Object anObject) {
+      if (this == anObject) {
+        return true;
+      }
+      if (!(anObject instanceof SweepPartitionId)) {
+        return false;
+      }
+      SweepPartitionId another = (SweepPartitionId) anObject;
+      return nullSafeEquals(this.date, another.getDate())
+          && nullSafeEquals(this.startKey, another.getStartKey());
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}; previously missing, which broke the
+     * equals/hashCode contract for hash-based collections.
+     */
+    @Override
+    public int hashCode() {
+      int result = (date == null) ? 0 : date.hashCode();
+      return 31 * result + ((startKey == null) ? 0 : startKey.hashCode());
+    }
+
+    // Compares two possibly-null strings for equality.
+    private static boolean nullSafeEquals(String a, String b) {
+      return (a == null) ? (b == null) : a.equals(b);
+    }
+
+    public String getDate() {
+      return this.date;
+    }
+
+    public String getStartKey() {
+      return this.startKey;
+    }
+
+    public void setDate(String date) {
+      this.date = date;
+    }
+
+    public void setStartKey(String startKey) {
+      this.startKey = startKey;
+    }
+  }
+
+  /**
+   * The mob file status used in the sweep reducer.
+   * It accumulates how many bytes of a mob file are still referenced by cells in HBase and
+   * decides whether the file needs to be cleaned (rewritten) or merged.
+   */
+  private static class MobFileStatus {
+    private final FileStatus fileStatus;
+    // Sum of the value lengths recorded in the reference cells of this file. Kept as a long so
+    // that adding many int-sized values cannot overflow for large mob files.
+    private long validSize;
+    // Length of the mob file, from its FileStatus.
+    private final long size;
+    private final float compactionRatio;
+    private final long compactionMergeableSize;
+
+    /**
+     * @param fileStatus The current FileStatus.
+     * @param compactionRatio The invalid ratio.
+     *        If there're too many cells deleted in a mob file, it's regarded as invalid,
+     *        and needs to be written to a new one.
+     *        If existingCellSize/fileSize < compactionRatio, it's regarded as an invalid one.
+     * @param compactionMergeableSize If the size of a mob file is less than this value,
+     *        it's regarded as a small file and needs to be merged.
+     */
+    public MobFileStatus(FileStatus fileStatus, float compactionRatio,
+        long compactionMergeableSize) {
+      this.fileStatus = fileStatus;
+      this.size = fileStatus.getLen();
+      this.validSize = 0;
+      this.compactionRatio = compactionRatio;
+      this.compactionMergeableSize = compactionMergeableSize;
+    }
+
+    /**
+     * Adds size to the valid (still referenced) size of this file.
+     * @param size The size to be added.
+     */
+    public void addValidSize(int size) {
+      this.validSize += size;
+    }
+
+    /**
+     * Whether the mob file needs to be cleaned.
+     * If there're too many cells deleted in this mob file, it needs to be cleaned.
+     * @return True if validSize < compactionRatio * size.
+     */
+    public boolean needClean() {
+      return validSize < compactionRatio * size;
+    }
+
+    /**
+     * Whether the mob file needs to be merged.
+     * If this mob file is too small, it needs to be merged.
+     * @return True if the file is smaller than the mergeable size.
+     */
+    public boolean needMerge() {
+      return this.size < compactionMergeableSize;
+    }
+
+    /**
+     * Gets the file status.
+     * @return The file status.
+     */
+    public FileStatus getFileStatus() {
+      return fileStatus;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
new file mode 100644
index 0000000..d71dc83
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * The sweep tool. It deletes the mob files that are not used and merges the small mob files to
+ * bigger ones. Each run of this sweep tool only handles one column family. The runs on
+ * the same column family are mutually exclusive. And the major compaction and sweep tool on the
+ * same column family are mutually exclusive too.
+ * The process exit code is the value returned by {@link #run(String[])}: 0 on success and 1 on
+ * a usage error.
+ */
+@InterfaceAudience.Public
+public class Sweeper extends Configured implements Tool {
+
+  /**
+   * Sweeps the mob files on one column family. It deletes the unused mob files and merges
+   * the small mob files into bigger ones.
+   * @param tableName The current table name in string format.
+   * @param familyName The column family name.
+   * @throws IOException If the family is missing or is not a MOB family, or HBase/HDFS
+   *         access fails.
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws KeeperException
+   * @throws ServiceException
+   */
+  void sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
+      ClassNotFoundException, KeeperException, ServiceException {
+    Configuration conf = getConf();
+    // make sure the target HBase exists.
+    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      TableName tn = TableName.valueOf(tableName);
+      HTableDescriptor htd = admin.getTableDescriptor(tn);
+      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+      if (family == null || !MobUtils.isMobFamily(family)) {
+        throw new IOException("Column family " + familyName + " is not a MOB column family");
+      }
+      SweepJob job = new SweepJob(conf, fs);
+      // Run the sweeping
+      job.sweep(tn, family);
+    } finally {
+      try {
+        admin.close();
+      } catch (IOException e) {
+        // Diagnostics go to stderr, like the usage message.
+        System.err.println("Fail to close the HBaseAdmin: " + e.getMessage());
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    // Propagate the tool's result so scripts can detect failures (e.g. bad arguments).
+    int ret = ToolRunner.run(conf, new Sweeper(), args);
+    System.exit(ret);
+  }
+
+  private void printUsage() {
+    System.err.println("Usage:\n" + "--------------------------\n" + Sweeper.class.getName()
+        + " tableName familyName");
+    System.err.println(" tableName The table name");
+    System.err.println(" familyName The column family name");
+  }
+
+  /**
+   * Runs the sweep tool.
+   * @param args The table name and the column family name.
+   * @return 0 if the sweeping finished, 1 if the arguments are invalid.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      printUsage();
+      return 1;
+    }
+    String table = args[0];
+    String family = args[1];
+    sweepFamily(table, family);
+    return 0;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 9c6f34e..071b5fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -18,13 +18,17 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.NavigableSet;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -32,7 +36,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -44,7 +51,10 @@ import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.zookeeper.KeeperException;
/**
* The store implementation to save MOBs (medium objects), it extends the HStore.
@@ -68,6 +78,7 @@ public class HMobStore extends HStore {
private MobCacheConfig mobCacheConfig;
private Path homePath;
private Path mobFamilyPath;
+ private List<Path> mobDirLocations;
public HMobStore(final HRegion region, final HColumnDescriptor family,
final Configuration confParam) throws IOException {
@@ -76,6 +87,11 @@ public class HMobStore extends HStore {
this.homePath = MobUtils.getMobHome(conf);
this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
family.getNameAsString());
+ mobDirLocations = new ArrayList<Path>();
+ mobDirLocations.add(mobFamilyPath);
+ TableName tn = region.getTableDesc().getTableName();
+ mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
+ .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
}
/**
@@ -87,6 +103,13 @@ public class HMobStore extends HStore {
}
/**
+   * Gets the configuration of this store.
+   * @return The configuration held by this store.
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+ /**
* Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in
* the mob files should be performed after the seek in HBase is done.
*/
@@ -94,6 +117,15 @@ public class HMobStore extends HStore {
protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
long readPt, KeyValueScanner scanner) throws IOException {
if (scanner == null) {
+ if (MobUtils.isRefOnlyScan(scan)) {
+ Filter refOnlyFilter = new MobReferenceOnlyFilter();
+ Filter filter = scan.getFilter();
+ if (filter != null) {
+ scan.setFilter(new FilterList(filter, refOnlyFilter));
+ } else {
+ scan.setFilter(refOnlyFilter);
+ }
+ }
scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
}
@@ -219,30 +251,10 @@ public class HMobStore extends HStore {
*/
public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
Cell result = null;
- if (MobUtils.isValidMobRefCellValue(reference)) {
+ if (MobUtils.hasValidMobRefCellValue(reference)) {
String fileName = MobUtils.getMobFileName(reference);
- Path targetPath = new Path(mobFamilyPath, fileName);
- MobFile file = null;
- try {
- file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath,
- mobCacheConfig);
- result = file.readCell(reference, cacheBlocks);
- } catch (IOException e) {
- LOG.error("Fail to open/read the mob file " + targetPath.toString(), e);
- } catch (NullPointerException e) {
- // When delete the file during the scan, the hdfs getBlockRange will
- // throw NullPointerException, catch it and manage it.
- LOG.error("Fail to read the mob file " + targetPath.toString()
- + " since it's already deleted", e);
- } finally {
- if (file != null) {
- mobCacheConfig.getMobFileCache().closeFile(file);
- }
- }
- } else {
- LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short");
+ result = readCell(fileName, reference, cacheBlocks);
}
-
if (result == null) {
LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
+ "qualifier,timestamp,type and tags but with an empty value to return.");
@@ -258,10 +270,132 @@ public class HMobStore extends HStore {
}
/**
+   * Reads the cell from a mob file.
+   * The mob file may live in different directories, which are tried in this order:
+   * 1. The working directory of the mob family.
+   * 2. The archive directory of the mob family.
+   * A location where the file does not exist is logged and skipped. Any other I/O error is
+   * rethrown. In both cases the file is evicted from the mob file cache first.
+   * @param fileName The file to be read.
+   * @param search The cell to be searched.
+   * @param cacheMobBlocks Whether the scanner should cache blocks.
+   * @return The cell read from the first location holding the file; null if none of the
+   *         locations holds the file.
+   * @throws IOException If reading fails for a reason other than a missing file.
+   */
+  private Cell readCell(String fileName, Cell search, boolean cacheMobBlocks) throws IOException {
+    FileSystem fs = getFileSystem();
+    for (Path location : mobDirLocations) {
+      Path candidate = new Path(location, fileName);
+      MobFile opened = null;
+      try {
+        opened = mobCacheConfig.getMobFileCache().openFile(fs, candidate, mobCacheConfig);
+        return opened.readCell(search, cacheMobBlocks);
+      } catch (FileNotFoundException e) {
+        mobCacheConfig.getMobFileCache().evictFile(fileName);
+        LOG.warn("Fail to read the cell, the mob file " + candidate + " doesn't exist", e);
+      } catch (IOException e) {
+        mobCacheConfig.getMobFileCache().evictFile(fileName);
+        throw e;
+      } finally {
+        if (opened != null) {
+          mobCacheConfig.getMobFileCache().closeFile(opened);
+        }
+      }
+    }
+    LOG.error("The mob file " + fileName + " could not be found in the locations "
+        + mobDirLocations);
+    return null;
+  }
+
+ /**
* Gets the mob file path.
* @return The mob file path.
*/
public Path getPath() {
return mobFamilyPath;
}
+
+  /**
+   * The compaction in the store of mob.
+   * The cells in this store contain the paths of the mob files. There might be a race
+   * condition between the major compaction and the sweeping of the mob files.
+   * In order to avoid this, the major compaction and the sweeping of mob files are made
+   * mutually exclusive through ZooKeeper.
+   * The minor compaction is not affected.
+   * The major compaction is converted to a minor one when a sweeping is in progress, when the
+   * column family lock cannot be obtained, or when ZooKeeper cannot be reached.
+   */
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction) throws IOException {
+    if (!compaction.getRequest().isMajor()) {
+      // If it's not a major compaction, continue the compaction.
+      return super.compact(compaction);
+    }
+    // It's a major compaction: find out whether a sweeper is running, and if so change the
+    // major compaction to a minor one. ZooKeeper is used to coordinate:
+    // 1. Acquire the operation lock of the column family.
+    // 1.1. If it cannot be obtained, convert the major compaction to a minor one and continue.
+    // 1.2. If the lock is obtained, look for the sweeper node.
+    // 1.2.1. If the node exists, a sweeping is in progress: convert the major compaction to
+    //        a minor one and continue.
+    // 1.2.2. If the node does not exist, add a child to the major compaction node and run
+    //        the major compaction.
+    String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
+    MobZookeeper zk = null;
+    try {
+      zk = MobZookeeper.newInstance(this.conf, compactionName);
+    } catch (KeeperException e) {
+      LOG.error("Cannot connect to the zookeeper, ready to perform the minor compaction instead",
+          e);
+      // change the major compaction into a minor one
+      compaction.getRequest().setIsMajor(false, false);
+      return super.compact(compaction);
+    }
+    boolean major = false;
+    try {
+      String tableName = getTableName().getNameAsString();
+      String familyName = getFamily().getNameAsString();
+      // try to acquire the operation lock.
+      if (zk.lockColumnFamily(tableName, familyName)) {
+        try {
+          LOG.info("Obtain the lock for the store[" + this
+              + "], ready to perform the major compaction");
+          // check the sweeping node to find out whether the sweeping is in progress.
+          boolean hasSweeper = zk.isSweeperZNodeExist(tableName, familyName);
+          if (!hasSweeper) {
+            // if not, add a child to the major compaction node of this store.
+            major = zk.addMajorCompactionZNode(tableName, familyName, compactionName);
+          }
+        } catch (Exception e) {
+          LOG.error("Fail to handle the Zookeeper", e);
+        } finally {
+          // release the operation lock
+          zk.unlockColumnFamily(tableName, familyName);
+        }
+      }
+      try {
+        if (!major) {
+          LOG.warn("Cannot obtain the lock or a sweep tool is running on this store["
+              + this + "], ready to perform the minor compaction instead");
+          // change the major compaction into a minor one
+          compaction.getRequest().setIsMajor(false, false);
+        }
+        return super.compact(compaction);
+      } finally {
+        if (major) {
+          try {
+            zk.deleteMajorCompactionZNode(tableName, familyName, compactionName);
+          } catch (KeeperException e) {
+            LOG.error("Fail to delete the compaction znode " + compactionName, e);
+          }
+        }
+      }
+    } finally {
+      zk.close();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
new file mode 100644
index 0000000..10aea24
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobReferenceOnlyFilter.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * A filter that returns the cells which have mob reference tags. It's a server-side filter.
+ */
+@InterfaceAudience.Private
+class MobReferenceOnlyFilter extends FilterBase {
+
+ @Override
+ public ReturnCode filterKeyValue(Cell cell) {
+ if (null != cell) {
+ // If a cell with a mob reference tag, it's included.
+ if (MobUtils.isMobReferenceCell(cell)) {
+ return ReturnCode.INCLUDE;
+ }
+ }
+ return ReturnCode.SKIP;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
new file mode 100644
index 0000000..7cba86c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleaner.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestExpiredMobFileCleaner {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner");
+  private final static String family = "family";
+  private final static byte[] row1 = Bytes.toBytes("row1");
+  private final static byte[] row2 = Bytes.toBytes("row2");
+  private final static byte[] qf = Bytes.toBytes("qf");
+
+  private static HTable table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // Nothing to do: the mini cluster is started and stopped around every single test.
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Close the client table opened by init() so its resources are not leaked.
+    if (table != null) {
+      table.close();
+      table = null;
+    }
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.close();
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
+  }
+
+  /**
+   * Creates the test table with a single mob-enabled family (MOB_THRESHOLD set to 3, up to 4
+   * versions) and opens a client for it with auto-flush disabled.
+   */
+  private void init() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false, false);
+  }
+
+  /**
+   * Replaces the mob family descriptor with one whose TTL is {@code expireDays} days, so that
+   * cells older than that are considered expired.
+   */
+  private void modifyColumnExpiryDays(int expireDays) throws Exception {
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    // change ttl as expire days to make some row expired
+    int timeToLive = expireDays * secondsOfDay();
+    hcd.setTimeToLive(timeToLive);
+
+    admin.modifyColumn(tableName, hcd);
+  }
+
+  /**
+   * Writes a single cell with timestamp {@code ts}, then flushes both the client buffer and
+   * the region so the value lands in a new mob file.
+   */
+  private void putKVAndFlush(HTable table, byte[] row, byte[] value, long ts)
+      throws Exception {
+
+    Put put = new Put(row, ts);
+    put.add(Bytes.toBytes(family), qf, value);
+    table.put(put);
+
+    table.flushCommits();
+    admin.flush(tableName);
+  }
+
+  /**
+   * Creates a 3-day-old hfile and a 1-day-old hfile, then sets the expiry to 2 days.
+   * Verifies that the 3-day-old hfile is removed but the 1-day-old one is still present
+   * after the expiry-based cleaner has run.
+   */
+  @Test
+  public void testCleaner() throws Exception {
+    init();
+
+    Path mobDirPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+
+    byte[] dummyData = makeDummyData(600);
+    // long arithmetic keeps the day -> millis conversion from overflowing int
+    long ts = System.currentTimeMillis() - 3L * secondsOfDay() * 1000; // 3 days before
+    putKVAndFlush(table, row1, dummyData, ts);
+    FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    //the first mob file
+    assertEquals("Before cleanup without delay 1", 1, firstFiles.length);
+    String firstFile = firstFiles[0].getPath().getName();
+
+    ts = System.currentTimeMillis() - 1L * secondsOfDay() * 1000; // 1 day before
+    putKVAndFlush(table, row2, dummyData, ts);
+    FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    //now there are 2 mob files
+    assertEquals("Before cleanup without delay 2", 2, secondFiles.length);
+    String f1 = secondFiles[0].getPath().getName();
+    String f2 = secondFiles[1].getPath().getName();
+    String secondFile = f1.equals(firstFile) ? f2 : f1;
+
+    modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
+
+    //run the cleaner
+    String[] args = new String[2];
+    args[0] = tableName.getNameAsString();
+    args[1] = family;
+    ToolRunner.run(TEST_UTIL.getConfiguration(), new ExpiredMobFileCleaner(), args);
+
+    FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    // Check the count before indexing so a failure is reported as an assertion, not an AIOOBE.
+    assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
+    String lastFile = filesAfterClean[0].getPath().getName();
+    //the first mob file is removed
+    assertEquals("After cleanup without delay 2", secondFile, lastFile);
+  }
+
+  /** Returns the directory holding the mob files of {@code familyName} in {@code tableName}. */
+  private Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+    Path p = new Path(MobUtils.getMobRegionPath(conf, tableName), familyName);
+    return p;
+  }
+
+  private int secondsOfDay() {
+    return 24 * 3600;
+  }
+
+  private byte[] makeDummyData(int size) {
+    byte [] dummyData = new byte[size];
+    new Random().nextBytes(dummyData);
+    return dummyData;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
new file mode 100644
index 0000000..e0b9a83
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobSweepJob {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    // The name files are sequence files keyed by java.lang.String, which is not a Writable,
+    // so JavaSerialization has to be registered next to WritableSerialization.
+    TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Writes each name as a key (with an empty value) into a sequence file at {@code path}.
+   */
+  private void writeFileNames(FileSystem fs, Configuration conf, Path path,
+      String[] filesNames) throws IOException {
+    // write the names to a sequence file
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
+        String.class, String.class);
+    try {
+      for (String fileName : filesNames) {
+        writer.append(fileName, MobConstants.EMPTY_STRING);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+
+  /**
+   * Lays out a sweep working directory and asks a {@link SweepJob} which files are unused.
+   *
+   * @param testIndex sub-directory name that keeps the working files of each test apart
+   * @param allNames every mob file name, written to the "all" names file
+   * @param visitedNames one array per reducer output file ("r0", "r1", ...) holding the
+   *     names that reducer visited
+   * @return the names reported by {@link SweepJob#getUnusedFiles(Configuration)}
+   */
+  private List<String> getUnusedFiles(String testIndex, String[] allNames,
+      String[]... visitedNames) throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    String workingNamesDir = "/hbase/mobcompaction/SweepJob/working/names/" + testIndex;
+    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        workingNamesDir + "/visited");
+    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
+        workingNamesDir + "/all");
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
+    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+
+    writeFileNames(fs, configuration, allFileNamesPath, allNames);
+    for (int i = 0; i < visitedNames.length; i++) {
+      writeFileNames(fs, configuration, new Path(visitedFileNamesPath, "r" + i),
+          visitedNames[i]);
+    }
+
+    SweepJob sweepJob = new SweepJob(configuration, fs);
+    return sweepJob.getUnusedFiles(configuration);
+  }
+
+  @Test
+  public void testSweeperJobWithOutUnusedFile() throws Exception {
+    List<String> toBeArchived = getUnusedFiles("0",
+        new String[] { "1", "2", "3", "4", "5", "6" },
+        new String[] { "1", "2", "3" },
+        new String[] { "1", "4", "5" },
+        new String[] { "2", "3", "6" });
+
+    assertEquals(0, toBeArchived.size());
+  }
+
+  @Test
+  public void testSweeperJobWithUnusedFile() throws Exception {
+    List<String> toBeArchived = getUnusedFiles("1",
+        new String[] { "1", "2", "3", "4", "5", "6" },
+        new String[] { "1", "2", "3" },
+        new String[] { "1", "5" },
+        new String[] { "2", "3" });
+
+    assertEquals(2, toBeArchived.size());
+    assertEquals(new String[] { "4", "6" }, toBeArchived.toArray(new String[0]));
+  }
+
+  @Test
+  public void testSweeperJobWithRedundantFile() throws Exception {
+    // "7" is visited but not listed among all names; it must not be reported as unused.
+    List<String> toBeArchived = getUnusedFiles("2",
+        new String[] { "1", "2", "3", "4", "5", "6" },
+        new String[] { "1", "2", "3" },
+        new String[] { "1", "5", "6", "7" },
+        new String[] { "2", "3", "4" });
+
+    assertEquals(0, toBeArchived.size());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
new file mode 100644
index 0000000..a7e2538
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestMobSweepMapper {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Feeds one cell whose value is a 4-character prefix followed by a mob file name to
+   * {@link SweepMapper#map} and checks that the mapper writes that file name (without the
+   * prefix) together with the original cell.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void TestMap() throws Exception {
+    String prefix = "0000";
+    final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
+    final String mobFilePath = prefix + fileName;
+
+    ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
+    final KeyValue[] kvList = new KeyValue[1];
+    kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
+        Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
+
+    Result columns = mock(Result.class);
+    when(columns.raw()).thenReturn(kvList);
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepMapper:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    try {
+      zk.addSweeperZNode("testSweepMapper", "family", Bytes.toBytes("1"));
+
+      Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
+          mock(Mapper.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      SweepMapper map = new SweepMapper();
+      doAnswer(new Answer<Void>() {
+
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          Text text = (Text) invocation.getArguments()[0];
+          KeyValue kv = (KeyValue) invocation.getArguments()[1];
+
+          assertEquals(fileName, Bytes.toString(text.getBytes(), 0, text.getLength()));
+          assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
+
+          return null;
+        }
+      }).when(ctx).write(any(Text.class), any(KeyValue.class));
+
+      map.map(r, columns, ctx);
+
+      // The assertions above only run inside the Answer; without this check the test would
+      // pass silently if map() wrote nothing at all.
+      verify(ctx).write(any(Text.class), any(KeyValue.class));
+    } finally {
+      // release the ZooKeeper session opened for this test
+      zk.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
new file mode 100644
index 0000000..0f4c3ff
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+
+@Category(MediumTests.class)
+public class TestMobSweepReducer {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String tableName = "testSweepReducer";
+  private final static String row = "row";
+  private final static String family = "family";
+  private final static String qf = "qf";
+  private static HTable table;
+  private static Admin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Close the client table opened in setUp() so its resources are not leaked.
+    if (table != null) {
+      table.close();
+      table = null;
+    }
+    admin.disableTable(TableName.valueOf(tableName));
+    admin.deleteTable(TableName.valueOf(tableName));
+    admin.close();
+  }
+
+  /**
+   * Reads every key of the String-keyed sequence file at {@code path}.
+   * The {@code fs} argument is currently unused; the reader resolves the file system from
+   * {@code conf}.
+   */
+  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
+      Configuration conf) throws Exception {
+    List<String> list = new ArrayList<String>();
+    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+    try {
+      String next = (String) reader.next((String) null);
+      while (next != null) {
+        list.add(next);
+        next = (String) reader.next((String) null);
+      }
+    } finally {
+      // close the reader even when reading fails part-way
+      reader.close();
+    }
+    return list;
+  }
+
+  /**
+   * Flushes two cells into one mob file, runs a {@link SweepReducer} over a reference to that
+   * file, and checks that the file was rewritten and recorded in the visited-names directory.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRun() throws Exception {
+
+    byte[] mobValueBytes = new byte[100];
+
+    //get the path where mob files lie in
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
+        TableName.valueOf(tableName), family);
+
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
+    put2.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
+    table.put(put);
+    table.put(put2);
+    table.flushCommits();
+    admin.flush(TableName.valueOf(tableName));
+
+    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    //check the generation of a mob file
+    assertEquals(1, fileStatuses.length);
+
+    String mobFile1 = fileStatuses[0].getPath().getName();
+
+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
+    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
+    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
+    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
+    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName());
+    // directory whose sequence files are checked below for the visited mob file names
+    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
+    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+        System.currentTimeMillis() + 24 * 3600 * 1000);
+
+    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
+    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepReducer:family-sweeper");
+
+    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
+    try {
+      zk.addSweeperZNode(tableName, family, Bytes.toBytes("1"));
+
+      //use the same counter when mocking
+      Counter counter = new GenericCounter();
+      Reducer<Text, KeyValue, Writable, Writable>.Context ctx =
+          mock(Reducer.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+      byte[] refBytes = Bytes.toBytes(mobFile1);
+      long valueLength = refBytes.length;
+      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
+          Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue);
+      List<KeyValue> list = new ArrayList<KeyValue>();
+      list.add(kv2);
+
+      when(ctx.getValues()).thenReturn(list);
+
+      SweepReducer reducer = new SweepReducer();
+      reducer.run(ctx);
+
+      FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+      //new mob file is generated, old one has been archived
+      // (check the count before indexing so a failure is an assertion, not an AIOOBE)
+      assertEquals(1, filsStatuses2.length);
+      String mobFile2 = filsStatuses2[0].getPath().getName();
+      assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
+
+      //test sequence file; read the directory back through the same key that set it
+      String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
+      FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
+      Set<String> files = new TreeSet<String>();
+      for (FileStatus st : statuses) {
+        files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
+            st.getPath(), configuration));
+      }
+      assertEquals(1, files.size());
+      assertEquals(true, files.contains(mobFile1));
+    } finally {
+      // release the ZooKeeper session opened for this test
+      zk.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
new file mode 100644
index 0000000..c43bceb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the MOB {@link Sweeper} tool end to end against a mini HBase cluster and a mini
+ * MapReduce cluster.
+ */
+@Category(MediumTests.class)
+public class TestMobSweeper {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String row = "row_";
+  private final static String family = "family";
+  private final static String column = "column";
+
+  private String tableName;
+  private HTable table;
+  private Admin admin;
+  private final Random random = new Random();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setBoolean("hbase.regionserver.info.port.auto", true);
+    // MOB reference cells are tagged (see MobConstants.MOB_REF_TAG); tags need HFile v3.
+    conf.setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // Stop the clusters in the reverse order of startup.
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Creates a fresh MOB-enabled table (threshold 3 bytes) for every test. */
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    tableName = "testSweeper" + System.currentTimeMillis();
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+    hcd.setMaxVersions(4);
+    desc.addFamily(hcd);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    table.setAutoFlush(false);
+  }
+
+  /** Closes the table handle opened in setUp, then drops the test table. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (table != null) {
+        table.close();
+      }
+    } finally {
+      TableName tn = TableName.valueOf(tableName);
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+      admin.close();
+    }
+  }
+
+  /** Returns the MOB directory of the given table and column family. */
+  private Path getMobFamilyPath(Configuration conf, String tableNameStr, String familyName) {
+    return new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
+        familyName);
+  }
+
+  /** Lists, in sorted order, the names of the files in the test family's MOB directory. */
+  private Set<String> listMobFileNames() throws IOException {
+    Path mobFamilyPath = getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    Set<String> names = new TreeSet<String>();
+    for (FileStatus status : statuses) {
+      names.add(status.getPath().getName());
+    }
+    return names;
+  }
+
+  /**
+   * Scans only the raw MOB reference cells of the test column and returns, in sorted order, the
+   * MOB file names they point to. Each reference value is parsed as a prefix of
+   * {@code Bytes.SIZEOF_INT} bytes followed by the file name.
+   */
+  private Set<String> scanMobFileReferences() throws IOException {
+    Scan scan = new Scan();
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+    Set<String> referenced = new TreeSet<String>();
+    ResultScanner rs = table.getScanner(scan);
+    try {
+      for (Result res : rs) {
+        byte[] valueBytes = res.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
+        referenced.add(Bytes.toString(valueBytes, Bytes.SIZEOF_INT,
+            valueBytes.length - Bytes.SIZEOF_INT));
+      }
+    } finally {
+      rs.close();
+    }
+    return referenced;
+  }
+
+  /**
+   * Writes {@code count} rows, each with a random 101KB value, flushing the memstore after every
+   * {@code flushStep}-th put (starting with the first) and once more at the end.
+   */
+  private void generateMobTable(int count, int flushStep)
+      throws IOException, InterruptedException {
+    if (count <= 0 || flushStep <= 0) {
+      return;
+    }
+    TableName tn = TableName.valueOf(tableName);
+    for (int i = 0; i < count; i++) {
+      byte[] mobVal = new byte[101 * 1024];
+      random.nextBytes(mobVal);
+
+      Put put = new Put(Bytes.toBytes(row + i));
+      put.add(Bytes.toBytes(family), Bytes.toBytes(column), mobVal);
+      table.put(put);
+      if (i % flushStep == 0) {
+        table.flushCommits();
+        admin.flush(tn);
+      }
+    }
+    table.flushCommits();
+    admin.flush(tn);
+  }
+
+  /** Runs the Sweeper tool on the test family with the given MOB compaction delay. */
+  private void runSweeper(long compactionDelay) throws Exception {
+    // Work on a copy so the delay setting does not leak into other tests.
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, compactionDelay);
+    ToolRunner.run(conf, new Sweeper(), new String[] { tableName, family });
+  }
+
+  /**
+   * Writes 10 MOB files, checks that the table references exactly those files, runs the sweeper
+   * with {@code compactionDelay}, and checks that afterwards {@code expectedMobFiles} files
+   * remain and that the table references exactly the remaining files.
+   */
+  private void sweepAndVerify(long compactionDelay, int expectedMobFiles) throws Exception {
+    // One flush per row, so 10 rows produce 10 MOB files.
+    generateMobTable(10, 1);
+
+    // The original MOB files and the references stored in the table must agree.
+    Set<String> mobFiles = listMobFileNames();
+    Set<String> referencedMobFiles = scanMobFileReferences();
+    assertEquals(10, referencedMobFiles.size());
+    assertEquals(mobFiles, referencedMobFiles);
+
+    runSweeper(compactionDelay);
+
+    Set<String> referencedAfterSweep = scanMobFileReferences();
+    assertEquals(expectedMobFiles, referencedAfterSweep.size());
+    Set<String> mobFilesAfterSweep = listMobFileNames();
+    assertEquals(expectedMobFiles, mobFilesAfterSweep.size());
+    assertEquals(mobFilesAfterSweep, referencedAfterSweep);
+  }
+
+  @Test
+  public void testSweeper() throws Exception {
+    // With a one-day compaction delay, all 10 freshly written MOB files are expected to remain.
+    sweepAndVerify(24 * 60 * 60 * 1000L, 10);
+  }
+
+  @Test
+  public void testCompactionDelaySweeper() throws Exception {
+    // With no compaction delay, the 10 MOB files are expected to be merged into one.
+    sweepAndVerify(0, 1);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
index f8d6ce4..51f4de3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@ -53,10 +53,13 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -70,7 +73,7 @@ public class TestMobCompaction {
@Rule
public TestName name = new TestName();
static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
- private HBaseTestingUtility UTIL = null;
+ private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private Configuration conf = null;
private HRegion region = null;
@@ -84,14 +87,22 @@ public class TestMobCompaction {
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
private int compactionThreshold;
- private void init(long mobThreshold) throws Exception {
- this.mobCellThreshold = mobThreshold;
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ UTIL.startMiniCluster(1);
+ }
- UTIL = HBaseTestingUtility.createLocalHTU();
+ /** Shuts down the mini cluster started in setUpBeforeClass(). */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+ private void init(long mobThreshold) throws Exception {
+ this.mobCellThreshold = mobThreshold;
conf = UTIL.getConfiguration();
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
-
htd = UTIL.createTableDescriptor(name.getMethodName());
hcd = new HColumnDescriptor(COLUMN_FAMILY);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
@@ -158,7 +169,8 @@ public class TestMobCompaction {
region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
region.initialize();
- region.compactStores(true);
+ region.compactStores();
+
assertEquals("After compaction: store files", 1, countStoreFiles());
assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
@@ -307,7 +319,7 @@ public class TestMobCompaction {
if (!MobUtils.isMobReferenceCell(kv)) {
continue;
}
- if (!MobUtils.isValidMobRefCellValue(kv)) {
+ if (!MobUtils.hasValidMobRefCellValue(kv)) {
continue;
}
int size = MobUtils.getMobValueLength(kv);
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 69b9c8f..87147d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -22,12 +22,14 @@ import java.io.IOException;
import java.util.List;
import java.util.Random;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -120,6 +124,7 @@ public class TestMobStoreScanner {
testGetFromMemStore(false);
testGetReferences(false);
testMobThreshold(false);
+ testGetFromArchive(false);
}
@Test
@@ -128,6 +133,7 @@ public class TestMobStoreScanner {
testGetFromMemStore(true);
testGetReferences(true);
testMobThreshold(true);
+ testGetFromArchive(true);
}
public void testGetFromFiles(boolean reversed) throws Exception {
@@ -282,6 +288,72 @@ public class TestMobStoreScanner {
results.close();
}
+  /**
+   * Verifies that MOB values remain readable after the MOB files are moved from the MOB family
+   * directory into the store archive directory.
+   * @param reversed whether the verifying scan runs in reverse
+   */
+  public void testGetFromArchive(boolean reversed) throws Exception {
+    String TN = "testGetFromArchive" + reversed;
+    setUp(defaultThreshold, TN);
+    TableName tn = TableName.valueOf(TN);
+    long ts1 = System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+    byte[] value = generateMobValue((int) defaultThreshold + 1);
+    // Put three values larger than the MOB threshold into one row, then flush them.
+    Put put1 = new Put(row1);
+    put1.add(family, qf1, ts3, value);
+    put1.add(family, qf2, ts2, value);
+    put1.add(family, qf3, ts1, value);
+    table.put(put1);
+
+    table.flushCommits();
+    admin.flush(TN);
+
+    // Get the files in the MOB family directory.
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), tn),
+        hcd.getNameAsString());
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    FileStatus[] files = fs.listStatus(mobFamilyPath);
+    // Without any MOB file the rest of the test would pass without exercising the archive.
+    Assert.assertTrue("No MOB files were flushed to " + mobFamilyPath, files.length > 0);
+
+    // Get the archive directory of the MOB store.
+    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+    Path tableDir = FSUtils.getTableDir(rootDir, tn);
+    HRegionInfo regionInfo = MobUtils.getMobRegionInfo(tn);
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
+        regionInfo, tableDir, family);
+
+    // Move the files from the MOB directory to the archive directory. The FileSystem calls
+    // report failure through their return values, so check them.
+    Assert.assertTrue("Failed to create " + storeArchiveDir, fs.mkdirs(storeArchiveDir));
+    for (FileStatus file : files) {
+      String fileName = file.getPath().getName();
+      Path src = new Path(mobFamilyPath, fileName);
+      Path dst = new Path(storeArchiveDir, fileName);
+      Assert.assertTrue("Failed to move " + src + " to " + dst, fs.rename(src, dst));
+    }
+    int fileCount = files.length;
+
+    // Verify the move succeeded.
+    Assert.assertEquals(0, fs.listStatus(mobFamilyPath).length);
+    Assert.assertEquals(fileCount, fs.listStatus(storeArchiveDir).length);
+
+    // Scan; every MOB value now has to be read from the archive directory.
+    Scan scan = new Scan();
+    setScan(scan, reversed, false);
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    try {
+      for (Result res : results) {
+        for (Cell cell : res.listCells()) {
+          Assert.assertEquals(Bytes.toString(value),
+              Bytes.toString(CellUtil.cloneValue(cell)));
+          count++;
+        }
+      }
+    } finally {
+      results.close();
+    }
+    Assert.assertEquals(3, count);
+  }
+
/**
* Assert the value is not store in mob.
*/
[2/2] git commit: HBASE-11644 External MOB compaction tools
(Jingcheng Du)
Posted by jm...@apache.org.
HBASE-11644 External MOB compaction tools (Jingcheng Du)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/84e957c8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/84e957c8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/84e957c8
Branch: refs/heads/hbase-11339
Commit: 84e957c875ae971578a5b147775445368ea26188
Parents: 9cf46dc
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Sep 19 03:02:06 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Sep 19 03:06:33 2014 -0700
----------------------------------------------------------------------
.../src/main/resources/hbase-default.xml | 34 ++
.../master/ExpiredMobFileCleanerChore.java | 70 +++
.../org/apache/hadoop/hbase/master/HMaster.java | 7 +
.../hadoop/hbase/mob/DefaultMobCompactor.java | 2 +-
.../hadoop/hbase/mob/ExpiredMobFileCleaner.java | 120 ++++
.../apache/hadoop/hbase/mob/MobConstants.java | 22 +
.../org/apache/hadoop/hbase/mob/MobUtils.java | 285 +++++++++-
.../apache/hadoop/hbase/mob/MobZookeeper.java | 270 +++++++++
.../hbase/mob/mapreduce/MemStoreWrapper.java | 184 +++++++
.../mapreduce/MobFilePathHashPartitioner.java | 41 ++
.../hadoop/hbase/mob/mapreduce/SweepJob.java | 550 +++++++++++++++++++
.../mob/mapreduce/SweepJobNodeTracker.java | 74 +++
.../hadoop/hbase/mob/mapreduce/SweepMapper.java | 84 +++
.../hbase/mob/mapreduce/SweepReducer.java | 506 +++++++++++++++++
.../hadoop/hbase/mob/mapreduce/Sweeper.java | 108 ++++
.../hadoop/hbase/regionserver/HMobStore.java | 180 +++++-
.../regionserver/MobReferenceOnlyFilter.java | 42 ++
.../hbase/mob/TestExpiredMobFileCleaner.java | 180 ++++++
.../hbase/mob/mapreduce/TestMobSweepJob.java | 168 ++++++
.../hbase/mob/mapreduce/TestMobSweepMapper.java | 100 ++++
.../mob/mapreduce/TestMobSweepReducer.java | 207 +++++++
.../hbase/mob/mapreduce/TestMobSweeper.java | 306 +++++++++++
.../hbase/regionserver/TestMobCompaction.java | 26 +-
.../hbase/regionserver/TestMobStoreScanner.java | 72 +++
24 files changed, 3594 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 872bbc5..647defd 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1483,4 +1483,38 @@ possible configurations would overwhelm and obscure the important.
The default value is 0.5f.
</description>
</property>
+ <property>
+ <name>hbase.mob.sweep.tool.compaction.ratio</name>
+ <value>0.5f</value>
+ <description>
+ If too many cells in a MOB file have been deleted, the file is regarded
+ as invalid and needs to be merged. A MOB file is regarded as invalid when
+ existingCellsSize/mobFileSize is less than this ratio.
+ The default value is 0.5f.
+ </description>
+ </property>
+ <property>
+ <name>hbase.mob.sweep.tool.compaction.mergeable.size</name>
+ <value>134217728</value>
+ <description>
+ If the size of a mob file is less than this value, it's regarded as a small
+ file and needs to be merged. The default value is 128MB.
+ </description>
+ </property>
+ <property>
+ <name>hbase.mob.sweep.tool.compaction.memstore.flush.size</name>
+ <value>134217728</value>
+ <description>
+ The flush size of the memstore used by the sweep job. Each sweep reducer owns its own memstore.
+ The default value is 128MB.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.mob.ttl.cleaner.period</name>
+ <value>86400000</value>
+ <description>
+ The period, in milliseconds, at which ExpiredMobFileCleanerChore runs.
+ The default value is one day.
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
new file mode 100644
index 0000000..98fe236
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired
+ * mob files.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleanerChore extends Chore {
+
+ private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class);
+ private final HMaster master;
+ private ExpiredMobFileCleaner cleaner;
+
+ public ExpiredMobFileCleanerChore(HMaster master) {
+ super(master.getServerName() + "-ExpiredMobFileCleanerChore", master.getConfiguration().getInt(
+ MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master);
+ this.master = master;
+ cleaner = new ExpiredMobFileCleaner();
+ }
+
+ @Override
+ protected void chore() {
+ try {
+ TableDescriptors htds = master.getTableDescriptors();
+ Map<String, HTableDescriptor> map = htds.getAll();
+ for (HTableDescriptor htd : map.values()) {
+ for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+ if (MobUtils.isMobFamily(hcd) && hcd.getMinVersions() == 0) {
+ cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Fail to clean the expired mob files", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 714b5a8..4ff3592 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -208,6 +208,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
+ private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
MasterCoprocessorHost cpHost;
@@ -610,6 +611,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// master initialization. See HBASE-5916.
this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
+ this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
+ Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
+
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
try {
@@ -856,6 +860,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
private void stopChores() {
+ if (this.expiredMobFileCleanerChore != null) {
+ this.expiredMobFileCleanerChore.interrupt();
+ }
if (this.balancerChore != null) {
this.balancerChore.interrupt();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
index 5f13502..fd35a15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@ -152,7 +152,7 @@ public class DefaultMobCompactor extends DefaultCompactor {
// to the store file.
writer.append(kv);
} else if (MobUtils.isMobReferenceCell(kv)) {
- if (MobUtils.isValidMobRefCellValue(kv)) {
+ if (MobUtils.hasValidMobRefCellValue(kv)) {
int size = MobUtils.getMobValueLength(kv);
if (size > mobSizeThreshold) {
// If the value size is larger than the threshold, it's regarded as a mob. Since
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
new file mode 100644
index 0000000..d3c11ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * The cleaner to delete the expired MOB files.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleaner extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleaner.class);
+ /**
+ * Cleans the MOB files when they're expired and their min versions are 0.
+ * If the latest timestamp of Cells in a MOB file is older than the TTL in the column family,
+ * it's regarded as expired. This cleaner deletes them.
+ * At a time T0, the cells in a mob file M0 are expired. If a user starts a scan before T0, those
+ * mob cells are visible, this scan still runs after T0. At that time T1, this mob file M0
+ * is expired, meanwhile a cleaner starts, the M0 is archived and can be read in the archive
+ * directory.
+ * @param tableName The current table name.
+ * @param family The current family.
+ * @throws ServiceException
+ * @throws IOException
+ */
+ public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
+ throws ServiceException, IOException {
+ Configuration conf = getConf();
+ TableName tn = TableName.valueOf(tableName);
+ FileSystem fs = FileSystem.get(conf);
+ LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName);
+ // disable the block cache.
+ Configuration copyOfConf = new Configuration(conf);
+ copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+ CacheConfig cacheConfig = new CacheConfig(copyOfConf);
+ MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig,
+ EnvironmentEdgeManager.currentTime());
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ ToolRunner.run(conf, new ExpiredMobFileCleaner(), args);
+ }
+
+ private void printUsage() {
+ System.err.println("Usage:\n" + "--------------------------\n"
+ + ExpiredMobFileCleaner.class.getName() + " tableName familyName");
+ System.err.println(" tableName The table name");
+ System.err.println(" familyName The column family name");
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ printUsage();
+ return 1;
+ }
+ String tableName = args[0];
+ String familyName = args[1];
+ TableName tn = TableName.valueOf(tableName);
+ HBaseAdmin.checkHBaseAvailable(getConf());
+ HBaseAdmin admin = new HBaseAdmin(getConf());
+ try {
+ HTableDescriptor htd = admin.getTableDescriptor(tn);
+ HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+ if (family == null || !MobUtils.isMobFamily(family)) {
+ throw new IOException("Column family " + familyName + " is not a MOB column family");
+ }
+ if (family.getMinVersions() > 0) {
+ throw new IOException(
+ "The minVersions of the column family is not 0, could not be handled by this cleaner");
+ }
+ cleanExpiredMobFiles(tableName, family);
+ return 0;
+ } finally {
+ try {
+ admin.close();
+ } catch (IOException e) {
+ LOG.error("Fail to close the HBaseAdmin.", e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 9978afd..4e3e7c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -38,6 +38,7 @@ public class MobConstants {
public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+ /**
+ * Scan attribute key; when it is set to true, only the cells carrying the MOB reference tag
+ * are returned by the scan.
+ */
+ public static final String MOB_SCAN_REF_ONLY = "hbase.mob.scan.ref.only";
public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
@@ -46,6 +47,26 @@ public class MobConstants {
public static final String MOB_REGION_NAME = ".mob";
public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
+  /** Configuration key of the period of the mob TTL cleaner, in milliseconds. */
+  public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period";
+  /** Default period of the mob TTL cleaner: one day, in milliseconds. */
+  public static final int DEFAULT_MOB_CLEANER_PERIOD = 1000 * 60 * 60 * 24;
+
+  // Configuration keys of the sweep tool compaction.
+  public static final String MOB_SWEEP_TOOL_COMPACTION_START_DATE =
+      "hbase.mob.sweep.tool.compaction.start.date";
+  public static final String MOB_SWEEP_TOOL_COMPACTION_RATIO =
+      "hbase.mob.sweep.tool.compaction.ratio";
+  public static final String MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE =
+      "hbase.mob.sweep.tool.compaction.mergeable.size";
+
+  // Defaults of the ratio and mergeable size keys above (the size is 128MB).
+  public static final float DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO = 0.5f;
+  public static final long DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE = 128L * 1024 * 1024;
+
+  /** Name of the temp directory used by the sweep tool compaction. */
+  public static final String MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME = "mobcompaction";
+
+  /** Configuration key of the memstore flush size used by the sweep tool compaction. */
+  public static final String MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+      "hbase.mob.sweep.tool.compaction.memstore.flush.size";
+  public static final long DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+      128L * 1024 * 1024; // 128M
+
public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
@@ -55,6 +76,7 @@ public class MobConstants {
public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
public final static String TEMP_DIR_NAME = ".tmp";
+ /** The empty string, "". */
+ public final static String EMPTY_STRING = "";
private MobConstants() {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index e52d336..e49d3ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -18,13 +18,22 @@
*/
package org.apache.hadoop.hbase.mob;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -34,7 +43,16 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -44,6 +62,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
@InterfaceAudience.Private
public class MobUtils {
+  /** Name of the working sub-directory created under each mob compaction job directory. */
+  private static final String COMPACTION_WORKING_DIR_NAME = "working";
+  private static final Log LOG = LogFactory.getLog(MobUtils.class);
+
private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
new ThreadLocal<SimpleDateFormat>() {
@Override
@@ -143,6 +164,22 @@ public class MobUtils {
}
/**
+   * Tells whether the given scan asks for MOB reference cells only.
+   * The flag is read from the "hbase.mob.scan.ref.only" attribute of the scan. When it is set,
+   * only the cells with the ref tag are returned.
+   * @param scan The current scan.
+   * @return True if the attribute is present and decodes to true; false if it is absent or
+   *         cannot be decoded by Bytes.toBoolean.
+   */
+  public static boolean isRefOnlyScan(Scan scan) {
+    byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
+    if (refOnly == null) {
+      return false;
+    }
+    try {
+      return Bytes.toBoolean(refOnly);
+    } catch (IllegalArgumentException e) {
+      // A malformed attribute value is treated as "not a ref only scan".
+      return false;
+    }
+  }
+
+ /**
* Indicates whether the scan contains the information of caching blocks.
* The information is set in the attribute "hbase.mob.cache.blocks" of scan.
* @param scan The current scan.
@@ -172,6 +209,91 @@ public class MobUtils {
}
/**
+   * Cleans the expired mob files of a column family.
+   * The files whose date is before the day of (current - columnFamily.ttl) are archived. The
+   * expiry moment is truncated to the start of its day in the default time zone.
+   * Nothing is cleaned when the TTL of the family is not set (Integer.MAX_VALUE). Nothing is
+   * cleaned either when the minVersions of the family is greater than 0, because cells older
+   * than the TTL may then still have to be retained.
+   * Files whose names cannot be handled are logged and skipped.
+   * @param fs The current file system.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param columnDescriptor The descriptor of the current column family.
+   * @param cacheConfig The cacheConfig that disables the block cache.
+   * @param current The current time.
+   * @throws IOException
+   */
+  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
+      HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
+      throws IOException {
+    long timeToLive = columnDescriptor.getTimeToLive();
+    if (Integer.MAX_VALUE == timeToLive) {
+      // no need to clean, because the TTL is not set.
+      return;
+    }
+    if (columnDescriptor.getMinVersions() > 0) {
+      // Enforce the documented precondition: with minVersions > 0, cells older than the TTL may
+      // still be kept, so the mob files they reference must not be removed by date alone.
+      LOG.warn("The minVersions of the column family " + columnDescriptor.getNameAsString()
+          + " is not 0, the expired mob files are not cleaned");
+      return;
+    }
+
+    // Truncate the expiry moment to the start of its day (default time zone). Mob file names
+    // carry a date only, so files are compared with day granularity.
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTimeInMillis(current - timeToLive * 1000);
+    calendar.set(Calendar.HOUR_OF_DAY, 0);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+    Date expireDate = calendar.getTime();
+    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
+
+    FileStatus[] stats = null;
+    Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
+    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
+    try {
+      stats = fs.listStatus(path);
+    } catch (FileNotFoundException e) {
+      LOG.warn("Fail to find the mob file " + path, e);
+    }
+    if (null == stats) {
+      // no file found
+      return;
+    }
+    List<StoreFile> filesToClean = new ArrayList<StoreFile>();
+    int deletedFileCount = 0;
+    for (FileStatus file : stats) {
+      String fileName = file.getPath().getName();
+      try {
+        MobFileName mobFileName = null;
+        if (!HFileLink.isHFileLink(file.getPath())) {
+          mobFileName = MobFileName.create(fileName);
+        } else {
+          // For a link, the date is taken from the name of the file it points to.
+          HFileLink hfileLink = new HFileLink(conf, file.getPath());
+          mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
+        }
+        Date fileDate = parseDate(mobFileName.getDate());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checking file " + fileName);
+        }
+        if (fileDate.getTime() < expireDate.getTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(fileName + " is an expired file");
+          }
+          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+        }
+      } catch (Exception e) {
+        // Best effort: a file that cannot be handled is logged and skipped.
+        LOG.error("Cannot parse the fileName " + fileName, e);
+      }
+    }
+    if (!filesToClean.isEmpty()) {
+      try {
+        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
+            filesToClean);
+        deletedFileCount = filesToClean.size();
+      } catch (IOException e) {
+        LOG.error("Fail to delete the mob files " + filesToClean, e);
+      }
+    }
+    LOG.info(deletedFileCount + " expired mob files are deleted");
+  }
+
+  /**
+   * Builds the znode name of a column family, in the form "tableName:familyName".
+   * @param tableName The current table name.
+   * @param familyName The name of the current column family.
+   * @return The znode name of column family.
+   */
+  public static String getColumnFamilyZNodeName(String tableName, String familyName) {
+    return new StringBuilder().append(tableName).append(':').append(familyName).toString();
+  }
+
+ /**
* Gets the root dir of the mob files.
* It's {HBASE_DIR}/mobdir.
* @param conf The current configuration.
@@ -183,6 +305,19 @@ public class MobUtils {
}
/**
+   * Gets the root dir of the mob files ({HBASE_DIR}/{MOB_DIR_NAME}), qualified against the file
+   * system that the path resolves to.
+   * @param conf The current configuration.
+   * @return The qualified root dir.
+   * @throws IOException
+   */
+  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
+    Path mobRootDir =
+        new Path(new Path(conf.get(HConstants.HBASE_DIR)), MobConstants.MOB_DIR_NAME);
+    return mobRootDir.makeQualified(mobRootDir.getFileSystem(conf));
+  }
+
+ /**
* Gets the region dir of the mob files.
* It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
* @param conf The current configuration.
@@ -190,7 +325,7 @@ public class MobUtils {
* @return The region dir of the mob files.
*/
public static Path getMobRegionPath(Configuration conf, TableName tableName) {
- Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+ Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
+ // The region dir under the mob table dir is named after the encoded name of the mob region info.
HRegionInfo regionInfo = getMobRegionInfo(tableName);
return new Path(tablePath, regionInfo.getEncodedName());
}
@@ -232,36 +367,160 @@ public class MobUtils {
}
/**
+ * Gets whether the current HRegionInfo is a mob one.
+ * @param regionInfo The current HRegionInfo.
+ * @return If true, the current HRegionInfo is a mob one.
+ */
+ public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
+ return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
+ .equals(regionInfo.getEncodedName());
+ }
+
+  /**
+   * Gets the working directory of a mob compaction job, i.e. {root}/{jobName}/working.
+   * @param root The root directory of the mob compaction.
+   * @param jobName The current job name.
+   * @return The directory of the mob compaction for the current job.
+   */
+  public static Path getCompactionWorkingPath(Path root, String jobName) {
+    return new Path(new Path(root, jobName), COMPACTION_WORKING_DIR_NAME);
+  }
+
+  /**
+   * Archives the given mob files by handing them to HFileArchiver#archiveStoreFiles, with the
+   * mob region info of the table as the owning region.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param tableName The table name.
+   * @param tableDir The table directory.
+   * @param family The name of the column family.
+   * @param storeFiles The files to be archived.
+   * @throws IOException
+   */
+  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
+      Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
+    HRegionInfo mobRegionInfo = getMobRegionInfo(tableName);
+    HFileArchiver.archiveStoreFiles(conf, fs, mobRegionInfo, tableDir, family, storeFiles);
+  }
+
+ /**
* Creates a mob reference KeyValue.
* The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
+ * The tags of the reference are, in order: the MOB reference tag, the tags of the original
+ * cell, and tableNameTag. Row, family, qualifier, timestamp and sequence id are copied from
+ * the original cell; the type is always Put.
- * @param kv The original KeyValue.
+ * @param cell The original Cell.
* @param fileName The mob file name where the mob reference KeyValue is written.
* @param tableNameTag The tag of the current table name. It's very important in
* cloning the snapshot.
* @return The mob reference KeyValue.
*/
- public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
+ public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
// Append the tags to the KeyValue.
// The key is same, the value is the filename of the mob file
- List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
- existingTags.add(MobConstants.MOB_REF_TAG);
+ List<Tag> tags = new ArrayList<Tag>();
+ // Add the ref tag as the 1st one.
+ tags.add(MobConstants.MOB_REF_TAG);
+ // Add the existing tags.
+ tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
// Add the tag of the source table name, this table is where this mob file is flushed
// from.
// It's very useful in cloning the snapshot. When reading from the cloning table, we need to
// find the original mob files by this table name. For details please see cloning
// snapshot for mob files.
- existingTags.add(tableNameTag);
- int valueLength = kv.getValueLength();
+ tags.add(tableNameTag);
+ int valueLength = cell.getValueLength();
+ // Reference value layout: the original value length encoded by Bytes.toBytes(int), then the file name.
byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
- KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
- kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
- kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
- refValue, 0, refValue.length, existingTags);
- reference.setSequenceId(kv.getSequenceId());
+ KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
+ reference.setSequenceId(cell.getSequenceId());
return reference;
}
/**
+   * Creates a writer for a new mob file under basePath.
+   * The file name is built by MobFileName from the start key, the date and a random UUID with
+   * its dashes removed. The writer uses no bloom filter, includes tags and excludes MVCC.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family; its block size and data block
+   *          encoding are used for the file.
+   * @param date The date string of the mob file name, in yyyymmdd form.
+   * @param basePath The directory in which the mob file is created (e.g. a temp directory).
+   * @param maxKeyCount The estimated maximum number of keys, passed to the writer builder.
+   * @param compression The compression algorithm.
+   * @param startKey The hex string of the start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig)
+      throws IOException {
+    // String#replace removes the dashes literally, without compiling a regex like replaceAll.
+    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+        .replace("-", ""));
+    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+        .withIncludesMvcc(false).withIncludesTags(true)
+        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+        .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+        .withDataBlockEncoding(family.getDataBlockEncoding()).build();
+
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
+        .withFilePath(new Path(basePath, mobFileName.getFileName()))
+        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Commits a mob file.
+   * The file is first validated by opening and closing a reader on it. It is then renamed into
+   * targetPath under its own file name. Nothing is done when sourceFile is null.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param sourceFile The path of the mob file to commit, may be null.
+   * @param targetPath The directory path where the source file is renamed to.
+   * @param cacheConfig The current cache config.
+   * @throws IOException If the file cannot be opened, the target directory cannot be created, or
+   *           the rename fails.
+   */
+  public static void commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+      Path targetPath, CacheConfig cacheConfig) throws IOException {
+    if (sourceFile == null) {
+      return;
+    }
+    Path dstPath = new Path(targetPath, sourceFile.getName());
+    validateMobFile(conf, fs, sourceFile, cacheConfig);
+    LOG.info("Renaming flushed file from " + sourceFile + " to " + dstPath);
+    Path parent = dstPath.getParent();
+    // mkdirs reports failure through its return value. Check it so that a missing target
+    // directory is reported as such instead of surfacing as a failed rename.
+    if (!fs.exists(parent) && !fs.mkdirs(parent)) {
+      throw new IOException("Failed to create the directory " + parent);
+    }
+    if (!fs.rename(sourceFile, dstPath)) {
+      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+    }
+  }
+
+  /**
+   * Checks that a mob file is readable by opening a reader on it and closing it again.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param path The path where the mob file is saved.
+   * @param cacheConfig The current cache config.
+   * @throws IOException If the file cannot be opened; the failure is logged and rethrown.
+   */
+  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
+      CacheConfig cacheConfig) throws IOException {
+    StoreFile candidate = null;
+    try {
+      candidate = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
+      // Opening the reader is the validation itself; the reader is closed right after.
+      candidate.createReader();
+    } catch (IOException ioe) {
+      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", ioe);
+      throw ioe;
+    } finally {
+      if (null != candidate) {
+        candidate.closeReader(false);
+      }
+    }
+  }
+
+ /**
* Indicates whether the current mob ref cell has a valid value.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
@@ -270,7 +529,7 @@ public class MobUtils {
* @param cell The mob ref cell.
* @return True if the cell has a valid value.
*/
- public static boolean isValidMobRefCellValue(Cell cell) {
+ public static boolean hasValidMobRefCellValue(Cell cell) {
+ // Longer than the leading int (the real mob value length) means a file name follows it.
return cell.getValueLength() > Bytes.SIZEOF_INT;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
new file mode 100644
index 0000000..a9557d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The zookeeper used for MOB.
+ * This zookeeper is used to synchronize the HBase major compaction and the sweep tool.
+ * The structure of the nodes for mob in zookeeper:
+ * <pre>
+ * |--baseNode
+ *     |--MOB
+ *         |--tableName:columnFamilyName-lock // locks for the mob column family
+ *         |--tableName:columnFamilyName-sweeper // when a sweep tool runs, such a node is added
+ *         |--tableName:columnFamilyName-majorCompaction
+ *              |--UUID //when a major compaction occurs, such a node is added.
+ * </pre>
+ * In order to synchronize the operations between the sweep tool and HBase major compaction, both
+ * need to acquire the tableName:columnFamilyName-lock before they run.
+ * In the sweep tool:
+ * 1. If it acquires the lock successfully, it checks whether the sweeper node exists; if it
+ * exists the current run is aborted. If not, it checks whether there are major compaction nodes;
+ * if yes the current run is aborted, if not it adds a sweeper node to the zookeeper.
+ * 2. If it could not obtain the lock, the current run is aborted.
+ * In the HBase compaction:
+ * 1. If it's a minor compaction, the compaction continues.
+ * 2. If it's a major compaction, it acquires the lock in zookeeper.
+ *    A. If it obtains the lock, it checks whether there's a sweeper node; if yes it converts
+ *    itself to a minor one and continues, if no it adds a major compaction node to the zookeeper.
+ *    B. If it could not obtain the lock, it converts itself to a minor one and continues the
+ *    compaction.
+ */
+@InterfaceAudience.Private
+public class MobZookeeper {
+  // TODO Will remove this class before the mob is merged back to master.
+  private static final Log LOG = LogFactory.getLog(MobZookeeper.class);
+
+  private static final String LOCK_EPHEMERAL = "-lock";
+  private static final String SWEEPER_EPHEMERAL = "-sweeper";
+  private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction";
+
+  private final ZooKeeperWatcher zkw;
+  private final String mobZnode;
+
+  private MobZookeeper(Configuration conf, String identifier) throws IOException,
+      KeeperException {
+    this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable());
+    this.mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB");
+    boolean initialized = false;
+    try {
+      if (ZKUtil.checkExists(zkw, mobZnode) == -1) {
+        ZKUtil.createWithParents(zkw, mobZnode);
+      }
+      initialized = true;
+    } finally {
+      if (!initialized) {
+        // The half-built instance is never returned to a caller, so nobody else could close the
+        // watcher opened above; close it here to avoid leaking the ZooKeeper connection.
+        zkw.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a new instance of MobZookeeper.
+   * @param conf The current configuration.
+   * @param identifier string that is passed to RecoverableZookeeper to be used as
+   * identifier for this instance.
+   * @return A new instance of MobZookeeper.
+   * @throws IOException
+   * @throws KeeperException
+   */
+  public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException,
+      KeeperException {
+    return new MobZookeeper(conf, identifier);
+  }
+
+  /**
+   * Acquires a lock on the current column family.
+   * The lock is an ephemeral node named tableName:familyName-lock under the MOB znode; acquiring
+   * it means creating that node.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if the lock is obtained successfully. Otherwise false is returned, including
+   *         when a KeeperException occurs (it is logged).
+   */
+  public boolean lockColumnFamily(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    boolean locked = false;
+    try {
+      locked = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(locked ? "Locked the column family " + znodeName
+            : "Can not lock the column family " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Fail to lock the column family " + znodeName, e);
+    }
+    return locked;
+  }
+
+  /**
+   * Releases the lock on the current column family by deleting its lock node.
+   * A KeeperException is logged and not rethrown.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   */
+  public void unlockColumnFamily(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unlocking the column family " + znodeName);
+    }
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.warn("Fail to unlock the column family " + znodeName, e);
+    }
+  }
+
+  /**
+   * Adds a node to zookeeper which indicates that a sweep tool is running.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param data the data of the ephemeral node.
+   * @return True if the node is created successfully. Otherwise false is returned, including
+   *         when a KeeperException occurs (it is logged).
+   */
+  public boolean addSweeperZNode(String tableName, String familyName, byte[] data) {
+    boolean add = false;
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    try {
+      add = ZKUtil.createEphemeralNodeAndWatch(zkw,
+          ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(add ? "Added a znode for sweeper " + znodeName
+            : "Cannot add a znode for sweeper " + znodeName);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Fail to add a znode for sweeper " + znodeName, e);
+    }
+    return add;
+  }
+
+  /**
+   * Gets the path of the sweeper znode in zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return The path of the sweeper znode in zookeeper.
+   */
+  public String getSweeperZNodePath(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL);
+  }
+
+  /**
+   * Deletes the node from zookeeper which indicates that a sweep tool is running; called when
+   * the sweep tool is finished. A KeeperException is logged and not rethrown.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   */
+  public void deleteSweeperZNode(String tableName, String familyName) {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    try {
+      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL));
+    } catch (KeeperException e) {
+      LOG.error("Fail to delete a znode for sweeper " + znodeName, e);
+    }
+  }
+
+  /**
+   * Checks whether the sweeper znode exists in the Zookeeper.
+   * If the node exists, it means a sweep tool is running.
+   * Otherwise, the sweep tool is not.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if this node exists. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0;
+  }
+
+  /**
+   * Checks whether there are major compaction nodes in the zookeeper.
+   * If there are such nodes, it means there are major compactions in progress now.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @return True if there are major compactions in progress. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean hasMajorCompactionChildren(String tableName, String familyName)
+      throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath);
+    return children != null && !children.isEmpty();
+  }
+
+  /**
+   * Creates a node of a major compaction in the Zookeeper.
+   * Before an HBase major compaction, such a node is created in the Zookeeper. It tells others
+   * that there are major compactions in progress, so the sweep tool cannot run at this time.
+   * The parent tableName:familyName-majorCompaction node is created first if it is missing.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param compactionName The current compaction name.
+   * @return True if the node is created successfully. Otherwise false is returned.
+   * @throws KeeperException
+   */
+  public boolean addMajorCompactionZNode(String tableName, String familyName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null);
+  }
+
+  /**
+   * Deletes a major compaction node from the Zookeeper.
+   * @param tableName The current table name.
+   * @param familyName The current column family name.
+   * @param compactionName The current compaction name.
+   * @throws KeeperException
+   */
+  public void deleteMajorCompactionZNode(String tableName, String familyName,
+      String compactionName) throws KeeperException {
+    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+    ZKUtil.deleteNode(zkw, eachMcPath);
+  }
+
+  /**
+   * Closes the MobZookeeper by closing its ZooKeeperWatcher.
+   */
+  public void close() {
+    this.zkw.close();
+  }
+
+  /**
+   * A dummy abortable used by the MobZookeeper. It only records that abort was requested.
+   */
+  public static class DummyMobAbortable implements Abortable {
+
+    // volatile: abort() may be invoked from a different thread than the one polling isAborted().
+    private volatile boolean abort = false;
+
+    @Override
+    public void abort(String why, Throwable e) {
+      abort = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return abort;
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
new file mode 100644
index 0000000..b0d4c9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+
+/**
+ * Wraps a {@link MemStore} that the sweep reducer uses to buffer and sort the cells read from
+ * the invalid and small mob files.
+ * In a reducer of the sweep tool, the mob files are grouped by the same prefix (start key and
+ * date). Within each group the reducer iterates over the files and reads their cells into a new,
+ * bigger mob file. The cells inside one mob file are ordered, but cells across mob files are not,
+ * so this wrapper sorts the cells coming from different mob files before they reach disk.
+ * When the memstore is big enough it is flushed. Its cells are written to a new mob file, and
+ * references carrying that file's name are written back to the store files of HBase.
+ */
+@InterfaceAudience.Private
+public class MemStoreWrapper {
+
+  private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+
+  private final MemStore memstore;
+  private final long flushSize;
+  private SweepPartitionId partitionId;
+  private final Context context;
+  private final Configuration conf;
+  private final HTable table;
+  private final HColumnDescriptor hcd;
+  private final Path mobFamilyDir;
+  private final FileSystem fs;
+  private final CacheConfig cacheConfig;
+
+  /**
+   * @param context The reducer context. It supplies the configuration and the job counters.
+   * @param fs The file system the mob files are written to.
+   * @param table The table whose store files receive the mob references.
+   * @param hcd The descriptor of the mob column family.
+   * @param memstore The memstore used to buffer and sort the cells.
+   * @param cacheConfig The cache configuration used when writing and committing mob files.
+   * @throws IOException
+   */
+  public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd,
+      MemStore memstore, CacheConfig cacheConfig) throws IOException {
+    this.memstore = memstore;
+    this.context = context;
+    this.fs = fs;
+    this.table = table;
+    this.hcd = hcd;
+    this.conf = context.getConfiguration();
+    this.cacheConfig = cacheConfig;
+    flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
+        MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
+    mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
+  }
+
+  /**
+   * Sets the partition whose date and start key are used to name the mob files created by
+   * subsequent flushes.
+   * @param partitionId The current sweep partition id.
+   */
+  public void setPartitionId(SweepPartitionId partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Flushes the memstore if its heap size has reached the configured flush size.
+   * @throws IOException
+   */
+  private void flushMemStoreIfNecessary() throws IOException {
+    if (memstore.heapSize() >= flushSize) {
+      flushMemStore();
+    }
+  }
+
+  /**
+   * Flushes the memstore unconditionally.
+   * The method takes a snapshot, writes it out, and then clears the snapshot.
+   * @throws IOException
+   */
+  public void flushMemStore() throws IOException {
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    internalFlushCache(snapshot);
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  /**
+   * Flushes the snapshot of the memstore.
+   * The cells are written to a new mob file, which is then committed into the mob family
+   * directory. After that, a reference cell holding the mob file name is put into HBase for
+   * every cell of the snapshot.
+   * @param snapshot The snapshot of the memstore.
+   * @throws IOException
+   */
+  private void internalFlushCache(final MemStoreSnapshot snapshot)
+      throws IOException {
+    if (snapshot.getSize() == 0) {
+      return;
+    }
+    // generate the files into a temp directory.
+    String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
+    StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
+        partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(),
+        hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig);
+
+    String mobFileName = mobFileWriter.getPath().getName();
+    LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
+
+    byte[] referenceValue = Bytes.toBytes(mobFileName);
+    int keyValueCount = 0;
+    // The writer and the scanner are released even if writing a cell fails.
+    try {
+      KeyValueScanner scanner = snapshot.getScanner();
+      try {
+        Cell cell;
+        while (null != (cell = scanner.next())) {
+          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+          mobFileWriter.append(kv);
+          keyValueCount++;
+        }
+      } finally {
+        scanner.close();
+      }
+      // Record Long.MAX_VALUE as the max sequence id of the mob file.
+      // The second argument (false) marks it as not produced by a major compaction.
+      mobFileWriter.appendMetadata(Long.MAX_VALUE, false);
+    } finally {
+      mobFileWriter.close();
+    }
+
+    MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
+    context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+    // write reference/fileName back to the store files of HBase.
+    KeyValueScanner refScanner = snapshot.getScanner();
+    try {
+      refScanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+      Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName());
+      Cell cell;
+      while (null != (cell = refScanner.next())) {
+        KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
+        Put put =
+            new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
+        put.add(reference);
+        table.put(put);
+        context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
+      }
+      if (keyValueCount > 0) {
+        table.flushCommits();
+      }
+    } finally {
+      refScanner.close();
+    }
+  }
+
+  /**
+   * Adds a KeyValue into the memstore, and flushes the memstore if it has become full.
+   * @param kv The KeyValue to be added.
+   * @throws IOException
+   */
+  public void addToMemstore(KeyValue kv) throws IOException {
+    memstore.add(kv);
+    // flush the memstore if it's full.
+    flushMemStoreIfNecessary();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
new file mode 100644
index 0000000..bdec887
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Partitioner used by the sweep job.
+ * The map output key is a mob file name. Records are bucketed by the date embedded in that
+ * name, so every file with the same date is routed to the same reducer.
+ */
+@InterfaceAudience.Private
+public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
+
+  @Override
+  public int getPartition(Text fileName, KeyValue kv, int numPartitions) {
+    String dateOfFile = MobFileName.create(fileName.toString()).getDate();
+    // Clear the sign bit so the modulo below never yields a negative partition.
+    int nonNegativeHash = dateOfFile.hashCode() & Integer.MAX_VALUE;
+    return nonNegativeHash % numPartitions;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
new file mode 100644
index 0000000..1d048bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@ -0,0 +1,550 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The sweep job.
+ * Run map reduce to merge the smaller mob files into bigger ones and cleans the unused ones.
+ */
+@InterfaceAudience.Private
+public class SweepJob {
+
+ private final FileSystem fs;
+ private final Configuration conf;
+ private static final Log LOG = LogFactory.getLog(SweepJob.class);
+ static final String SWEEP_JOB_ID = "mob.compaction.id";
+ static final String SWEEPER_NODE = "mob.compaction.sweep.node";
+ static final String WORKING_DIR_KEY = "mob.compaction.dir";
+ static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file";
+ static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir";
+ static final String WORKING_ALLNAMES_DIR = "all";
+ static final String WORKING_VISITED_DIR = "visited";
+ public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir";
+ //the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
+ public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay";
+ protected static long ONE_DAY = 24 * 60 * 60 * 1000;
+ private long compactionStartTime = EnvironmentEdgeManager.currentTime();
+ public final static String CREDENTIALS_LOCATION = "credentials_location";
+ private CacheConfig cacheConfig;
+ static final int SCAN_CACHING = 10000;
+
+ public SweepJob(Configuration conf, FileSystem fs) {
+ this.conf = conf;
+ this.fs = fs;
+ // disable the block cache.
+ Configuration copyOfConf = new Configuration(conf);
+ copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+ cacheConfig = new CacheConfig(copyOfConf);
+ }
+
+ /**
+ * Runs MapReduce to do the sweeping on the mob files.
+ * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
+ * references from 'normal' regions' rows.
+ * The running of the sweep tool on the same column family are mutually exclusive.
+ * The HBase major compaction and running of the sweep tool on the same column family
+ * are mutually exclusive.
+ * The synchronization is done by the Zookeeper.
+ * So in the beginning of the running, we need to make sure only this sweep tool is the only one
+ * that is currently running in this column family, and in this column family there're no major
+ * compaction in progress.
+ * @param tn The current table name.
+ * @param family The descriptor of the current column family.
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public void sweep(TableName tn, HColumnDescriptor family) throws IOException,
+ ClassNotFoundException, InterruptedException, KeeperException {
+ Configuration conf = new Configuration(this.conf);
+ // check whether the current user is the same one with the owner of hbase root
+ String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+ FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
+ if (hbaseRootFileStat.length > 0) {
+ String owner = hbaseRootFileStat[0].getOwner();
+ if (!owner.equals(currentUserName)) {
+ String errorMsg = "The current user[" + currentUserName
+ + "] doesn't have hbase root credentials."
+ + " Please make sure the user is the root of the target HBase";
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ } else {
+ LOG.error("The target HBase doesn't exist");
+ throw new IOException("The target HBase doesn't exist");
+ }
+ String familyName = family.getNameAsString();
+ String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
+ MobZookeeper zk = MobZookeeper.newInstance(conf, id);
+ try {
+ // Try to obtain the lock. Use this lock to synchronize all the query, creation/deletion
+ // in the Zookeeper.
+ if (!zk.lockColumnFamily(tn.getNameAsString(), familyName)) {
+ LOG.warn("Can not lock the store " + familyName
+ + ". The major compaction in HBase may be in-progress. Please re-run the job.");
+ return;
+ }
+ try {
+ // Checks whether there're HBase major compaction now.
+ boolean hasChildren = zk.hasMajorCompactionChildren(tn.getNameAsString(), familyName);
+ if (hasChildren) {
+ LOG.warn("The major compaction in HBase may be in-progress."
+ + " Please re-run the job.");
+ return;
+ } else {
+ // Checks whether there's sweep tool in progress.
+ boolean hasSweeper = zk.isSweeperZNodeExist(tn.getNameAsString(), familyName);
+ if (hasSweeper) {
+ LOG.warn("Another sweep job is running");
+ return;
+ } else {
+ // add the sweeper node, mark that there's one sweep tool in progress.
+ // All the HBase major compaction and sweep tool in this column family could not
+ // run until this sweep tool is finished.
+ zk.addSweeperZNode(tn.getNameAsString(), familyName, Bytes.toBytes(id));
+ }
+ }
+ } finally {
+ zk.unlockColumnFamily(tn.getNameAsString(), familyName);
+ }
+ Job job = null;
+ try {
+ Scan scan = new Scan();
+ scan.addFamily(family.getName());
+ // Do not retrieve the mob data when scanning
+ scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+ scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+ scan.setCaching(SCAN_CACHING);
+ scan.setCacheBlocks(false);
+ scan.setMaxVersions(family.getMaxVersions());
+ conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+ JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+ conf.set(SWEEP_JOB_ID, id);
+ conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tn.getNameAsString(), familyName));
+ job = prepareJob(tn, familyName, scan, conf);
+ job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
+ // Record the compaction start time.
+ // In the sweep tool, only the mob file whose modification time is older than
+ // (startTime - delay) could be handled by this tool.
+ // The delay is one day. It could be configured as well, but this is only used
+ // in the test.
+ job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+ compactionStartTime);
+
+ job.setPartitionerClass(MobFilePathHashPartitioner.class);
+ submit(job, tn, familyName);
+ if (job.waitForCompletion(true)) {
+ // Archive the unused mob files.
+ removeUnusedFiles(job, tn, family);
+ }
+ } finally {
+ cleanup(job, tn, familyName);
+ zk.deleteSweeperZNode(tn.getNameAsString(), familyName);
+ }
+ } finally {
+ zk.close();
+ }
+ }
+
+ /**
+ * Prepares a map reduce job.
+ * @param tn The current table name.
+ * @param familyName The current family name.
+ * @param scan The current scan.
+ * @param conf The current configuration.
+ * @return A map reduce job.
+ * @throws IOException
+ */
+ private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
+ throws IOException {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(SweepMapper.class);
+ TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
+ SweepMapper.class, Text.class, Writable.class, job);
+
+ job.setInputFormatClass(TableInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ job.setReducerClass(SweepReducer.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ String jobName = getCustomJobName(this.getClass().getSimpleName(), tn.getNameAsString(),
+ familyName);
+ job.setJobName(jobName);
+ if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
+ String fileLoc = conf.get(CREDENTIALS_LOCATION);
+ Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
+ job.getCredentials().addAll(cred);
+ }
+ return job;
+ }
+
+ /**
+ * Gets a customized job name.
+ * It's className-mapperClassName-reducerClassName-tableName-familyName.
+ * @param className The current class name.
+ * @param tableName The current table name.
+ * @param familyName The current family name.
+ * @return The customized job name.
+ */
+ private static String getCustomJobName(String className, String tableName, String familyName) {
+ StringBuilder name = new StringBuilder();
+ name.append(className);
+ name.append('-').append(SweepMapper.class.getSimpleName());
+ name.append('-').append(SweepReducer.class.getSimpleName());
+ name.append('-').append(tableName);
+ name.append('-').append(familyName);
+ return name.toString();
+ }
+
+ /**
+ * Submits a job.
+ * @param job The current job.
+ * @param tn The current table name.
+ * @param familyName The current family name.
+ * @throws IOException
+ */
+ private void submit(Job job, TableName tn, String familyName) throws IOException {
+ // delete the temp directory of the mob files in case the failure in the previous
+ // execution.
+ Path tempDir =
+ new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
+ Path mobCompactionTempDir =
+ new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
+ Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
+ job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
+ // delete the working directory in case it'not deleted by the last running.
+ fs.delete(workingPath, true);
+ // create the working directory.
+ fs.mkdirs(workingPath);
+ // create a sequence file which contains the names of all the existing files.
+ Path workingPathOfFiles = new Path(workingPath, "files");
+ Path workingPathOfNames = new Path(workingPath, "names");
+ job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
+ Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
+ job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+ Path vistiedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
+ job.getConfiguration().set(WORKING_VISITED_DIR_KEY, vistiedFileNamesPath.toString());
+ // create a file includes all the existing mob files whose creation time is older than
+ // (now - oneDay)
+ fs.create(allFileNamesPath, true);
+ // create a directory where the files contain names of visited mob files are saved.
+ fs.mkdirs(vistiedFileNamesPath);
+ Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
+ // Find all the files whose creation time are older than one day.
+ // Write those file names to a file.
+ // In each reducer there's a writer, it write the visited file names to a file which is saved
+ // in WORKING_VISITED_DIR.
+ // After the job is finished, compare those files, then find out the unused mob files and
+ // archive them.
+ FileStatus[] files = fs.listStatus(mobStorePath);
+ Set<String> fileNames = new TreeSet<String>();
+ long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY);
+ for (FileStatus fileStatus : files) {
+ if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
+ if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
+ // only record the potentially unused files older than one day.
+ fileNames.add(fileStatus.getPath().getName());
+ }
+ }
+ }
+ // write the names to a sequence file
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(),
+ allFileNamesPath, String.class, String.class);
+ try {
+ for (String fileName : fileNames) {
+ writer.append(fileName, MobConstants.EMPTY_STRING);
+ }
+ } finally {
+ IOUtils.closeStream(writer);
+ }
+ }
+
+ /**
+ * Gets the unused mob files.
+ * Compare the file which contains all the existing mob files and the visited files,
+ * find out the unused mob file and archive them.
+ * @param conf The current configuration.
+ * @return The unused mob files.
+ * @throws IOException
+ */
+ List<String> getUnusedFiles(Configuration conf) throws IOException {
+ // find out the unused files and archive them
+ Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
+ SequenceFile.Reader allNamesReader = null;
+ MergeSortReader visitedNamesReader = null;
+ List<String> toBeArchived = new ArrayList<String>();
+ try {
+ allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
+ visitedNamesReader = new MergeSortReader(fs, conf,
+ new Path(conf.get(WORKING_VISITED_DIR_KEY)));
+ String nextAll = (String) allNamesReader.next((String) null);
+ String nextVisited = visitedNamesReader.next();
+ do {
+ if (nextAll != null) {
+ if (nextVisited != null) {
+ int compare = nextAll.compareTo(nextVisited);
+ if (compare < 0) {
+ toBeArchived.add(nextAll);
+ nextAll = (String) allNamesReader.next((String) null);
+ } else if (compare > 0) {
+ nextVisited = visitedNamesReader.next();
+ } else {
+ nextAll = (String) allNamesReader.next((String) null);
+ nextVisited = visitedNamesReader.next();
+ }
+ } else {
+ toBeArchived.add(nextAll);
+ nextAll = (String) allNamesReader.next((String) null);
+ }
+ } else {
+ break;
+ }
+ } while (nextAll != null || nextVisited != null);
+ } finally {
+ if (allNamesReader != null) {
+ allNamesReader.close();
+ }
+ if (visitedNamesReader != null) {
+ visitedNamesReader.close();
+ }
+ }
+ return toBeArchived;
+ }
+
+ /**
+ * Archives unused mob files.
+ * @param job The current job.
+ * @param tn The current table name.
+ * @param hcd The descriptor of the current column family.
+ * @throws IOException
+ */
+ private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
+ // find out the unused files and archive them
+ List<StoreFile> storeFiles = new ArrayList<StoreFile>();
+ List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
+ // archive them
+ Path mobStorePath = MobUtils
+ .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
+ for (String archiveFileName : toBeArchived) {
+ Path path = new Path(mobStorePath, archiveFileName);
+ storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
+ }
+ if (!storeFiles.isEmpty()) {
+ try {
+ MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
+ FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
+ LOG.info(storeFiles.size() + " unused MOB files are removed");
+ } catch (Exception e) {
+ LOG.error("Fail to archive the store files " + storeFiles, e);
+ }
+ }
+ }
+
+ /**
+ * Deletes the working directory.
+ * @param job The current job.
+ * @param store The current MobFileStore.
+ * @throws IOException
+ */
+ private void cleanup(Job job, TableName tn, String familyName) throws IOException {
+ if (job != null) {
+ // delete the working directory
+ Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
+ try {
+ fs.delete(workingPath, true);
+ } catch (IOException e) {
+ LOG.warn("Fail to delete the working directory after sweeping store " + familyName
+ + " in the table " + tn.getNameAsString(), e);
+ }
+ }
+ }
+
+ /**
+ * A result with index.
+ */
+ private class IndexedResult implements Comparable<IndexedResult> {
+ private int index;
+ private String value;
+
+ public IndexedResult(int index, String value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ public int getIndex() {
+ return this.index;
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ @Override
+ public int compareTo(IndexedResult o) {
+ if (this.value == null) {
+ return 0;
+ } else if (o.value == null) {
+ return 1;
+ } else {
+ return this.value.compareTo(o.value);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof IndexedResult)) {
+ return false;
+ }
+ return compareTo((IndexedResult) obj) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+ }
+
+ /**
+ * Merge sort reader.
+ * It merges and sort the readers in different sequence files as one where
+ * the results are read in order.
+ */
+ private class MergeSortReader {
+
+ private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
+ private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();
+
+ public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
+ if (fs.exists(path)) {
+ FileStatus[] files = fs.listStatus(path);
+ int index = 0;
+ for (FileStatus file : files) {
+ if (file.isFile()) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
+ String key = (String) reader.next((String) null);
+ if (key != null) {
+ results.add(new IndexedResult(index, key));
+ readers.add(reader);
+ index++;
+ }
+ }
+ }
+ }
+ }
+
+ public String next() throws IOException {
+ IndexedResult result = results.poll();
+ if (result != null) {
+ SequenceFile.Reader reader = readers.get(result.getIndex());
+ String key = (String) reader.next((String) null);
+ if (key != null) {
+ results.add(new IndexedResult(result.getIndex(), key));
+ }
+ return result.getValue();
+ }
+ return null;
+ }
+
+ public void close() {
+ for (SequenceFile.Reader reader : readers) {
+ IOUtils.closeStream(reader);
+ }
+ }
+ }
+
+ /**
+ * The counter used in sweep job.
+ */
+ public enum SweepCounter {
+
+ /**
+ * How many files are read.
+ */
+ INPUT_FILE_COUNT,
+
+ /**
+ * How many files need to be merged or cleaned.
+ */
+ FILE_TO_BE_MERGE_OR_CLEAN,
+
+ /**
+ * How many files are left after merging.
+ */
+ FILE_AFTER_MERGE_OR_CLEAN,
+
+ /**
+ * How many records are updated.
+ */
+ RECORDS_UPDATED,
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
new file mode 100644
index 0000000..b789332
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracker on the sweep tool node in zookeeper.
+ * The sweep tool node is an ephemeral one, when the process dies this node is deleted,
+ * at that time MR might be still running, and if another sweep job is started, two MR
+ * for the same column family will run at the same time.
+ * This tracker watches this ephemeral node, if it's gone or it's not created by the
+ * sweep job that owns the current MR, the current process will be aborted.
+ */
+@InterfaceAudience.Private
+public class SweepJobNodeTracker extends ZooKeeperListener {
+
+ private String node;
+ private String sweepJobId;
+
+ public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) {
+ super(watcher);
+ this.node = node;
+ this.sweepJobId = sweepJobId;
+ }
+
+ /**
+ * Registers the watcher on the sweep job node.
+ * If there's no such a sweep job node, or it's not created by the sweep job that
+ * owns the current MR, the current process will be aborted.
+ */
+ public void start() throws KeeperException {
+ watcher.registerListener(this);
+ if (ZKUtil.watchAndCheckExists(watcher, node)) {
+ byte[] data = ZKUtil.getDataAndWatch(watcher, node);
+ if (data != null) {
+ if (!sweepJobId.equals(Bytes.toString(data))) {
+ System.exit(1);
+ }
+ }
+ } else {
+ System.exit(1);
+ }
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ // If the ephemeral node is deleted, abort the current process.
+ if (node.equals(path)) {
+ System.exit(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
new file mode 100644
index 0000000..f508b93
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The mapper of a sweep job.
+ * Takes the rows from the table and their results and map to <filename:Text, mobValue:KeyValue>
+ * where mobValue is the actual cell in HBase.
+ */
+@InterfaceAudience.Private
+public class SweepMapper extends TableMapper<Text, KeyValue> {
+
+ private ZooKeeperWatcher zkw = null;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+ String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+ zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
+ new DummyMobAbortable());
+ try {
+ SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+ tracker.start();
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ if (zkw != null) {
+ zkw.close();
+ }
+ }
+
+ @Override
+ public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+ InterruptedException {
+ if (columns == null) {
+ return;
+ }
+ KeyValue[] kvList = columns.raw();
+ if (kvList == null || kvList.length == 0) {
+ return;
+ }
+ for (KeyValue kv : kvList) {
+ if (MobUtils.hasValidMobRefCellValue(kv)) {
+ String fileName = MobUtils.getMobFileName(kv);
+ context.write(new Text(fileName), kv);
+ }
+ }
+ }
+}