You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/19 12:11:02 UTC
[2/2] git commit: HBASE-11644 External MOB compaction tools
(Jingcheng Du)
HBASE-11644 External MOB compaction tools (Jingcheng Du)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/84e957c8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/84e957c8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/84e957c8
Branch: refs/heads/hbase-11339
Commit: 84e957c875ae971578a5b147775445368ea26188
Parents: 9cf46dc
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Sep 19 03:02:06 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Sep 19 03:06:33 2014 -0700
----------------------------------------------------------------------
.../src/main/resources/hbase-default.xml | 34 ++
.../master/ExpiredMobFileCleanerChore.java | 70 +++
.../org/apache/hadoop/hbase/master/HMaster.java | 7 +
.../hadoop/hbase/mob/DefaultMobCompactor.java | 2 +-
.../hadoop/hbase/mob/ExpiredMobFileCleaner.java | 120 ++++
.../apache/hadoop/hbase/mob/MobConstants.java | 22 +
.../org/apache/hadoop/hbase/mob/MobUtils.java | 285 +++++++++-
.../apache/hadoop/hbase/mob/MobZookeeper.java | 270 +++++++++
.../hbase/mob/mapreduce/MemStoreWrapper.java | 184 +++++++
.../mapreduce/MobFilePathHashPartitioner.java | 41 ++
.../hadoop/hbase/mob/mapreduce/SweepJob.java | 550 +++++++++++++++++++
.../mob/mapreduce/SweepJobNodeTracker.java | 74 +++
.../hadoop/hbase/mob/mapreduce/SweepMapper.java | 84 +++
.../hbase/mob/mapreduce/SweepReducer.java | 506 +++++++++++++++++
.../hadoop/hbase/mob/mapreduce/Sweeper.java | 108 ++++
.../hadoop/hbase/regionserver/HMobStore.java | 180 +++++-
.../regionserver/MobReferenceOnlyFilter.java | 42 ++
.../hbase/mob/TestExpiredMobFileCleaner.java | 180 ++++++
.../hbase/mob/mapreduce/TestMobSweepJob.java | 168 ++++++
.../hbase/mob/mapreduce/TestMobSweepMapper.java | 100 ++++
.../mob/mapreduce/TestMobSweepReducer.java | 207 +++++++
.../hbase/mob/mapreduce/TestMobSweeper.java | 306 +++++++++++
.../hbase/regionserver/TestMobCompaction.java | 26 +-
.../hbase/regionserver/TestMobStoreScanner.java | 72 +++
24 files changed, 3594 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 872bbc5..647defd 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1483,4 +1483,38 @@ possible configurations would overwhelm and obscure the important.
The default value is 0.5f.
</description>
</property>
+ <property>
+ <name>hbase.mob.sweep.tool.compaction.ratio</name>
+ <value>0.5f</value>
+ <description>
+ If too many cells in a mob file have been deleted, the file is regarded
+ as invalid and needs to be merged.
+ A mob file is regarded as invalid when existingCellsSize/mobFileSize is
+ less than this ratio. The default value is 0.5f.
+ </description>
+ </property>
+ <property>
+ <name>hbase.mob.sweep.tool.compaction.mergeable.size</name>
+ <value>134217728</value>
+ <description>
+ If the size of a mob file is less than this value, it's regarded as a small
+ file and needs to be merged. The default value is 128MB.
+ </description>
+ </property>
+ <property>
+ <name>hbase.mob.sweep.tool.compaction.memstore.flush.size</name>
+ <value>134217728</value>
+ <description>
+ The flush size of the memstore used by the sweep job. Each sweep reducer owns such a memstore.
+ The default value is 128MB.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.mob.ttl.cleaner.period</name>
+ <value>86400000</value>
+ <description>
+ The period, in milliseconds, at which ExpiredMobFileCleanerChore runs.
+ The default value is one day.
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
new file mode 100644
index 0000000..98fe236
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Chore that periodically archives the expired mob files of mob families whose minVersions is 0.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleanerChore extends Chore {
+
+  private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class);
+  private final HMaster master;
+  private final ExpiredMobFileCleaner cleaner;
+
+  public ExpiredMobFileCleanerChore(HMaster master) {
+    super(master.getServerName() + "-ExpiredMobFileCleanerChore", master.getConfiguration().getInt(
+      MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), master);
+    this.master = master;
+    cleaner = new ExpiredMobFileCleaner();
+    // The cleaner is a Configured tool; without this, its getConf() is null and every run fails.
+    cleaner.setConf(master.getConfiguration());
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      TableDescriptors htds = master.getTableDescriptors();
+      Map<String, HTableDescriptor> map = htds.getAll();
+      for (HTableDescriptor htd : map.values()) {
+        for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+          if (MobUtils.isMobFamily(hcd) && hcd.getMinVersions() == 0) {
+            cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Fail to clean the expired mob files", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 714b5a8..4ff3592 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -208,6 +208,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
+ private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
MasterCoprocessorHost cpHost;
@@ -610,6 +611,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// master initialization. See HBASE-5916.
this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
+ this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
+ Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
+
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
try {
@@ -856,6 +860,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
private void stopChores() {
+ if (this.expiredMobFileCleanerChore != null) {
+ this.expiredMobFileCleanerChore.interrupt();
+ }
if (this.balancerChore != null) {
this.balancerChore.interrupt();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
index 5f13502..fd35a15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@ -152,7 +152,7 @@ public class DefaultMobCompactor extends DefaultCompactor {
// to the store file.
writer.append(kv);
} else if (MobUtils.isMobReferenceCell(kv)) {
- if (MobUtils.isValidMobRefCellValue(kv)) {
+ if (MobUtils.hasValidMobRefCellValue(kv)) {
int size = MobUtils.getMobValueLength(kv);
if (size > mobSizeThreshold) {
// If the value size is larger than the threshold, it's regarded as a mob. Since
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
new file mode 100644
index 0000000..d3c11ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * The cleaner to delete the expired MOB files.
+ */
+@InterfaceAudience.Private
+public class ExpiredMobFileCleaner extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleaner.class);
+ /**
+ * Cleans the MOB files when they're expired and their min versions are 0.
+ * If the latest timestamp of Cells in a MOB file is older than the TTL in the column family,
+ * it's regarded as expired. This cleaner deletes them.
+ * At a time T0, the cells in a mob file M0 are expired. If a user starts a scan before T0, those
+ * mob cells are visible, this scan still runs after T0. At that time T1, this mob file M0
+ * is expired, meanwhile a cleaner starts, the M0 is archived and can be read in the archive
+ * directory.
+ * @param tableName The current table name.
+ * @param family The current family.
+ * @throws ServiceException
+ * @throws IOException
+ */
+ public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
+ throws ServiceException, IOException {
+ Configuration conf = getConf();
+ TableName tn = TableName.valueOf(tableName);
+ FileSystem fs = FileSystem.get(conf);
+ LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName);
+ // disable the block cache.
+ Configuration copyOfConf = new Configuration(conf);
+ copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+ CacheConfig cacheConfig = new CacheConfig(copyOfConf);
+ MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig,
+ EnvironmentEdgeManager.currentTime());
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ ToolRunner.run(conf, new ExpiredMobFileCleaner(), args);
+ }
+
+ private void printUsage() {
+ System.err.println("Usage:\n" + "--------------------------\n"
+ + ExpiredMobFileCleaner.class.getName() + " tableName familyName");
+ System.err.println(" tableName The table name");
+ System.err.println(" familyName The column family name");
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ printUsage();
+ return 1;
+ }
+ String tableName = args[0];
+ String familyName = args[1];
+ TableName tn = TableName.valueOf(tableName);
+ HBaseAdmin.checkHBaseAvailable(getConf());
+ HBaseAdmin admin = new HBaseAdmin(getConf());
+ try {
+ HTableDescriptor htd = admin.getTableDescriptor(tn);
+ HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
+ if (family == null || !MobUtils.isMobFamily(family)) {
+ throw new IOException("Column family " + familyName + " is not a MOB column family");
+ }
+ if (family.getMinVersions() > 0) {
+ throw new IOException(
+ "The minVersions of the column family is not 0, could not be handled by this cleaner");
+ }
+ cleanExpiredMobFiles(tableName, family);
+ return 0;
+ } finally {
+ try {
+ admin.close();
+ } catch (IOException e) {
+ LOG.error("Fail to close the HBaseAdmin.", e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 9978afd..4e3e7c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -38,6 +38,7 @@ public class MobConstants {
public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+ public static final String MOB_SCAN_REF_ONLY = "hbase.mob.scan.ref.only";
public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
@@ -46,6 +47,26 @@ public class MobConstants {
public static final String MOB_REGION_NAME = ".mob";
public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
+ public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period";
+ public static final int DEFAULT_MOB_CLEANER_PERIOD = 24 * 60 * 60 * 1000; // one day
+
+ public static final String MOB_SWEEP_TOOL_COMPACTION_START_DATE =
+ "hbase.mob.sweep.tool.compaction.start.date";
+ public static final String MOB_SWEEP_TOOL_COMPACTION_RATIO =
+ "hbase.mob.sweep.tool.compaction.ratio";
+ public static final String MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE =
+ "hbase.mob.sweep.tool.compaction.mergeable.size";
+
+ public static final float DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO = 0.5f;
+ public static final long DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE = 128 * 1024 * 1024;
+
+ public static final String MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME = "mobcompaction";
+
+ public static final String MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+ "hbase.mob.sweep.tool.compaction.memstore.flush.size";
+ public static final long DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE =
+ 1024 * 1024 * 128; // 128M
+
public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
@@ -55,6 +76,7 @@ public class MobConstants {
public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
public final static String TEMP_DIR_NAME = ".tmp";
+ public final static String EMPTY_STRING = "";
private MobConstants() {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index e52d336..e49d3ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -18,13 +18,22 @@
*/
package org.apache.hadoop.hbase.mob;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -34,7 +43,16 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -44,6 +62,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
@InterfaceAudience.Private
public class MobUtils {
+ private static final Log LOG = LogFactory.getLog(MobUtils.class);
+ private static final String COMPACTION_WORKING_DIR_NAME = "working";
+
private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
new ThreadLocal<SimpleDateFormat>() {
@Override
@@ -143,6 +164,22 @@ public class MobUtils {
}
/**
+   * Tells whether the scan is reference-only, i.e. its "hbase.mob.scan.ref.only" attribute is true.
+   * @param scan The current scan; a ref-only scan returns only cells carrying the mob ref tag.
+   * @return True if the attribute is present and decodes to true, false otherwise.
+   */
+  public static boolean isRefOnlyScan(Scan scan) {
+    byte[] flag = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
+    boolean refOnly = false;
+    try {
+      refOnly = flag != null && Bytes.toBoolean(flag);
+    } catch (IllegalArgumentException ignored) {
+      // A malformed attribute value is treated as "not reference-only".
+    }
+    return refOnly;
+  }
+
+ /**
* Indicates whether the scan contains the information of caching blocks.
* The information is set in the attribute "hbase.mob.cache.blocks" of scan.
* @param scan The current scan.
@@ -172,6 +209,91 @@ public class MobUtils {
}
/**
+   * Archives the expired mob files of a column family. A file is expired when the date in its
+   * name is before the start of the day (local time) containing (current - ttl). minVersions is
+   * not checked here; callers should only pass families whose minVersions is 0.
+   * @param fs The current file system.
+   * @param conf The current configuration.
+   * @param tableName The current table name.
+   * @param columnDescriptor The descriptor of the current column family.
+   * @param cacheConfig The cacheConfig that disables the block cache.
+   * @param current The current time, in milliseconds.
+   * @throws IOException If the mob family directory cannot be listed.
+   */
+  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
+      HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
+      throws IOException {
+    long timeToLive = columnDescriptor.getTimeToLive();
+    if (Integer.MAX_VALUE == timeToLive) {
+      // no need to clean, because the TTL is not set.
+      return;
+    }
+    // The TTL is in seconds; the expiry is then truncated to the start of its (local) day.
+    Date expireDate = new Date(current - timeToLive * 1000);
+    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
+    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
+
+    FileStatus[] stats = null;
+    Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
+    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
+    try {
+      stats = fs.listStatus(path);
+    } catch (FileNotFoundException e) {
+      LOG.warn("Fail to find the mob family directory " + path, e);
+    }
+    if (null == stats) {
+      // no file found
+      return;
+    }
+    List<StoreFile> filesToClean = new ArrayList<StoreFile>();
+    int deletedFileCount = 0;
+    for (FileStatus file : stats) {
+      String fileName = file.getPath().getName();
+      try {
+        MobFileName mobFileName = null;
+        if (!HFileLink.isHFileLink(file.getPath())) {
+          mobFileName = MobFileName.create(fileName);
+        } else {
+          HFileLink hfileLink = new HFileLink(conf, file.getPath());
+          mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
+        }
+        Date fileDate = parseDate(mobFileName.getDate());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checking file " + fileName);
+        }
+        if (fileDate.getTime() < expireDate.getTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(fileName + " is an expired file");
+          }
+          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot parse the fileName or open the mob file " + fileName, e);
+      }
+    }
+    if (!filesToClean.isEmpty()) {
+      try {
+        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
+          filesToClean);
+        deletedFileCount = filesToClean.size();
+      } catch (IOException e) {
+        LOG.error("Fail to delete the mob files " + filesToClean, e);
+      }
+    }
+    LOG.info(deletedFileCount + " expired mob files are deleted");
+  }
+
+  /**
+   * Builds the znode name of a column family in the form "tableName:familyName".
+   * @param tableName The current table name.
+   * @param familyName The name of the current column family.
+   * @return The znode name of the column family.
+   */
+  public static String getColumnFamilyZNodeName(String tableName, String familyName) {
+    return tableName + ':' + familyName;
+  }
+
+ /**
* Gets the root dir of the mob files.
* It's {HBASE_DIR}/mobdir.
* @param conf The current configuration.
@@ -183,6 +305,19 @@ public class MobUtils {
}
/**
+   * Gets the root dir of the mob files, i.e. {HBASE_DIR}/{MobConstants.MOB_DIR_NAME},
+   * qualified against the file system that owns it.
+   * @param conf The current configuration.
+   * @return The fully-qualified root dir of the mob files.
+   * @throws IOException If the file system of the mob root dir cannot be obtained.
+   */
+  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
+    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+    Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+    return mobRootDir.makeQualified(mobRootDir.getFileSystem(conf));
+  }
+
+ /**
* Gets the region dir of the mob files.
* It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
* @param conf The current configuration.
@@ -190,7 +325,7 @@ public class MobUtils {
* @return The region dir of the mob files.
*/
public static Path getMobRegionPath(Configuration conf, TableName tableName) {
- Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+ Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
HRegionInfo regionInfo = getMobRegionInfo(tableName);
return new Path(tablePath, regionInfo.getEncodedName());
}
@@ -232,36 +367,160 @@ public class MobUtils {
}
/**
+ * Gets whether the current HRegionInfo is a mob one.
+ * @param regionInfo The current HRegionInfo.
+ * @return If true, the current HRegionInfo is a mob one.
+ */
+ public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
+ return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
+ .equals(regionInfo.getEncodedName());
+ }
+
+  /**
+   * Gets the working directory of a mob compaction job: the COMPACTION_WORKING_DIR_NAME
+   * sub-directory of {root}/{jobName}.
+   * @param root The root directory of the mob compaction.
+   * @param jobName The current job name.
+   * @return The working directory of the mob compaction for the given job.
+   */
+  public static Path getCompactionWorkingPath(Path root, String jobName) {
+    return new Path(new Path(root, jobName), COMPACTION_WORKING_DIR_NAME);
+  }
+
+  /**
+   * Archives the given mob files through HFileArchiver, using the mob region info of the table.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param tableName The table name.
+   * @param tableDir The table directory.
+   * @param family The name of the column family.
+   * @param storeFiles The files to be archived.
+   * @throws IOException If the files cannot be archived.
+   */
+  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
+      Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
+    HRegionInfo mobRegionInfo = getMobRegionInfo(tableName);
+    HFileArchiver.archiveStoreFiles(conf, fs, mobRegionInfo, tableDir, family, storeFiles);
+  }
+
+ /**
* Creates a mob reference KeyValue.
* The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
- * @param kv The original KeyValue.
+ * @param cell The original Cell.
* @param fileName The mob file name where the mob reference KeyValue is written.
* @param tableNameTag The tag of the current table name. It's very important in
* cloning the snapshot.
* @return The mob reference KeyValue.
*/
- public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
+ public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
// Append the tags to the KeyValue.
// The key is same, the value is the filename of the mob file
- List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
- existingTags.add(MobConstants.MOB_REF_TAG);
+ List<Tag> tags = new ArrayList<Tag>();
+ // Add the ref tag as the 1st one.
+ tags.add(MobConstants.MOB_REF_TAG);
+ // Add the existing tags.
+ tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
// Add the tag of the source table name, this table is where this mob file is flushed
// from.
// It's very useful in cloning the snapshot. When reading from the cloning table, we need to
// find the original mob files by this table name. For details please see cloning
// snapshot for mob files.
- existingTags.add(tableNameTag);
- int valueLength = kv.getValueLength();
+ tags.add(tableNameTag);
+ int valueLength = cell.getValueLength();
byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
- KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
- kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
- kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
- refValue, 0, refValue.length, existingTags);
- reference.setSequenceId(kv.getSequenceId());
+ KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
+ reference.setSequenceId(cell.getSequenceId());
return reference;
}
/**
+   * Creates a writer for a new mob file in basePath, named from startKey, date and a random UUID.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, in the same yyyyMMdd form used in mob file names.
+   * @param basePath The directory in which the mob file is created.
+   * @param maxKeyCount The maximum key count passed to the writer builder.
+   * @param compression The compression algorithm.
+   * @param startKey The hex string of the start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException If the writer cannot be created.
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig)
+      throws IOException {
+    String uuid = UUID.randomUUID().toString().replaceAll("-", "");
+    MobFileName mobFileName = MobFileName.create(startKey, date, uuid);
+    HFileContext hFileContext = new HFileContextBuilder()
+        .withCompression(compression).withIncludesMvcc(false).withIncludesTags(true)
+        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+        .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+        .withDataBlockEncoding(family.getDataBlockEncoding()).build();
+    Path mobFilePath = new Path(basePath, mobFileName.getFileName());
+    return new StoreFile.WriterBuilder(conf, cacheConfig, fs)
+        .withFilePath(mobFilePath)
+        .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+        .withMaxKeyCount(maxKeyCount)
+        .withFileContext(hFileContext).build();
+  }
+
+ /**
+ * Commits the mob file.
+ * @param @param conf The current configuration.
+ * @param fs The current file system.
+ * @param path The path where the mob file is saved.
+ * @param targetPath The directory path where the source file is renamed to.
+ * @param cacheConfig The current cache config.
+ * @throws IOException
+ */
+ public static void commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+ Path targetPath, CacheConfig cacheConfig) throws IOException {
+ if (sourceFile == null) {
+ return;
+ }
+ Path dstPath = new Path(targetPath, sourceFile.getName());
+ validateMobFile(conf, fs, sourceFile, cacheConfig);
+ String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+ LOG.info(msg);
+ Path parent = dstPath.getParent();
+ if (!fs.exists(parent)) {
+ fs.mkdirs(parent);
+ }
+ if (!fs.rename(sourceFile, dstPath)) {
+ throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+ }
+ }
+
+  /**
+   * Validates a mob file by opening and closing a reader; an open failure is logged and rethrown.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param path The path where the mob file is saved.
+   * @param cacheConfig The current cache config.
+   */
+  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
+      CacheConfig cacheConfig) throws IOException {
+    StoreFile candidate = null;
+    try {
+      candidate = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
+      candidate.createReader();
+    } catch (IOException ioe) {
+      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", ioe);
+      throw ioe;
+    } finally {
+      if (candidate != null) {
+        candidate.closeReader(false);
+      }
+    }
+  }
+
+ /**
* Indicates whether the current mob ref cell has a valid value.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
@@ -270,7 +529,7 @@ public class MobUtils {
* @param cell The mob ref cell.
* @return True if the cell has a valid value.
*/
- public static boolean isValidMobRefCellValue(Cell cell) {
+ public static boolean hasValidMobRefCellValue(Cell cell) {
return cell.getValueLength() > Bytes.SIZEOF_INT;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
new file mode 100644
index 0000000..a9557d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The zookeeper used for MOB.
+ * This zookeeper is used to synchronize the HBase major compaction and sweep tool.
+ * The structure of the nodes for mob in zookeeper.
+ * |--baseNode
+ * |--MOB
+ * |--tableName:columnFamilyName-lock // locks for the mob column family
+ * |--tableName:columnFamilyName-sweeper // when a sweep tool runs, such a node is added
+ * |--tableName:columnFamilyName-majorCompaction
+ * |--UUID //when a major compaction occurs, such a node is added.
+ * In order to synchronize the operations between the sweep tool and HBase major compaction, these
+ * actions need to acquire the tableName:columnFamilyName-lock before the sweep tool and major
+ * compaction run.
+ * In sweep tool.
+ * 1. If it acquires the lock successfully. It check whether the sweeper node exists, if exist the
+ * current running is aborted. If not it it checks whether there're major compaction nodes, if yes
+ * the current running is aborted, if not it adds a sweep node to the zookeeper.
+ * 2. If it could not obtain the lock, the current running is aborted.
+ * In the HBase compaction.
+ * 1. If it's a minor compaction, continue the compaction.
+ * 2. If it's a major compaction, it acquires a lock in zookeeper.
+ * A. If it obtains the lock, it checks whether there's sweep node, if yes it converts itself
+ * to a minor one and continue, if no it adds a major compaction node to the zookeeper.
+ * B. If it could not obtain the lock, it converts itself to a minor one and continue the
+ * compaction.
+ */
+@InterfaceAudience.Private
+public class MobZookeeper {
+ // TODO Will remove this class before the mob is merged back to master.
+ private static final Log LOG = LogFactory.getLog(MobZookeeper.class);
+
+ private ZooKeeperWatcher zkw;
+ private String mobZnode;
+ private static final String LOCK_EPHEMERAL = "-lock";
+ private static final String SWEEPER_EPHEMERAL = "-sweeper";
+ private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction";
+
+ private MobZookeeper(Configuration conf, String identifier) throws IOException,
+ KeeperException {
+ this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable());
+ mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB");
+ if (ZKUtil.checkExists(zkw, mobZnode) == -1) {
+ ZKUtil.createWithParents(zkw, mobZnode);
+ }
+ }
+
+ /**
+ * Creates an new instance of MobZookeeper.
+ * @param conf The current configuration.
+ * @param identifier string that is passed to RecoverableZookeeper to be used as
+ * identifier for this instance.
+ * @return A new instance of MobZookeeper.
+ * @throws IOException
+ * @throws KeeperException
+ */
+ public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException,
+ KeeperException {
+ return new MobZookeeper(conf, identifier);
+ }
+
+ /**
+ * Acquire a lock on the current column family.
+ * All the threads try to access the column family acquire a lock which is actually create an
+ * ephemeral node in the zookeeper.
+ * @param tableName The current table name.
+ * @param familyName The current column family name.
+ * @return True if the lock is obtained successfully. Otherwise false is returned.
+ */
+ public boolean lockColumnFamily(String tableName, String familyName) {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ boolean locked = false;
+ try {
+ locked = ZKUtil.createEphemeralNodeAndWatch(zkw,
+ ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(locked ? "Locked the column family " + znodeName
+ : "Can not lock the column family " + znodeName);
+ }
+ } catch (KeeperException e) {
+ LOG.error("Fail to lock the column family " + znodeName, e);
+ }
+ return locked;
+ }
+
+ /**
+ * Release the lock on the current column family.
+ * @param tableName The current table name.
+ * @param familyName The current column family name.
+ */
+ public void unlockColumnFamily(String tableName, String familyName) {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unlocking the column family " + znodeName);
+ }
+ try {
+ ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL));
+ } catch (KeeperException e) {
+ LOG.warn("Fail to unlock the column family " + znodeName, e);
+ }
+ }
+
+ /**
+ * Adds a node to zookeeper which indicates that a sweep tool is running.
+ * @param tableName The current table name.
+ * @param familyName The current columnFamilyName name.
+ * @param data the data of the ephemeral node.
+ * @return True if the node is created successfully. Otherwise false is returned.
+ */
+ public boolean addSweeperZNode(String tableName, String familyName, byte[] data) {
+ boolean add = false;
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ try {
+ add = ZKUtil.createEphemeralNodeAndWatch(zkw,
+ ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(add ? "Added a znode for sweeper " + znodeName
+ : "Cannot add a znode for sweeper " + znodeName);
+ }
+ } catch (KeeperException e) {
+ LOG.error("Fail to add a znode for sweeper " + znodeName, e);
+ }
+ return add;
+ }
+
+ /**
+ * Gets the path of the sweeper znode in zookeeper.
+ * @param tableName The current table name.
+ * @param familyName The current columnFamilyName name.
+ * @return The path of the sweeper znode in zookeper.
+ */
+ public String getSweeperZNodePath(String tableName, String familyName) {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL);
+ }
+
+ /**
+ * Deletes the node from zookeeper which indicates that a sweep tool is finished.
+ * @param tableName The current table name.
+ * @param familyName The current column family name.
+ */
+ public void deleteSweeperZNode(String tableName, String familyName) {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ try {
+ ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL));
+ } catch (KeeperException e) {
+ LOG.error("Fail to delete a znode for sweeper " + znodeName, e);
+ }
+ }
+
+ /**
+ * Checks whether the znode exists in the Zookeeper.
+ * If the node exists, it means a sweep tool is running.
+ * Otherwise, the sweep tool is not.
+ * @param tableName The current table name.
+ * @param familyName The current column family name.
+ * @return True if this node doesn't exist. Otherwise false is returned.
+ * @throws KeeperException
+ */
+ public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0;
+ }
+
+ /**
+ * Checks whether there're major compactions nodes in the zookeeper.
+ * If there're such nodes, it means there're major compactions in progress now.
+ * Otherwise there're not.
+ * @param tableName The current table name.
+ * @param familyName The current column family name.
+ * @return True if there're major compactions in progress. Otherwise false is returned.
+ * @throws KeeperException
+ */
+ public boolean hasMajorCompactionChildren(String tableName, String familyName)
+ throws KeeperException {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+ List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath);
+ return children != null && !children.isEmpty();
+ }
+
+ /**
+ * Creates a node of a major compaction to the Zookeeper.
+ * Before a HBase major compaction, such a node is created to the Zookeeper. It tells others that
+ * there're major compaction in progress, the sweep tool could not be run at this time.
+ * @param tableName The current table name.
+ * @param familyName The current column family name.
+ * @param compactionName The current compaction name.
+ * @return True if the node is created successfully. Otherwise false is returned.
+ * @throws KeeperException
+ */
+ public boolean addMajorCompactionZNode(String tableName, String familyName,
+ String compactionName) throws KeeperException {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+ ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null);
+ String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+ return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null);
+ }
+
+ /**
+ * Deletes a major compaction node from the Zookeeper.
+ * @param tableName The current table name.
+ * @param familyName The current column family name.
+ * @param compactionName The current compaction name.
+ * @throws KeeperException
+ */
+ public void deleteMajorCompactionZNode(String tableName, String familyName,
+ String compactionName) throws KeeperException {
+ String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
+ String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
+ String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
+ ZKUtil.deleteNode(zkw, eachMcPath);
+ }
+
+ /**
+ * Closes the MobZookeeper.
+ */
+ public void close() {
+ this.zkw.close();
+ }
+
+ /**
+ * An dummy abortable. It's used for the MobZookeeper.
+ */
+ public static class DummyMobAbortable implements Abortable {
+
+ private boolean abort = false;
+
+ public void abort(String why, Throwable e) {
+ abort = true;
+ }
+
+ public boolean isAborted() {
+ return abort;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
new file mode 100644
index 0000000..b0d4c9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+
+/**
+ * The wrapper of a DefaultMemStore.
+ * This wrapper is used in the sweep reducer to buffer and sort the cells written from
+ * the invalid and small mob files.
+ * It's flushed when it's full, the mob data are written to the mob files, and their file names
+ * are written back to store files of HBase.
+ * This memStore is used to sort the cells in mob files.
+ * In a reducer of sweep tool, the mob files are grouped by the same prefix (start key and date),
+ * in each group, the reducer iterates the files and read the cells to a new and bigger mob file.
+ * The cells in the same mob file are ordered, but cells across mob files are not.
+ * So we need this MemStoreWrapper to sort those cells come from different mob files before
+ * flushing them to the disk, when the memStore is big enough it's flushed as a new mob file.
+ */
+@InterfaceAudience.Private
+public class MemStoreWrapper {
+
+ private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+
+ private MemStore memstore;
+ private long flushSize;
+ private SweepPartitionId partitionId;
+ private Context context;
+ private Configuration conf;
+ private HTable table;
+ private HColumnDescriptor hcd;
+ private Path mobFamilyDir;
+ private FileSystem fs;
+ private CacheConfig cacheConfig;
+
+ public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd,
+ MemStore memstore, CacheConfig cacheConfig) throws IOException {
+ this.memstore = memstore;
+ this.context = context;
+ this.fs = fs;
+ this.table = table;
+ this.hcd = hcd;
+ this.conf = context.getConfiguration();
+ this.cacheConfig = cacheConfig;
+ flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
+ MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
+ mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
+ }
+
+ public void setPartitionId(SweepPartitionId partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ /**
+ * Flushes the memstore if the size is large enough.
+ * @throws IOException
+ */
+ private void flushMemStoreIfNecessary() throws IOException {
+ if (memstore.heapSize() >= flushSize) {
+ flushMemStore();
+ }
+ }
+
+ /**
+ * Flushes the memstore anyway.
+ * @throws IOException
+ */
+ public void flushMemStore() throws IOException {
+ MemStoreSnapshot snapshot = memstore.snapshot();
+ internalFlushCache(snapshot);
+ memstore.clearSnapshot(snapshot.getId());
+ }
+
+ /**
+ * Flushes the snapshot of the memstore.
+ * Flushes the mob data to the mob files, and flushes the name of these mob files to HBase.
+ * @param snapshot The snapshot of the memstore.
+ * @throws IOException
+ */
+ private void internalFlushCache(final MemStoreSnapshot snapshot)
+ throws IOException {
+ if (snapshot.getSize() == 0) {
+ return;
+ }
+ // generate the files into a temp directory.
+ String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
+ StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
+ partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(),
+ hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig);
+
+ String relativePath = mobFileWriter.getPath().getName();
+ LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
+
+ byte[] referenceValue = Bytes.toBytes(relativePath);
+ int keyValueCount = 0;
+ KeyValueScanner scanner = snapshot.getScanner();
+ Cell cell = null;
+ while (null != (cell = scanner.next())) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ mobFileWriter.append(kv);
+ keyValueCount++;
+ }
+ scanner.close();
+ // Write out the log sequence number that corresponds to this output
+ // hfile. The hfile is current up to and including logCacheFlushId.
+ mobFileWriter.appendMetadata(Long.MAX_VALUE, false);
+ mobFileWriter.close();
+
+ MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
+ context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+ // write reference/fileName back to the store files of HBase.
+ scanner = snapshot.getScanner();
+ scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+ cell = null;
+ Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName());
+ while (null != (cell = scanner.next())) {
+ KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
+ Put put =
+ new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
+ put.add(reference);
+ table.put(put);
+ context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
+ }
+ if (keyValueCount > 0) {
+ table.flushCommits();
+ }
+ scanner.close();
+ }
+
+ /**
+ * Adds a KeyValue into the memstore.
+ * @param kv The KeyValue to be added.
+ * @throws IOException
+ */
+ public void addToMemstore(KeyValue kv) throws IOException {
+ memstore.add(kv);
+ // flush the memstore if it's full.
+ flushMemStoreIfNecessary();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
new file mode 100644
index 0000000..bdec887
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * The partitioner for the sweep job.
+ * The key is a mob file name. We bucket by date.
+ */
+@InterfaceAudience.Private
+public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
+
+ @Override
+ public int getPartition(Text fileName, KeyValue kv, int numPartitions) {
+ MobFileName mobFileName = MobFileName.create(fileName.toString());
+ String date = mobFileName.getDate();
+ int hash = date.hashCode();
+ return (hash & Integer.MAX_VALUE) % numPartitions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
new file mode 100644
index 0000000..1d048bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@ -0,0 +1,550 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The sweep job.
+ * Runs map reduce to merge the smaller mob files into bigger ones and to clean the unused ones.
+ */
+@InterfaceAudience.Private
+public class SweepJob {
+
+ // File system and configuration supplied by the caller; sweep() works on a copy of conf.
+ private final FileSystem fs;
+ private final Configuration conf;
+ private static final Log LOG = LogFactory.getLog(SweepJob.class);
+ // Job configuration key holding the unique id of the current sweep run (set in sweep()).
+ static final String SWEEP_JOB_ID = "mob.compaction.id";
+ // Job configuration key holding the zookeeper path of the sweeper znode (set in sweep()).
+ static final String SWEEPER_NODE = "mob.compaction.sweep.node";
+ // Keys and directory names for the job's working directories.
+ // NOTE(review): not used in the visible part of this class; presumably set in submit() — confirm.
+ static final String WORKING_DIR_KEY = "mob.compaction.dir";
+ static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file";
+ static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir";
+ static final String WORKING_ALLNAMES_DIR = "all";
+ static final String WORKING_VISITED_DIR = "visited";
+ // Job configuration key read by MemStoreWrapper as the temp directory for new mob files.
+ public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir";
+ //the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
+ public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay";
+ // Number of milliseconds in one day.
+ // NOTE(review): non-final static; possibly reassigned elsewhere (e.g. tests) — confirm before
+ // making it final.
+ protected static long ONE_DAY = 24 * 60 * 60 * 1000;
+ // Captured when this SweepJob is constructed; passed to the job as
+ // MOB_SWEEP_TOOL_COMPACTION_START_DATE in sweep().
+ private long compactionStartTime = EnvironmentEdgeManager.currentTime();
+ // Optional path of a token storage file whose credentials are added to the job in prepareJob().
+ public final static String CREDENTIALS_LOCATION = "credentials_location";
+ // Cache config built in the constructor with the block cache size set to 0.
+ private CacheConfig cacheConfig;
+ // Scanner caching (rows per fetch) used by the sweep scan.
+ static final int SCAN_CACHING = 10000;
+
+ /**
+  * Creates a sweep job.
+  * @param conf The configuration; it is copied, never modified.
+  * @param fs The file system holding the HBase root and mob files.
+  */
+ public SweepJob(Configuration conf, FileSystem fs) {
+   this.fs = fs;
+   this.conf = conf;
+   // Build the cache config from a private copy of the configuration, so that turning the
+   // block cache off does not leak into the caller's configuration.
+   Configuration noBlockCacheConf = new Configuration(conf);
+   noBlockCacheConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+   this.cacheConfig = new CacheConfig(noBlockCacheConf);
+ }
+
+ /**
+  * Runs MapReduce to do the sweeping on the mob files.
+  * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
+  * references from 'normal' regions' rows.
+  * The running of the sweep tool on the same column family are mutually exclusive.
+  * The HBase major compaction and running of the sweep tool on the same column family
+  * are mutually exclusive.
+  * The synchronization is done by the Zookeeper.
+  * So in the beginning of the running, we need to make sure this sweep tool is the only one
+  * that is currently running in this column family, and that no major compaction is in
+  * progress in this column family.
+  * If the column family lock cannot be obtained, a major compaction is in progress, another
+  * sweep job is running, or the sweeper znode cannot be created, a warning is logged and the
+  * method returns without running the job.
+  * @param tn The current table name.
+  * @param family The descriptor of the current column family.
+  * @throws IOException If the HBase root directory is empty or missing, the owner of its first
+  *           listed entry is not the current user, or the job cannot be prepared or submitted.
+  * @throws ClassNotFoundException
+  * @throws InterruptedException
+  * @throws KeeperException
+  */
+ public void sweep(TableName tn, HColumnDescriptor family) throws IOException,
+     ClassNotFoundException, InterruptedException, KeeperException {
+   Configuration conf = new Configuration(this.conf);
+   // check whether the current user is the same one with the owner of hbase root
+   String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+   FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
+   // Some FileSystem implementations return null instead of throwing for a missing path.
+   if (hbaseRootFileStat != null && hbaseRootFileStat.length > 0) {
+     String owner = hbaseRootFileStat[0].getOwner();
+     if (!owner.equals(currentUserName)) {
+       String errorMsg = "The current user[" + currentUserName
+           + "] doesn't have hbase root credentials."
+           + " Please make sure the user is the root of the target HBase";
+       LOG.error(errorMsg);
+       throw new IOException(errorMsg);
+     }
+   } else {
+     LOG.error("The target HBase doesn't exist");
+     throw new IOException("The target HBase doesn't exist");
+   }
+   String tableName = tn.getNameAsString();
+   String familyName = family.getNameAsString();
+   String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
+   MobZookeeper zk = MobZookeeper.newInstance(conf, id);
+   try {
+     // Try to obtain the lock. Use this lock to synchronize all the query, creation/deletion
+     // in the Zookeeper.
+     if (!zk.lockColumnFamily(tableName, familyName)) {
+       LOG.warn("Can not lock the store " + familyName
+           + ". The major compaction in HBase may be in-progress. Please re-run the job.");
+       return;
+     }
+     try {
+       // Checks whether there're HBase major compaction now.
+       if (zk.hasMajorCompactionChildren(tableName, familyName)) {
+         LOG.warn("The major compaction in HBase may be in-progress."
+             + " Please re-run the job.");
+         return;
+       }
+       // Checks whether there's sweep tool in progress.
+       if (zk.isSweeperZNodeExist(tableName, familyName)) {
+         LOG.warn("Another sweep job is running");
+         return;
+       }
+       // add the sweeper node, mark that there's one sweep tool in progress.
+       // All the HBase major compaction and sweep tool in this column family could not
+       // run until this sweep tool is finished.
+       // Without the node the job would be unprotected against concurrent major compactions,
+       // and the cleanup below would delete a sweeper node this job does not own.
+       if (!zk.addSweeperZNode(tableName, familyName, Bytes.toBytes(id))) {
+         LOG.warn("Can not add the sweeper znode for the store " + familyName
+             + ". Please re-run the job.");
+         return;
+       }
+     } finally {
+       zk.unlockColumnFamily(tableName, familyName);
+     }
+     Job job = null;
+     try {
+       Scan scan = new Scan();
+       scan.addFamily(family.getName());
+       // Do not retrieve the mob data when scanning
+       scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+       scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
+       scan.setCaching(SCAN_CACHING);
+       scan.setCacheBlocks(false);
+       scan.setMaxVersions(family.getMaxVersions());
+       conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+           JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
+       conf.set(SWEEP_JOB_ID, id);
+       conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tableName, familyName));
+       job = prepareJob(tn, familyName, scan, conf);
+       job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
+       // Record the compaction start time.
+       // In the sweep tool, only the mob file whose modification time is older than
+       // (startTime - delay) could be handled by this tool.
+       // The delay is one day. It could be configured as well, but this is only used
+       // in the test.
+       job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
+           compactionStartTime);
+
+       job.setPartitionerClass(MobFilePathHashPartitioner.class);
+       submit(job, tn, familyName);
+       if (job.waitForCompletion(true)) {
+         // Archive the unused mob files.
+         removeUnusedFiles(job, tn, family);
+       }
+     } finally {
+       cleanup(job, tn, familyName);
+       zk.deleteSweeperZNode(tableName, familyName);
+     }
+   } finally {
+     zk.close();
+   }
+ }
+
+ /**
+  * Builds the map reduce job that sweeps one column family.
+  * The job reads the table through {@link TableInputFormat} with {@link SweepMapper}, emits
+  * Text keys and KeyValue values, reduces with {@link SweepReducer}, and produces no output.
+  * When {@link #CREDENTIALS_LOCATION} is set, the tokens stored in that file are added to the
+  * job's credentials.
+  * @param tn The current table name.
+  * @param familyName The current family name.
+  * @param scan The current scan.
+  * @param conf The current configuration.
+  * @return A map reduce job.
+  * @throws IOException
+  */
+ private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
+     throws IOException {
+   Job job = Job.getInstance(conf);
+   job.setJarByClass(SweepMapper.class);
+   String tableName = tn.getNameAsString();
+   TableMapReduceUtil.initTableMapperJob(tableName, scan, SweepMapper.class, Text.class,
+       Writable.class, job);
+
+   job.setInputFormatClass(TableInputFormat.class);
+   job.setMapOutputKeyClass(Text.class);
+   job.setMapOutputValueClass(KeyValue.class);
+   job.setReducerClass(SweepReducer.class);
+   job.setOutputFormatClass(NullOutputFormat.class);
+   job.setJobName(getCustomJobName(this.getClass().getSimpleName(), tableName, familyName));
+
+   String credentialsFile = conf.get(CREDENTIALS_LOCATION);
+   if (StringUtils.isNotEmpty(credentialsFile)) {
+     Credentials tokens = Credentials.readTokenStorageFile(new File(credentialsFile), conf);
+     job.getCredentials().addAll(tokens);
+   }
+   return job;
+ }
+
+ /**
+ * Gets a customized job name.
+ * It's className-mapperClassName-reducerClassName-tableName-familyName.
+ * @param className The current class name.
+ * @param tableName The current table name.
+ * @param familyName The current family name.
+ * @return The customized job name.
+ */
+ private static String getCustomJobName(String className, String tableName, String familyName) {
+ StringBuilder name = new StringBuilder();
+ name.append(className);
+ name.append('-').append(SweepMapper.class.getSimpleName());
+ name.append('-').append(SweepReducer.class.getSimpleName());
+ name.append('-').append(tableName);
+ name.append('-').append(familyName);
+ return name.toString();
+ }
+
+ /**
+ * Submits a job.
+ * @param job The current job.
+ * @param tn The current table name.
+ * @param familyName The current family name.
+ * @throws IOException
+ */
+ private void submit(Job job, TableName tn, String familyName) throws IOException {
+ // delete the temp directory of the mob files in case the failure in the previous
+ // execution.
+ Path tempDir =
+ new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
+ Path mobCompactionTempDir =
+ new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
+ Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
+ job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
+ // delete the working directory in case it'not deleted by the last running.
+ fs.delete(workingPath, true);
+ // create the working directory.
+ fs.mkdirs(workingPath);
+ // create a sequence file which contains the names of all the existing files.
+ Path workingPathOfFiles = new Path(workingPath, "files");
+ Path workingPathOfNames = new Path(workingPath, "names");
+ job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
+ Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
+ job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
+ Path vistiedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
+ job.getConfiguration().set(WORKING_VISITED_DIR_KEY, vistiedFileNamesPath.toString());
+ // create a file includes all the existing mob files whose creation time is older than
+ // (now - oneDay)
+ fs.create(allFileNamesPath, true);
+ // create a directory where the files contain names of visited mob files are saved.
+ fs.mkdirs(vistiedFileNamesPath);
+ Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
+ // Find all the files whose creation time are older than one day.
+ // Write those file names to a file.
+ // In each reducer there's a writer, it write the visited file names to a file which is saved
+ // in WORKING_VISITED_DIR.
+ // After the job is finished, compare those files, then find out the unused mob files and
+ // archive them.
+ FileStatus[] files = fs.listStatus(mobStorePath);
+ Set<String> fileNames = new TreeSet<String>();
+ long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY);
+ for (FileStatus fileStatus : files) {
+ if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
+ if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
+ // only record the potentially unused files older than one day.
+ fileNames.add(fileStatus.getPath().getName());
+ }
+ }
+ }
+ // write the names to a sequence file
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(),
+ allFileNamesPath, String.class, String.class);
+ try {
+ for (String fileName : fileNames) {
+ writer.append(fileName, MobConstants.EMPTY_STRING);
+ }
+ } finally {
+ IOUtils.closeStream(writer);
+ }
+ }
+
+ /**
+ * Gets the unused mob files.
+ * Compare the file which contains all the existing mob files and the visited files,
+ * find out the unused mob file and archive them.
+ * @param conf The current configuration.
+ * @return The unused mob files.
+ * @throws IOException
+ */
+ List<String> getUnusedFiles(Configuration conf) throws IOException {
+ // find out the unused files and archive them
+ Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
+ SequenceFile.Reader allNamesReader = null;
+ MergeSortReader visitedNamesReader = null;
+ List<String> toBeArchived = new ArrayList<String>();
+ try {
+ allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
+ visitedNamesReader = new MergeSortReader(fs, conf,
+ new Path(conf.get(WORKING_VISITED_DIR_KEY)));
+ String nextAll = (String) allNamesReader.next((String) null);
+ String nextVisited = visitedNamesReader.next();
+ do {
+ if (nextAll != null) {
+ if (nextVisited != null) {
+ int compare = nextAll.compareTo(nextVisited);
+ if (compare < 0) {
+ toBeArchived.add(nextAll);
+ nextAll = (String) allNamesReader.next((String) null);
+ } else if (compare > 0) {
+ nextVisited = visitedNamesReader.next();
+ } else {
+ nextAll = (String) allNamesReader.next((String) null);
+ nextVisited = visitedNamesReader.next();
+ }
+ } else {
+ toBeArchived.add(nextAll);
+ nextAll = (String) allNamesReader.next((String) null);
+ }
+ } else {
+ break;
+ }
+ } while (nextAll != null || nextVisited != null);
+ } finally {
+ if (allNamesReader != null) {
+ allNamesReader.close();
+ }
+ if (visitedNamesReader != null) {
+ visitedNamesReader.close();
+ }
+ }
+ return toBeArchived;
+ }
+
+ /**
+ * Archives unused mob files.
+ * @param job The current job.
+ * @param tn The current table name.
+ * @param hcd The descriptor of the current column family.
+ * @throws IOException
+ */
+ private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
+ // find out the unused files and archive them
+ List<StoreFile> storeFiles = new ArrayList<StoreFile>();
+ List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
+ // archive them
+ Path mobStorePath = MobUtils
+ .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
+ for (String archiveFileName : toBeArchived) {
+ Path path = new Path(mobStorePath, archiveFileName);
+ storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
+ }
+ if (!storeFiles.isEmpty()) {
+ try {
+ MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
+ FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
+ LOG.info(storeFiles.size() + " unused MOB files are removed");
+ } catch (Exception e) {
+ LOG.error("Fail to archive the store files " + storeFiles, e);
+ }
+ }
+ }
+
+ /**
+ * Deletes the working directory.
+ * @param job The current job.
+ * @param store The current MobFileStore.
+ * @throws IOException
+ */
+ private void cleanup(Job job, TableName tn, String familyName) throws IOException {
+ if (job != null) {
+ // delete the working directory
+ Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
+ try {
+ fs.delete(workingPath, true);
+ } catch (IOException e) {
+ LOG.warn("Fail to delete the working directory after sweeping store " + familyName
+ + " in the table " + tn.getNameAsString(), e);
+ }
+ }
+ }
+
+ /**
+ * A result with index.
+ */
+ private class IndexedResult implements Comparable<IndexedResult> {
+ private int index;
+ private String value;
+
+ public IndexedResult(int index, String value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ public int getIndex() {
+ return this.index;
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ @Override
+ public int compareTo(IndexedResult o) {
+ if (this.value == null) {
+ return 0;
+ } else if (o.value == null) {
+ return 1;
+ } else {
+ return this.value.compareTo(o.value);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof IndexedResult)) {
+ return false;
+ }
+ return compareTo((IndexedResult) obj) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+ }
+
+ /**
+ * Merge sort reader.
+ * It merges and sort the readers in different sequence files as one where
+ * the results are read in order.
+ */
+ private class MergeSortReader {
+
+ private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
+ private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();
+
+ public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
+ if (fs.exists(path)) {
+ FileStatus[] files = fs.listStatus(path);
+ int index = 0;
+ for (FileStatus file : files) {
+ if (file.isFile()) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
+ String key = (String) reader.next((String) null);
+ if (key != null) {
+ results.add(new IndexedResult(index, key));
+ readers.add(reader);
+ index++;
+ }
+ }
+ }
+ }
+ }
+
+ public String next() throws IOException {
+ IndexedResult result = results.poll();
+ if (result != null) {
+ SequenceFile.Reader reader = readers.get(result.getIndex());
+ String key = (String) reader.next((String) null);
+ if (key != null) {
+ results.add(new IndexedResult(result.getIndex(), key));
+ }
+ return result.getValue();
+ }
+ return null;
+ }
+
+ public void close() {
+ for (SequenceFile.Reader reader : readers) {
+ IOUtils.closeStream(reader);
+ }
+ }
+ }
+
/**
 * Counters reported by the sweep job.
 */
