Posted to commits@hbase.apache.org by st...@apache.org on 2012/12/07 00:36:30 UTC
svn commit: r1418131 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/io/hfile/
main/java/org/apache/hadoop/hbase/master/handler/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions...
Author: stack
Date: Thu Dec 6 23:36:29 2012
New Revision: 1418131
URL: http://svn.apache.org/viewvc?rev=1418131&view=rev
Log:
HBASE-7282 Backport Compaction Tool to 0.94; RETRY
Added:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Dec 6 23:36:29 2012
@@ -645,7 +645,7 @@ public class LruBlockCache implements Bl
// Log size
long totalSize = heapSize();
long freeSize = maxSize - totalSize;
- LruBlockCache.LOG.debug("LRU Stats: " +
+ LruBlockCache.LOG.debug("Stats: " +
"total=" + StringUtils.byteDesc(totalSize) + ", " +
"free=" + StringUtils.byteDesc(freeSize) + ", " +
"max=" + StringUtils.byteDesc(this.maxSize) + ", " +
@@ -653,11 +653,11 @@ public class LruBlockCache implements Bl
"accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " +
"hitRatio=" +
- (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) +
+ (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
"cachingAccesses=" + stats.getRequestCachingCount() + ", " +
"cachingHits=" + stats.getHitCachingCount() + ", " +
"cachingHitsRatio=" +
- (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) +
+ (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + ", " +
"evictions=" + stats.getEvictionCount() + ", " +
"evicted=" + stats.getEvictedCount() + ", " +
"evictedPerRun=" + stats.evictedPerEviction());
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java Thu Dec 6 23:36:29 2012
@@ -142,16 +142,12 @@ public class CreateTableHandler extends
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
final int batchSize =
this.conf.getInt("hbase.master.createtable.batchsize", 100);
- HLog hlog = null;
for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) {
HRegionInfo newRegion = this.newRegions[regionIdx];
// 1. Create HRegion
HRegion region = HRegion.createHRegion(newRegion,
this.fileSystemManager.getRootDir(), this.conf,
- this.hTableDescriptor, hlog);
- if (hlog == null) {
- hlog = region.getLog();
- }
+ this.hTableDescriptor, null, false, true);
regionInfos.add(region.getRegionInfo());
if (regionIdx % batchSize == 0) {
@@ -163,7 +159,6 @@ public class CreateTableHandler extends
// 3. Close the new region to flush to disk. Close log file too.
region.close();
}
- hlog.closeAndDelete();
if (regionInfos.size() > 0) {
MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
}
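For reference, the replacement call uses the seven-argument createHRegion
overload added in the HRegion.java diff below; a labeled sketch of the same
call as it now reads inside CreateTableHandler:

    HRegion region = HRegion.createHRegion(
        newRegion,                        // HRegionInfo to create
        this.fileSystemManager.getRootDir(),
        this.conf,
        this.hTableDescriptor,
        null,     // hlog: none supplied
        false,    // initialize: skip region initialization
        true);    // ignoreHLog: do not create a fallback HLog for the null hlog

With ignoreHLog=true no per-table HLog is ever created, which is why the old
hlog bookkeeping (and the final hlog.closeAndDelete()) can go away.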
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1418131&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Thu Dec 6 23:36:29 2012
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * The CompactionTool allows you to execute a compaction, specifying a:
+ * <ul>
+ * <li>table folder (all regions and families will be compacted)
+ * <li>region folder (all families in the region will be compacted)
+ * <li>family folder (the store files will be compacted)
+ * </ul>
+ */
+@InterfaceAudience.Public
+public class CompactionTool extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(CompactionTool.class);
+
+ private final static String CONF_TMP_DIR = "hbase.tmp.dir";
+ private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
+ private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
+ private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
+
+ /**
+ * Class responsible for executing the compaction on the specified path.
+ * The path can be a table, region or family directory.
+ */
+ private static class CompactionWorker {
+ private final boolean keepCompactedFiles;
+ private final boolean deleteCompacted;
+ private final Configuration conf;
+ private final FileSystem fs;
+ private final Path tmpDir;
+
+ public CompactionWorker(final FileSystem fs, final Configuration conf) {
+ this.conf = conf;
+ this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
+ this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
+ this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
+ this.fs = fs;
+ }
+
+ /**
+ * Execute the compaction on the specified path.
+ *
+ * @param path Directory path on which to run the compaction.
+ * @param compactOnce Execute just a single step of compaction.
+ */
+ public void compact(final Path path, final boolean compactOnce) throws IOException {
+ if (isFamilyDir(fs, path)) {
+ Path regionDir = path.getParent();
+ Path tableDir = regionDir.getParent();
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
+ HRegion region = loadRegion(fs, conf, htd, regionDir);
+ compactStoreFiles(region, path, compactOnce);
+ } else if (isRegionDir(fs, path)) {
+ Path tableDir = path.getParent();
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
+ compactRegion(htd, path, compactOnce);
+ } else if (isTableDir(fs, path)) {
+ compactTable(path, compactOnce);
+ } else {
+ throw new IOException(
+ "Specified path is not a table, region or family directory. path=" + path);
+ }
+ }
+
+ private void compactTable(final Path tableDir, final boolean compactOnce)
+ throws IOException {
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
+ LOG.info("Compact table=" + htd.getNameAsString());
+ for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
+ compactRegion(htd, regionDir, compactOnce);
+ }
+ }
+
+ private void compactRegion(final HTableDescriptor htd, final Path regionDir,
+ final boolean compactOnce) throws IOException {
+ HRegion region = loadRegion(fs, conf, htd, regionDir);
+ LOG.info("Compact table=" + htd.getNameAsString() +
+ " region=" + region.getRegionNameAsString());
+ for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+ compactStoreFiles(region, familyDir, compactOnce);
+ }
+ }
+
+ /**
+ * Execute the actual compaction job.
+ * If the compact once flag is not specified, execute the compaction until
+ * no more compactions are needed. Uses the Configuration settings provided.
+ */
+ private void compactStoreFiles(final HRegion region, final Path familyDir,
+ final boolean compactOnce) throws IOException {
+ LOG.info("Compact table=" + region.getTableDesc().getNameAsString() +
+ " region=" + region.getRegionNameAsString() +
+ " family=" + familyDir.getName());
+ Store store = getStore(region, familyDir);
+ do {
+ CompactionRequest cr = store.requestCompaction();
+ StoreFile storeFile = store.compact(cr);
+ if (storeFile != null) {
+ if (keepCompactedFiles && deleteCompacted) {
+ fs.delete(storeFile.getPath(), false);
+ }
+ }
+ } while (store.needsCompaction() && !compactOnce);
+ }
+
+ /**
+ * Create a "mock" HStore that uses the tmpDir specified by the user and
+ * the store dir to compact as source.
+ */
+ private Store getStore(final HRegion region, final Path storeDir) throws IOException {
+ byte[] familyName = Bytes.toBytes(storeDir.getName());
+ HColumnDescriptor hcd = region.getTableDesc().getFamily(familyName);
+ // Create a Store w/ check of hbase.rootdir blanked out and return our
+ // list of files instead of have Store search its home dir.
+ return new Store(tmpDir, region, hcd, fs, conf) {
+ @Override
+ public FileStatus[] getStoreFiles() throws IOException {
+ return this.fs.listStatus(getHomedir());
+ }
+
+ @Override
+ Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException {
+ return storeDir;
+ }
+ };
+ }
+
+ private static HRegion loadRegion(final FileSystem fs, final Configuration conf,
+ final HTableDescriptor htd, final Path regionDir) throws IOException {
+ Path rootDir = regionDir.getParent().getParent();
+ HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
+ return HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true);
+ }
+ }
+
+ private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
+ Path regionInfo = new Path(path, HRegion.REGIONINFO_FILE);
+ return fs.exists(regionInfo);
+ }
+
+ private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
+ return FSTableDescriptors.getTableInfoPath(fs, path) != null;
+ }
+
+ private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
+ return isRegionDir(fs, path.getParent());
+ }
+
+ private static class CompactionMapper
+ extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
+ private CompactionWorker compactor = null;
+ private boolean compactOnce = false;
+
+ @Override
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
+
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ this.compactor = new CompactionWorker(fs, conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not get the input FileSystem", e);
+ }
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws InterruptedException, IOException {
+ Path path = new Path(value.toString());
+ this.compactor.compact(path, compactOnce);
+ }
+ }
+
+ /**
+ * Input format that uses store files block location as input split locality.
+ */
+ private static class CompactionInputFormat extends TextInputFormat {
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+ return true;
+ }
+
+ /**
+ * Returns a split for each store files directory using the block location
+ * of each file as locality reference.
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ List<FileStatus> files = listStatus(job);
+
+ Text key = new Text();
+ for (FileStatus file: files) {
+ Path path = file.getPath();
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
+ LineReader reader = new LineReader(fs.open(path));
+ long pos = 0;
+ int n;
+ try {
+ while ((n = reader.readLine(key)) > 0) {
+ String[] hosts = getStoreDirHosts(fs, path);
+ splits.add(new FileSplit(path, pos, n, hosts));
+ pos += n;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ return splits;
+ }
+
+ /**
+ * Return the top hosts of the store files, used by the split.
+ */
+ private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
+ throws IOException {
+ FileStatus[] files = FSUtils.listStatus(fs, path, null);
+ if (files == null) {
+ return new String[] {};
+ }
+
+ HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+ for (FileStatus hfileStatus: files) {
+ HDFSBlocksDistribution storeFileBlocksDistribution =
+ FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
+ hdfsBlocksDistribution.add(storeFileBlocksDistribution);
+ }
+
+ List<String> hosts = hdfsBlocksDistribution.getTopHosts();
+ return hosts.toArray(new String[hosts.size()]);
+ }
+
+ /**
+ * Create the input file for the given directories to compact.
+ * The file is a TextFile with each line corresponding to a
+ * store files directory to compact.
+ */
+ public static void createInputFile(final FileSystem fs, final Path path,
+ final Set<Path> toCompactDirs) throws IOException {
+ // Extract the list of store dirs
+ List<Path> storeDirs = new LinkedList<Path>();
+ for (Path compactDir: toCompactDirs) {
+ if (isFamilyDir(fs, compactDir)) {
+ storeDirs.add(compactDir);
+ } else if (isRegionDir(fs, compactDir)) {
+ for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
+ storeDirs.add(familyDir);
+ }
+ } else if (isTableDir(fs, compactDir)) {
+ // Lookup regions
+ for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
+ for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
+ storeDirs.add(familyDir);
+ }
+ }
+ } else {
+ throw new IOException(
+ "Specified path is not a table, region or family directory. path=" + compactDir);
+ }
+ }
+
+ // Write Input File
+ FSDataOutputStream stream = fs.create(path);
+ LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
+ try {
+ final byte[] newLine = Bytes.toBytes("\n");
+ for (Path storeDir: storeDirs) {
+ stream.write(Bytes.toBytes(storeDir.toString()));
+ stream.write(newLine);
+ }
+ } finally {
+ stream.close();
+ }
+ }
+ }
+
+ /**
+ * Execute compaction, using a Map-Reduce job.
+ */
+ private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
+ final boolean compactOnce) throws Exception {
+ Configuration conf = getConf();
+ conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
+
+ Job job = new Job(conf);
+ job.setJobName("CompactionTool");
+ job.setJarByClass(CompactionTool.class);
+ job.setMapperClass(CompactionMapper.class);
+ job.setInputFormatClass(CompactionInputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setMapSpeculativeExecution(false);
+ job.setNumReduceTasks(0);
+
+ String stagingName = "compact-" + EnvironmentEdgeManager.currentTimeMillis();
+ Path stagingDir = new Path(conf.get(CONF_TMP_DIR), stagingName);
+ fs.mkdirs(stagingDir);
+ try {
+ // Create input file with the store dirs
+ Path inputPath = new Path(stagingDir, stagingName);
+ CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
+ CompactionInputFormat.addInputPath(job, inputPath);
+
+ // Initialize credential for secure cluster
+ TableMapReduceUtil.initCredentials(job);
+
+ // Start the MR Job and wait
+ return job.waitForCompletion(true) ? 0 : 1;
+ } finally {
+ fs.delete(stagingDir, true);
+ }
+ }
+
+ /**
+ * Execute compaction, from this client, one path at the time.
+ */
+ private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
+ final boolean compactOnce) throws IOException {
+ CompactionWorker worker = new CompactionWorker(fs, getConf());
+ for (Path path: toCompactDirs) {
+ worker.compact(path, compactOnce);
+ }
+ return 0;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Set<Path> toCompactDirs = new HashSet<Path>();
+ boolean compactOnce = false;
+ boolean mapred = false;
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ try {
+ for (int i = 0; i < args.length; ++i) {
+ String opt = args[i];
+ if (opt.equals("-compactOnce")) {
+ compactOnce = true;
+ } else if (opt.equals("-mapred")) {
+ mapred = true;
+ } else if (!opt.startsWith("-")) {
+ Path path = new Path(opt);
+ FileStatus status = fs.getFileStatus(path);
+ if (!status.isDir()) {
+ printUsage("Specified path is not a directory. path=" + path);
+ return 1;
+ }
+ toCompactDirs.add(path);
+ } else {
+ printUsage();
+ }
+ }
+ } catch (Exception e) {
+ printUsage(e.getMessage());
+ return 1;
+ }
+
+ if (toCompactDirs.size() == 0) {
+ printUsage("No directories to compact specified.");
+ return 1;
+ }
+
+ // Execute compaction!
+ if (mapred) {
+ return doMapReduce(fs, toCompactDirs, compactOnce);
+ } else {
+ return doClient(fs, toCompactDirs, compactOnce);
+ }
+ }
+
+ private void printUsage() {
+ printUsage(null);
+ }
+
+ private void printUsage(final String message) {
+ if (message != null && message.length() > 0) {
+ System.err.println(message);
+ }
+ System.err.println("Usage: java " + this.getClass().getName() + " \\");
+ System.err.println(" [-compactOnce] [-mapred] [-D<property=value>]* files...");
+ System.err.println();
+ System.err.println("Options:");
+ System.err.println(" mapred Use MapReduce to run compaction.");
+ System.err.println(" compactOnce Execute just one compaction step. (default: while needed)");
+ System.err.println();
+ System.err.println("Note: -D properties will be applied to the conf used. ");
+ System.err.println("For example: ");
+ System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
+ System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
+ System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To compact the full 'TestTable' using MapReduce:");
+ System.err.println(" $ bin/hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/TestTable");
+ System.err.println();
+ System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
+ System.err.println(" $ bin/hbase " + this.getClass().getName() + " hdfs:///hbase/TestTable/abc/x");
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
+ }
+}
\ No newline at end of file
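A minimal sketch of driving the tool programmatically, mirroring its own main()
and the printUsage() examples above (the class name CompactFamily and the path
are illustrative only):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.CompactionTool;
    import org.apache.hadoop.util.ToolRunner;

    public class CompactFamily {
      public static void main(String[] args) throws Exception {
        // Compact column family 'x' of table 'TestTable' region 'abc',
        // one compaction step only, from this client (no MapReduce).
        int exit = ToolRunner.run(HBaseConfiguration.create(),
            new CompactionTool(),
            new String[] { "-compactOnce", "hdfs:///hbase/TestTable/abc/x" });
        System.exit(exit);
      }
    }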
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1418131&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Thu Dec 6 23:36:29 2012
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Compact passed set of files.
+ * Create an instance and then call {@link #compact(Store, Collection, boolean, long)}.
+ */
+@InterfaceAudience.Private
+class Compactor extends Configured {
+ private static final Log LOG = LogFactory.getLog(Compactor.class);
+ private CompactionProgress progress;
+
+ Compactor(final Configuration c) {
+ super(c);
+ }
+
+ /**
+ * Do a minor/major compaction on an explicit set of storefiles from a Store.
+ *
+ * @param store Store the files belong to
+ * @param filesToCompact which files to compact
+ * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+ * @param maxId Readers maximum sequence id.
+ * @return Product of compaction or null if all cells expired or deleted and
+ * nothing made it through the compaction.
+ * @throws IOException
+ */
+ StoreFile.Writer compact(final Store store,
+ final Collection<StoreFile> filesToCompact,
+ final boolean majorCompaction, final long maxId)
+ throws IOException {
+ // Calculate maximum key count after compaction (for blooms)
+ // Also calculate earliest put timestamp if major compaction
+ int maxKeyCount = 0;
+ long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+ for (StoreFile file : filesToCompact) {
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ LOG.warn("Null reader for " + file.getPath());
+ continue;
+ }
+ // NOTE: getFilterEntries could cause under-sized blooms if the user
+ // switches bloom type (e.g. from ROW to ROWCOL)
+ long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType()) ?
+ r.getFilterEntries() : r.getEntries();
+ maxKeyCount += keyCount;
+ // For major compactions calculate the earliest put timestamp
+ // of all involved storefiles. This is used to remove
+ // family delete marker during the compaction.
+ if (majorCompaction) {
+ byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+ if (tmp == null) {
+ // There's a file with no information, must be an old one
+ // assume we have very old puts
+ earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+ } else {
+ earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compacting " + file +
+ ", keycount=" + keyCount +
+ ", bloomtype=" + r.getBloomFilterType().toString() +
+ ", size=" + StringUtils.humanReadableInt(r.length()) +
+ ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+ (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
+ }
+ }
+
+ // keep track of compaction progress
+ this.progress = new CompactionProgress(maxKeyCount);
+ // Get some configs
+ int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
+ Compression.Algorithm compression = store.getFamily().getCompression();
+ // Avoid overriding compression setting for major compactions if the user
+ // has not specified it separately
+ Compression.Algorithm compactionCompression =
+ (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
+ store.getFamily().getCompactionCompression(): compression;
+
+ // For each file, obtain a scanner:
+ List<StoreFileScanner> scanners = StoreFileScanner
+ .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+ // Make the instantiation lazy in case compaction produces no product; i.e.
+ // where all source cells are expired or deleted.
+ StoreFile.Writer writer = null;
+ // Find the smallest read point across all the Scanners.
+ long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+ MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+ try {
+ InternalScanner scanner = null;
+ try {
+ if (store.getHRegion().getCoprocessorHost() != null) {
+ scanner = store.getHRegion()
+ .getCoprocessorHost()
+ .preCompactScannerOpen(store, scanners,
+ majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+ }
+ if (scanner == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ /* Include deletes, unless we are doing a major compaction */
+ scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
+ majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+ smallestReadPoint, earliestPutTs);
+ }
+ if (store.getHRegion().getCoprocessorHost() != null) {
+ InternalScanner cpScanner =
+ store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
+ // NULL scanner returned from coprocessor hooks means skip normal processing
+ if (cpScanner == null) {
+ return null;
+ }
+ scanner = cpScanner;
+ }
+
+ int bytesWritten = 0;
+ // since scanner.next() can return 'false' but still be delivering data,
+ // we have to use a do/while loop.
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(kvs, compactionKVMax);
+ if (writer == null && !kvs.isEmpty()) {
+ writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
+ }
+ if (writer != null) {
+ // output to writer:
+ for (KeyValue kv : kvs) {
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ kv.setMemstoreTS(0);
+ }
+ writer.append(kv);
+ // update progress per key
+ ++progress.currentCompactedKVs;
+
+ // check periodically to see if a system stop is requested
+ if (Store.closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+ if (bytesWritten > Store.closeCheckInterval) {
+ bytesWritten = 0;
+ isInterrupted(store, writer);
+ }
+ }
+ }
+ }
+ kvs.clear();
+ } while (hasMore);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ } finally {
+ if (writer != null) {
+ writer.appendMetadata(maxId, majorCompaction);
+ writer.close();
+ }
+ }
+ return writer;
+ }
+
+ void isInterrupted(final Store store, final StoreFile.Writer writer)
+ throws IOException {
+ if (store.getHRegion().areWritesEnabled()) return;
+ // Else cleanup.
+ writer.close();
+ store.getFileSystem().delete(writer.getPath(), false);
+ throw new InterruptedIOException( "Aborting compaction of store " + store +
+ " in region " + store.getHRegion() + " because user requested stop.");
+ }
+
+ CompactionProgress getProgress() {
+ return this.progress;
+ }
+}
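The Store.java diff below wires this class in. The contract, per the javadoc
above, is that a null writer means every source cell was expired or deleted; a
sketch of the call site as it now appears inside Store:

    StoreFile.Writer writer =
        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
    if (writer == null) {
      // Nothing survived the compaction; there is no product to move into place.
    }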
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Dec 6 23:36:29 2012
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
@@ -62,6 +63,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -230,12 +232,12 @@ public class HRegion implements HeapSize
* The directory for the table this region is part of.
* This directory contains the directory for this region.
*/
- final Path tableDir;
+ private final Path tableDir;
- final HLog log;
- final FileSystem fs;
- final Configuration conf;
- final int rowLockWaitDuration;
+ private final HLog log;
+ private final FileSystem fs;
+ private final Configuration conf;
+ private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
// The internal wait duration to acquire a lock before read/update
@@ -256,8 +258,8 @@ public class HRegion implements HeapSize
// purge timeout, when a RPC call will be terminated by the RPC engine.
final long maxBusyWaitDuration;
- final HRegionInfo regionInfo;
- final Path regiondir;
+ private final HRegionInfo regionInfo;
+ private final Path regiondir;
KeyValue.KVComparator comparator;
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -724,7 +726,7 @@ public class HRegion implements HeapSize
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
if (this.rsAccounting != null) {
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
- }
+ }
return this.memstoreSize.getAndAdd(memStoreSize);
}
@@ -750,7 +752,7 @@ public class HRegion implements HeapSize
// and then create the file
Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-
+
// if datanode crashes or if the RS goes down just before the close is called while trying to
// close the created regioninfo file in the .tmp directory then on next
// creation we will be getting AlreadyCreatedException.
@@ -758,7 +760,7 @@ public class HRegion implements HeapSize
if (FSUtils.isExists(fs, tmpPath)) {
FSUtils.delete(fs, tmpPath, true);
}
-
+
FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
try {
@@ -775,6 +777,26 @@ public class HRegion implements HeapSize
}
}
+ /**
+ * @param fs
+ * @param dir
+ * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
+ * @throws IOException
+ */
+ public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
+ throws IOException {
+ Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
+ if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
+ FSDataInputStream in = fs.open(regioninfo);
+ try {
+ HRegionInfo hri = new HRegionInfo();
+ hri.readFields(in);
+ return hri;
+ } finally {
+ in.close();
+ }
+ }
+
/** @return a HRegionInfo object for this region */
public HRegionInfo getRegionInfo() {
return this.regionInfo;
@@ -1021,19 +1043,16 @@ public class HRegion implements HeapSize
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
}
- private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+ static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
final String threadNamePrefix) {
- ThreadPoolExecutor openAndCloseThreadPool = Threads
- .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadNamePrefix + "-" + count++);
- return t;
- }
- });
- return openAndCloseThreadPool;
+ return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, threadNamePrefix + "-" + count++);
+ }
+ });
}
/**
@@ -1979,7 +1998,7 @@ public class HRegion implements HeapSize
System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
return batchMutate(mutationsAndLocks);
}
-
+
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
@@ -2333,7 +2352,7 @@ public class HRegion implements HeapSize
// do after lock
final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-
+
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
// null will be treated as unknown.
@@ -2636,7 +2655,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -3759,6 +3778,7 @@ public class HRegion implements HeapSize
* @param conf
* @param hTableDescriptor
* @param hlog shared HLog
+ * @param initialize true to initialize the region
* @return new HRegion
*
* @throws IOException
@@ -3766,7 +3786,36 @@ public class HRegion implements HeapSize
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog)
+ final HLog hlog,
+ final boolean initialize)
+ throws IOException {
+ return createHRegion(info, rootDir, conf, hTableDescriptor,
+ hlog, initialize, false);
+ }
+
+ /**
+ * Convenience method creating new HRegions. Used by createTable.
+ * The {@link HLog} for the created region needs to be closed
+ * explicitly, if it is not null.
+ * Use {@link HRegion#getLog()} to get access.
+ *
+ * @param info Info for region to create.
+ * @param rootDir Root directory for HBase instance
+ * @param conf
+ * @param hTableDescriptor
+ * @param hlog shared HLog
+ * @param initialize true to initialize the region
+ * @param ignoreHLog true to skip creating a new hlog when none is supplied;
+ * mostly for createTable
+ * @return new HRegion
+ *
+ * @throws IOException
+ */
+ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+ final Configuration conf,
+ final HTableDescriptor hTableDescriptor,
+ final HLog hlog,
+ final boolean initialize, final boolean ignoreHLog)
throws IOException {
LOG.info("creating HRegion " + info.getTableNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3778,16 +3827,26 @@ public class HRegion implements HeapSize
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
HLog effectiveHLog = hlog;
- if (hlog == null) {
+ if (hlog == null && !ignoreHLog) {
effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
}
HRegion region = HRegion.newHRegion(tableDir,
effectiveHLog, fs, conf, info, hTableDescriptor, null);
- region.initialize();
+ if (initialize) {
+ region.initialize();
+ }
return region;
}
+ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+ final Configuration conf,
+ final HTableDescriptor hTableDescriptor,
+ final HLog hlog)
+ throws IOException {
+ return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
+ }
+
/**
* Open a Region.
* @param info Info for region to be opened.
@@ -4299,7 +4358,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-
+
return results;
}
@@ -4627,10 +4686,10 @@ public class HRegion implements HeapSize
closeRegionOperation();
}
-
+
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4755,7 +4814,7 @@ public class HRegion implements HeapSize
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
}
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -5248,7 +5307,7 @@ public class HRegion implements HeapSize
*/
private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
if (numPutsWithoutWAL.getAndIncrement() == 0) {
- LOG.info("writing data to region " + this +
+ LOG.info("writing data to region " + this +
" with WAL disabled. Data may be lost in the event of a crash.");
}
@@ -5360,11 +5419,11 @@ public class HRegion implements HeapSize
final HLog log = new HLog(fs, logdir, oldLogDir, c);
try {
processTable(fs, tableDir, log, c, majorCompact);
- } finally {
+ } finally {
log.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
- }
+ }
}
}
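The new loadDotRegionInfoFileContent() is what lets the CompactionTool stand a
region up straight from its on-disk layout, with no running cluster. A sketch
of the pattern, mirroring CompactionTool.loadRegion() above (fs, conf and htd
assumed in scope):

    Path rootDir = regionDir.getParent().getParent(); // region -> table -> root
    HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
    HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd,
        null /* hlog */, false /* initialize */, true /* ignoreHLog */);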
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Dec 6 23:36:29 2012
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -132,9 +131,6 @@ public class Store extends SchemaConfigu
private volatile long totalUncompressedBytes = 0L;
private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final String storeNameStr;
- private CompactionProgress progress;
- private final int compactionKVMax;
private final boolean verifyBulkLoads;
/* The default priority for user-specified compaction requests.
@@ -158,10 +154,6 @@ public class Store extends SchemaConfigu
new CopyOnWriteArraySet<ChangedReadersObserver>();
private final int blocksize;
- /** Compression algorithm for flush files and minor compaction */
- private final Compression.Algorithm compression;
- /** Compression algorithm for major compaction */
- private final Compression.Algorithm compactionCompression;
private HFileDataBlockEncoder dataBlockEncoder;
/** Checksum configuration */
@@ -171,6 +163,8 @@ public class Store extends SchemaConfigu
// Comparing KeyValues
final KeyValue.KVComparator comparator;
+ private final Compactor compactor;
+
/**
* Constructor
* @param basedir qualified path under which the region directory lives;
@@ -185,25 +179,16 @@ public class Store extends SchemaConfigu
protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration conf)
throws IOException {
- super(conf, region.getTableDesc().getNameAsString(),
+ super(conf, region.getRegionInfo().getTableNameAsString(),
Bytes.toString(family.getName()));
- HRegionInfo info = region.regionInfo;
+ HRegionInfo info = region.getRegionInfo();
this.fs = fs;
- this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
- if (!this.fs.exists(this.homedir)) {
- if (!this.fs.mkdirs(this.homedir))
- throw new IOException("Failed create of: " + this.homedir.toString());
- }
+ Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+ this.homedir = createStoreHomeDir(this.fs, p);
this.region = region;
this.family = family;
this.conf = conf;
this.blocksize = family.getBlocksize();
- this.compression = family.getCompression();
- // avoid overriding compression setting for major compactions if the user
- // has not specified it separately
- this.compactionCompression =
- (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
- family.getCompactionCompression() : this.compression;
this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -228,7 +213,6 @@ public class Store extends SchemaConfigu
"ms in store " + this);
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
- this.storeNameStr = getColumnFamilyName();
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2,
@@ -245,10 +229,8 @@ public class Store extends SchemaConfigu
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
- this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
- this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
- false);
+ this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
if (Store.closeCheckInterval == 0) {
Store.closeCheckInterval = conf.getInt(
@@ -260,6 +242,47 @@ public class Store extends SchemaConfigu
this.checksumType = getChecksumType(conf);
// initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
+ // Create a compaction tool instance
+ this.compactor = new Compactor(this.conf);
+ }
+
+ /**
+ * @param family
+ * @return TTL for the family, in milliseconds (Long.MAX_VALUE if unlimited)
+ */
+ long getTTL(final HColumnDescriptor family) {
+ // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
+ long ttl = family.getTimeToLive();
+ if (ttl == HConstants.FOREVER) {
+ // Default is unlimited ttl.
+ ttl = Long.MAX_VALUE;
+ } else if (ttl == -1) {
+ ttl = Long.MAX_VALUE;
+ } else {
+ // Second -> ms adjust for user data
+ ttl *= 1000;
+ }
+ return ttl;
+ }
+
+ /**
+ * Create this store's homedir
+ * @param fs
+ * @param homedir
+ * @return Return <code>homedir</code>
+ * @throws IOException
+ */
+ Path createStoreHomeDir(final FileSystem fs,
+ final Path homedir) throws IOException {
+ if (!fs.exists(homedir)) {
+ if (!fs.mkdirs(homedir))
+ throw new IOException("Failed create of: " + homedir.toString());
+ }
+ return homedir;
+ }
+
+ FileSystem getFileSystem() {
+ return this.fs;
}
/**
@@ -320,7 +343,7 @@ public class Store extends SchemaConfigu
* Return the directory in which this store stores its
* StoreFiles
*/
- public Path getHomedir() {
+ Path getHomedir() {
return homedir;
}
@@ -339,6 +362,10 @@ public class Store extends SchemaConfigu
this.dataBlockEncoder = blockEncoder;
}
+ FileStatus [] getStoreFiles() throws IOException {
+ return FSUtils.listStatus(this.fs, this.homedir, null);
+ }
+
/**
* Creates an unsorted list of StoreFile loaded in parallel
* from the given directory.
@@ -346,7 +373,7 @@ public class Store extends SchemaConfigu
*/
private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>();
- FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
+ FileStatus files[] = getStoreFiles();
if (files == null || files.length == 0) {
return results;
@@ -637,7 +664,7 @@ public class Store extends SchemaConfigu
storeFileCloserThreadPool.shutdownNow();
}
}
- LOG.debug("closed " + this.storeNameStr);
+ LOG.info("Closed " + this);
return result;
} finally {
this.lock.writeLock().unlock();
@@ -723,6 +750,7 @@ public class Store extends SchemaConfigu
scanner = cpScanner;
}
try {
+ int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
@@ -736,7 +764,7 @@ public class Store extends SchemaConfigu
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
- hasMore = scanner.next(kvs, this.compactionKVMax);
+ hasMore = scanner.next(kvs, compactionKVMax);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
@@ -828,7 +856,7 @@ public class Store extends SchemaConfigu
*/
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException {
- return createWriterInTmp(maxKeyCount, this.compression, false);
+ return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
}
/*
@@ -981,16 +1009,12 @@ public class Store extends SchemaConfigu
* @param cr
* compaction details obtained from requestCompaction()
* @throws IOException
+ * @return Storefile we compacted into or null if we failed or opted out early.
*/
- void compact(CompactionRequest cr) throws IOException {
- if (cr == null || cr.getFiles().isEmpty()) {
- return;
- }
- Preconditions.checkArgument(cr.getStore().toString()
- .equals(this.toString()));
-
+ StoreFile compact(CompactionRequest cr) throws IOException {
+ if (cr == null || cr.getFiles().isEmpty()) return null;
+ Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
List<StoreFile> filesToCompact = cr.getFiles();
-
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
@@ -1002,19 +1026,26 @@ public class Store extends SchemaConfigu
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this.storeNameStr + " of "
+ + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
try {
- StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
- maxId);
+ StoreFile.Writer writer =
+ this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
// Move the compaction into place.
- sf = completeCompaction(filesToCompact, writer);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
+ if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
+ sf = completeCompaction(filesToCompact, writer);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postCompact(this, sf);
+ }
+ } else {
+ // Create storefile around what we wrote with a reader on it.
+ sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
+ this.family.getBloomFilterType(), this.dataBlockEncoder);
+ sf.createReader();
}
} finally {
synchronized (filesCompacting) {
@@ -1023,7 +1054,7 @@ public class Store extends SchemaConfigu
}
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ + filesToCompact.size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into " +
(sf == null ? "none" : sf.getPath().getName()) +
@@ -1031,6 +1062,7 @@ public class Store extends SchemaConfigu
StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is "
+ StringUtils.humanReadableInt(storeSize));
+ return sf;
}
/**
@@ -1070,7 +1102,8 @@ public class Store extends SchemaConfigu
try {
// Ready to go. Have list of files to compact.
- StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
+ StoreFile.Writer writer =
+ this.compactor.compact(this, filesToCompact, isMajor, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
@@ -1119,10 +1152,10 @@ public class Store extends SchemaConfigu
}
/** getter for CompactionProgress object
- * @return CompactionProgress object
+ * @return CompactionProgress object; can be null
*/
public CompactionProgress getCompactionProgress() {
- return this.progress;
+ return this.compactor.getProgress();
}
/*
@@ -1174,19 +1207,19 @@ public class Store extends SchemaConfigu
if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this.storeNameStr +
+ LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl);
}
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
- LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+ LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+ LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
@@ -1376,12 +1409,12 @@ public class Store extends SchemaConfigu
compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
- compactSelection.clearSubList(0, pos);
+ if (pos != 0) compactSelection.clearSubList(0, pos);
}
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this.storeNameStr + ": no store files to compact");
+ this + ": no store files to compact");
compactSelection.emptyFileList();
return compactSelection;
}
@@ -1468,7 +1501,7 @@ public class Store extends SchemaConfigu
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipped compaction of " + this.storeNameStr
+ LOG.debug("Skipped compaction of " + this
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
@@ -1495,149 +1528,6 @@ public class Store extends SchemaConfigu
}
/**
- * Do a minor/major compaction on an explicit set of storefiles in a Store.
- * Uses the scan infrastructure to make it easy.
- *
- * @param filesToCompact which files to compact
- * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
- * @param maxId Readers maximum sequence id.
- * @return Product of compaction or null if all cells expired or deleted and
- * nothing made it through the compaction.
- * @throws IOException
- */
- StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
- final boolean majorCompaction, final long maxId)
- throws IOException {
- // calculate maximum key count after compaction (for blooms)
- int maxKeyCount = 0;
- long earliestPutTs = HConstants.LATEST_TIMESTAMP;
- for (StoreFile file : filesToCompact) {
- StoreFile.Reader r = file.getReader();
- if (r != null) {
- // NOTE: getFilterEntries could cause under-sized blooms if the user
- // switches bloom type (e.g. from ROW to ROWCOL)
- long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
- ? r.getFilterEntries() : r.getEntries();
- maxKeyCount += keyCount;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compacting " + file +
- ", keycount=" + keyCount +
- ", bloomtype=" + r.getBloomFilterType().toString() +
- ", size=" + StringUtils.humanReadableInt(r.length()) +
- ", encoding=" + r.getHFileReader().getEncodingOnDisk());
- }
- }
- // For major compactions calculate the earliest put timestamp
- // of all involved storefiles. This is used to remove
- // family delete marker during the compaction.
- if (majorCompaction) {
- byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
- if (tmp == null) {
- // there's a file with no information, must be an old one
- // assume we have very old puts
- earliestPutTs = HConstants.OLDEST_TIMESTAMP;
- } else {
- earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
- }
- }
- }
-
- // keep track of compaction progress
- progress = new CompactionProgress(maxKeyCount);
-
- // For each file, obtain a scanner:
- List<StoreFileScanner> scanners = StoreFileScanner
- .getScannersForStoreFiles(filesToCompact, false, false, true);
-
- // Make the instantiation lazy in case compaction produces no product; i.e.
- // where all source cells are expired or deleted.
- StoreFile.Writer writer = null;
- // Find the smallest read point across all the Scanners.
- long smallestReadPoint = region.getSmallestReadPoint();
- MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
- try {
- InternalScanner scanner = null;
- try {
- if (getHRegion().getCoprocessorHost() != null) {
- scanner = getHRegion()
- .getCoprocessorHost()
- .preCompactScannerOpen(this, scanners,
- majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
- }
- if (scanner == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(getFamily().getMaxVersions());
- /* Include deletes, unless we are doing a major compaction */
- scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
- majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
- smallestReadPoint, earliestPutTs);
- }
- if (getHRegion().getCoprocessorHost() != null) {
- InternalScanner cpScanner =
- getHRegion().getCoprocessorHost().preCompact(this, scanner);
- // NULL scanner returned from coprocessor hooks means skip normal processing
- if (cpScanner == null) {
- return null;
- }
- scanner = cpScanner;
- }
-
- int bytesWritten = 0;
- // since scanner.next() can return 'false' but still be delivering data,
- // we have to use a do/while loop.
- ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
- // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
- boolean hasMore;
- do {
- hasMore = scanner.next(kvs, this.compactionKVMax);
- if (writer == null && !kvs.isEmpty()) {
- writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
- true);
- }
- if (writer != null) {
- // output to writer:
- for (KeyValue kv : kvs) {
- if (kv.getMemstoreTS() <= smallestReadPoint) {
- kv.setMemstoreTS(0);
- }
- writer.append(kv);
- // update progress per key
- ++progress.currentCompactedKVs;
-
- // check periodically to see if a system stop is requested
- if (Store.closeCheckInterval > 0) {
- bytesWritten += kv.getLength();
- if (bytesWritten > Store.closeCheckInterval) {
- bytesWritten = 0;
- if (!this.region.areWritesEnabled()) {
- writer.close();
- fs.delete(writer.getPath(), false);
- throw new InterruptedIOException(
- "Aborting compaction of store " + this +
- " in region " + this.region +
- " because user requested stop.");
- }
- }
- }
- }
- }
- kvs.clear();
- } while (hasMore);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- } finally {
- if (writer != null) {
- writer.appendMetadata(maxId, majorCompaction);
- writer.close();
- }
- }
- return writer;
- }
-
- /**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
*
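[The block removed above is the core compaction loop; this commit relocates it into the new Compactor class, exercised by the TestCompaction hunk at the end of this diff. A minimal sketch of the batching idiom, using only names that appear in the removed code (scanner, writer, progress, compactionKVMax, smallestReadPoint); Compactor.java itself is not shown in this diff, so treat this as illustrative rather than the relocated implementation:

    // Sketch of the relocated inner loop (illustrative; see removed code above).
    // scanner.next() may return false while still handing back KeyValues, so a
    // do/while is required; compactionKVMax bounds each batch to avoid OOME.
    ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      for (KeyValue kv : kvs) {
        if (kv.getMemstoreTS() <= smallestReadPoint) {
          kv.setMemstoreTS(0);   // no open scanner can still need this MVCC info
        }
        writer.append(kv);
        ++progress.currentCompactedKVs;   // per-key progress accounting
      }
      kvs.clear();
    } while (hasMore);
]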
@@ -1741,7 +1631,7 @@ public class Store extends SchemaConfigu
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+ LOG.error("Failed replacing compacted files in " + this +
". Compacted file is " + (result == null? "none": result.toString()) +
". Files replaced " + compactedFiles.toString() +
" some of which may have been already removed", e);
@@ -2027,7 +1917,7 @@ public class Store extends SchemaConfigu
return mk.getRow();
}
} catch(IOException e) {
- LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+ LOG.warn("Failed getting store size for " + this, e);
} finally {
this.lock.readLock().unlock();
}
@@ -2080,7 +1970,7 @@ public class Store extends SchemaConfigu
@Override
public String toString() {
- return this.storeNameStr;
+ return getColumnFamilyName();
}
/**
@@ -2196,7 +2086,7 @@ public class Store extends SchemaConfigu
}
HRegionInfo getHRegionInfo() {
- return this.region.regionInfo;
+ return this.region.getRegionInfo();
}
/**
@@ -2324,8 +2214,8 @@ public class Store extends SchemaConfigu
public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
- + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
- + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
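[The FIXED_OVERHEAD adjustment above (20 -> 17 references, 6 -> 5 ints) accounts for fields such as storeNameStr that leave Store along with the extracted compaction code. A hedged sketch of how such constants are conventionally verified, in the style of TestHeapSize; this test is illustrative and not part of this patch:

    // Illustrative check (not in this patch): if FIXED_OVERHEAD drifts from
    // Store's real field layout, estimateBase() exposes the mismatch.
    long expected = ClassSize.estimateBase(Store.class, false);
    assertEquals(expected, Store.FIXED_OVERHEAD);
]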
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Thu Dec 6 23:36:29 2012
@@ -49,5 +49,4 @@ public class CompactionProgress {
public float getProgressPct() {
return currentCompactedKVs / totalCompactingKVs;
}
-
}
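[One caveat on the unchanged getProgressPct() above: in 0.94 both counters are declared long, so the division is integral and truncates. This patch only trims a trailing blank line; getting the intended ratio needs a cast, along these lines (hedged sketch, not part of this commit):

    public float getProgressPct() {
      // Cast one operand so the division happens in float arithmetic;
      // long/long here can only ever yield 0.0f or 1.0f.
      return (float) currentCompactedKVs / totalCompactingKVs;
    }
]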
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java Thu Dec 6 23:36:29 2012
@@ -25,9 +25,6 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-
/**
* Checksum types. The Checksum type is a one byte number
* that stores a representation of the checksum algorithm
@@ -70,7 +67,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32);
LOG.info("Checksum using " + PURECRC32);
} catch (Exception e) {
- LOG.info(PURECRC32 + " not available.");
+ LOG.trace(PURECRC32 + " not available.");
}
try {
// The default checksum class name is java.util.zip.CRC32.
@@ -80,7 +77,7 @@ public enum ChecksumType {
LOG.info("Checksum can use " + JDKCRC);
}
} catch (Exception e) {
- LOG.warn(JDKCRC + " not available. ", e);
+ LOG.trace(JDKCRC + " not available.");
}
}
@@ -113,7 +110,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32C);
LOG.info("Checksum can use " + PURECRC32C);
} catch (Exception e) {
- LOG.info(PURECRC32C + " not available. ");
+ LOG.trace(PURECRC32C + " not available.");
}
}
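[The ChecksumType hunks above demote the "not available" messages from INFO/WARN to TRACE: missing an optional CRC implementation is routine, and the enum falls back to java.util.zip.CRC32. The probe itself is plain reflective constructor lookup; a sketch of the idiom, where the target class name and logger stand in for ChecksumFactory's internals and are assumptions:

    // Probe for an optional pure-Java CRC32 and fall back quietly.
    // "org.apache.hadoop.util.PureJavaCrc32" is the assumed target class.
    java.lang.reflect.Constructor<?> ctor = null;
    try {
      ctor = Class.forName("org.apache.hadoop.util.PureJavaCrc32").getConstructor();
      LOG.info("Checksum using PureJavaCrc32");
    } catch (Exception e) {
      // Routine on Hadoop versions lacking the class -- hence TRACE, not WARN.
      LOG.trace("PureJavaCrc32 not available.");
    }
]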
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Thu Dec 6 23:36:29 2012
@@ -151,7 +151,7 @@ public abstract class FSUtils {
*/
public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException {
- LOG.debug("Creating file:" + path + "with permission:" + perm);
+ LOG.debug("Creating file=" + path + " with permission=" + perm);
return fs.create(path, perm, overwrite,
fs.getConf().getInt("io.file.buffer.size", 4096),
@@ -1013,6 +1013,25 @@ public abstract class FSUtils {
}
/**
+ * Given a particular region dir, return all the family dirs inside it
+ *
+ * @param fs A file system for the Path
+ * @param regionDir Path to a specific region directory
+ * @return List of paths to valid family directories in region dir.
+ * @throws IOException
+ */
+ public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+ // assumes we are in a region dir.
+ FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+ List<Path> familyDirs = new ArrayList<Path>(fds.length);
+ for (FileStatus fdfs: fds) {
+ Path fdPath = fdfs.getPath();
+ familyDirs.add(fdPath);
+ }
+ return familyDirs;
+ }
+
+ /**
* Filter for HFiles that excludes reference files.
*/
public static class HFileFilter implements PathFilter {
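[The new FSUtils.getFamilyDirs() gives the backported CompactionTool a way to enumerate column-family directories under a region. A hedged usage sketch; the region path is illustrative and error handling is elided:

    FileSystem fs = FileSystem.get(conf);
    Path regionDir = new Path("/hbase/usertable/0123456789abcdef");  // illustrative
    for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
      // Every entry has already passed FamilyDirFilter.
      System.out.println("family=" + familyDir.getName());
    }
]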
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1418131&r1=1418130&r2=1418131&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Thu Dec 6 23:36:29 2012
@@ -587,8 +587,10 @@ public class TestCompaction extends HBas
List<StoreFile> storeFiles = store.getStorefiles();
long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+ Compactor tool = new Compactor(this.conf);
- StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+ StoreFile.Writer compactedFile =
+ tool.compact(store, storeFiles, false, maxId);
// Now let's corrupt the compacted file.
FileSystem fs = FileSystem.get(conf);
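[The change above is the user-visible shape of the refactor: the test now drives compaction through the extracted Compactor instead of a package-private Store method. Restated outside diff context, where the boolean is the majorCompaction flag from the removed signature at the top of this section:

    Compactor tool = new Compactor(this.conf);
    StoreFile.Writer compactedFile =
        tool.compact(store, storeFiles, false /* majorCompaction */, maxId);
]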