public enum SweepCounter {
  /** Number of files read. */
  INPUT_FILE_COUNT,
  /** Number of files that need to be merged or cleaned. */
  FILE_TO_BE_MERGE_OR_CLEAN,
  /** Number of files left after merging. */
  FILE_AFTER_MERGE_OR_CLEAN,
  /** Number of records updated. */
  RECORDS_UPDATED
}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
new file mode 100644
index 0000000..b789332
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracker on the sweep tool node in zookeeper.
+ * The sweep tool node is an ephemeral one, when the process dies this node is deleted,
+ * at that time MR might be still running, and if another sweep job is started, two MR
+ * for the same column family will run at the same time.
+ * This tracker watches this ephemeral node, if it's gone or it's not created by the
+ * sweep job that owns the current MR, the current process will be aborted.
+ */
+@InterfaceAudience.Private
+public class SweepJobNodeTracker extends ZooKeeperListener {
+
+ private String node;
+ private String sweepJobId;
+
+ public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) {
+ super(watcher);
+ this.node = node;
+ this.sweepJobId = sweepJobId;
+ }
+
+ /**
+ * Registers the watcher on the sweep job node.
+ * If there's no such a sweep job node, or it's not created by the sweep job that
+ * owns the current MR, the current process will be aborted.
+ */
+ public void start() throws KeeperException {
+ watcher.registerListener(this);
+ if (ZKUtil.watchAndCheckExists(watcher, node)) {
+ byte[] data = ZKUtil.getDataAndWatch(watcher, node);
+ if (data != null) {
+ if (!sweepJobId.equals(Bytes.toString(data))) {
+ System.exit(1);
+ }
+ }
+ } else {
+ System.exit(1);
+ }
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ // If the ephemeral node is deleted, abort the current process.
+ if (node.equals(path)) {
+ System.exit(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/84e957c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
new file mode 100644
index 0000000..f508b93
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The mapper of a sweep job.
+ * Takes the rows from the table and their results and map to <filename:Text, mobValue:KeyValue>
+ * where mobValue is the actual cell in HBase.
+ */
+@InterfaceAudience.Private
+public class SweepMapper extends TableMapper<Text, KeyValue> {
+
+ private ZooKeeperWatcher zkw = null;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+ String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+ zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
+ new DummyMobAbortable());
+ try {
+ SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+ tracker.start();
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ if (zkw != null) {
+ zkw.close();
+ }
+ }
+
+ @Override
+ public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
+ InterruptedException {
+ if (columns == null) {
+ return;
+ }
+ KeyValue[] kvList = columns.raw();
+ if (kvList == null || kvList.length == 0) {
+ return;
+ }
+ for (KeyValue kv : kvList) {
+ if (MobUtils.hasValidMobRefCellValue(kv)) {
+ String fileName = MobUtils.getMobFileName(kv);
+ context.write(new Text(fileName), kv);
+ }
+ }
+ }
+}