Posted to commits@hbase.apache.org by jm...@apache.org on 2015/02/22 21:56:20 UTC
[47/50] [abbrv] hbase git commit: Merge branch 'master' (2/11/15)
into hbase-11339
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 3c8fa87,0000000..aba81eb
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@@ -1,546 -1,0 +1,548 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobStoreEngine;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
++import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+
+/**
+ * The store implementation that saves MOBs (medium objects); it extends HStore.
+ * When the descriptor of a column family carries the value "IS_MOB", that column
+ * family is a mob one, and when an HRegion instantiates a store for it, an
+ * HMobStore is created. HMobStore is almost the same as HStore except that it
+ * uses different types of scanners: in getScanner, a MobStoreScanner or a
+ * ReversedMobStoreScanner is returned. In these scanners, an additional seek
+ * into the mob files is performed after the seek in HBase is done.
+ */
+@InterfaceAudience.Private
+public class HMobStore extends HStore {
+
+ private MobCacheConfig mobCacheConfig;
+ private Path homePath;
+ private Path mobFamilyPath;
+ private volatile long mobCompactedIntoMobCellsCount = 0;
+ private volatile long mobCompactedFromMobCellsCount = 0;
+ private volatile long mobCompactedIntoMobCellsSize = 0;
+ private volatile long mobCompactedFromMobCellsSize = 0;
+ private volatile long mobFlushCount = 0;
+ private volatile long mobFlushedCellsCount = 0;
+ private volatile long mobFlushedCellsSize = 0;
+ private volatile long mobScanCellsCount = 0;
+ private volatile long mobScanCellsSize = 0;
+ private List<Path> mobDirLocations;
+ private HColumnDescriptor family;
+ private TableLockManager tableLockManager;
+ private TableName tableLockName;
+
+ public HMobStore(final HRegion region, final HColumnDescriptor family,
+ final Configuration confParam) throws IOException {
+ super(region, family, confParam);
+ this.family = family;
+ this.mobCacheConfig = (MobCacheConfig) cacheConf;
+ this.homePath = MobUtils.getMobHome(conf);
+ this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
+ family.getNameAsString());
+ mobDirLocations = new ArrayList<Path>();
+ mobDirLocations.add(mobFamilyPath);
+ TableName tn = region.getTableDesc().getTableName();
+ mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
+ .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
+ if (region.getRegionServerServices() != null) {
+ tableLockManager = region.getRegionServerServices().getTableLockManager();
+ tableLockName = MobUtils.getTableLockName(getTableName());
+ }
+ }
+
+ /**
+ * Creates the mob cache config.
+ */
+ @Override
+ protected void createCacheConf(HColumnDescriptor family) {
+ cacheConf = new MobCacheConfig(conf, family);
+ }
+
+ /**
+ * Gets current config.
+ */
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ /**
+ * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, an additional
+ * seek into the mob files is performed after the seek in HBase is done.
+ */
+ @Override
+ protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+ long readPt, KeyValueScanner scanner) throws IOException {
+ if (scanner == null) {
+ if (MobUtils.isRefOnlyScan(scan)) {
+ Filter refOnlyFilter = new MobReferenceOnlyFilter();
+ Filter filter = scan.getFilter();
+ if (filter != null) {
+ scan.setFilter(new FilterList(filter, refOnlyFilter));
+ } else {
+ scan.setFilter(refOnlyFilter);
+ }
+ }
+ scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
+ targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+ }
+ return scanner;
+ }
+
+ /**
+ * Creates the mob store engine.
+ */
+ @Override
+ protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+ KVComparator kvComparator) throws IOException {
+ MobStoreEngine engine = new MobStoreEngine();
+ engine.createComponents(conf, store, kvComparator);
+ return engine;
+ }
+
+ /**
+ * Gets the temp directory.
+ * @return The temp directory.
+ */
+ private Path getTempDir() {
+ return new Path(homePath, MobConstants.TEMP_DIR_NAME);
+ }
+
+ /**
+ * Creates the writer for the mob file in temp directory.
+ * @param date The latest date of written cells.
+ * @param maxKeyCount The estimated maximum number of keys to be written.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ if (startKey == null) {
+ startKey = HConstants.EMPTY_START_ROW;
+ }
+ Path path = getTempDir();
+ return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
+ }
+
+ /**
+ * Creates the writer for the del file in temp directory.
+ * The del file keeps track of the delete markers. Its name has the suffix _del;
+ * the full name format is [0-9a-f]+(_del)?.
+ * @param date The latest date of written cells.
+ * @param maxKeyCount The estimated maximum number of keys to be written.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the del file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ if (startKey == null) {
+ startKey = HConstants.EMPTY_START_ROW;
+ }
+ Path path = getTempDir();
+ String suffix = UUID
+ .randomUUID().toString().replaceAll("-", "") + "_del";
+ MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
+ return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
+ }
+
+ /**
+ * Creates the writer for the mob file in temp directory.
+ * @param date The date string, in the format yyyymmdd.
+ * @param basePath The base path for the temp directory.
+ * @param maxKeyCount The estimated maximum number of keys to be written.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
+ .toString().replaceAll("-", ""));
+ return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
+ }
+
+ /**
+ * Creates the writer for the mob file in temp directory.
+ * @param mobFileName The mob file name.
+ * @param basePath The base path for the temp directory.
+ * @param maxKeyCount The estimated maximum number of keys to be written.
+ * @param compression The compression algorithm.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
+ Compression.Algorithm compression) throws IOException {
+ final CacheConfig writerCacheConf = mobCacheConfig;
+ HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+ .withIncludesMvcc(false).withIncludesTags(true)
+ .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+ .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+ .withBlockSize(getFamily().getBlocksize())
+ .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build();
+
+ StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
+ .withFilePath(new Path(basePath, mobFileName.getFileName()))
+ .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+ .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+ return w;
+ }
+
+ /**
+ * Commits the mob file.
+ * @param sourceFile The source file.
+ * @param targetPath The directory path where the source file is renamed to.
+ * @throws IOException
+ */
+ public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
+ if (sourceFile == null) {
+ return;
+ }
+ Path dstPath = new Path(targetPath, sourceFile.getName());
+ validateMobFile(sourceFile);
+ String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+ LOG.info(msg);
+ Path parent = dstPath.getParent();
+ if (!region.getFilesystem().exists(parent)) {
+ region.getFilesystem().mkdirs(parent);
+ }
+ if (!region.getFilesystem().rename(sourceFile, dstPath)) {
+ throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+ }
+ }
+
+ /**
+ * Validates a mob file by opening and closing it.
+ *
+ * @param path the path to the mob file
+ */
+ private void validateMobFile(Path path) throws IOException {
+ StoreFile storeFile = null;
+ try {
+ storeFile =
+ new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
+ storeFile.createReader();
+ } catch (IOException e) {
+ LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+ throw e;
+ } finally {
+ if (storeFile != null) {
+ storeFile.closeReader(false);
+ }
+ }
+ }
+
+ /**
+ * Reads the cell from the mob file.
+ * @param reference The cell found in HBase; its value is the path of a mob file.
+ * @param cacheBlocks Whether the scanner should cache blocks.
+ * @return The cell found in the mob file.
+ * @throws IOException
+ */
+ public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+ Cell result = null;
+ if (MobUtils.hasValidMobRefCellValue(reference)) {
+ String fileName = MobUtils.getMobFileName(reference);
+ result = readCell(mobDirLocations, fileName, reference, cacheBlocks);
+ if (result == null) {
+ result = readClonedCell(fileName, reference, cacheBlocks);
+ }
+ }
+ if (result == null) {
+ LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
+ + "qualifier,timestamp,type and tags but with an empty value to return.");
+ result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
+ reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
+ reference.getFamilyLength(), reference.getQualifierArray(),
+ reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
+ Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
+ 0, 0, reference.getTagsArray(), reference.getTagsOffset(),
+ reference.getTagsLength());
+ }
+ return result;
+ }
+
+ /**
+ * Reads the cell from a mob file.
+ * The mob file might be located in different directories.
+ * 1. The working directory.
+ * 2. The archive directory.
+ * Reads the cell from the files located in both of the above directories.
+ * @param locations The possible locations where the mob files are saved.
+ * @param fileName The file to be read.
+ * @param search The cell to be searched.
+ * @param cacheMobBlocks Whether the scanner should cache blocks.
+ * @return The found cell, or null if there is no such cell.
+ * @throws IOException
+ */
+ private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks)
+ throws IOException {
+ FileSystem fs = getFileSystem();
+ for (Path location : locations) {
+ MobFile file = null;
+ Path path = new Path(location, fileName);
+ try {
+ file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
+ return file.readCell(search, cacheMobBlocks);
+ } catch (IOException e) {
+ mobCacheConfig.getMobFileCache().evictFile(fileName);
+ if (e instanceof FileNotFoundException) {
+ LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
+ } else {
+ throw e;
+ }
+ } finally {
+ if (file != null) {
+ mobCacheConfig.getMobFileCache().closeFile(file);
+ }
+ }
+ }
+ LOG.error("The mob file " + fileName + " could not be found in the locations "
+ + mobDirLocations);
+ return null;
+ }
+
+ /**
+ * Reads the cell from a mob file of the source table.
+ * The current table might be a clone; in that case only an HFileLink is created in
+ * the new table, while the mob file stays in the source table's directories.
+ * 1. The working directory of the source table.
+ * 2. The archive directory of the source table.
+ * Reads the cell from the files located in both of the above directories.
+ * @param fileName The file to be read.
+ * @param search The cell to be searched.
+ * @param cacheMobBlocks Whether the scanner should cache blocks.
+ * @return The found cell, or null if there is no such cell.
+ * @throws IOException
+ */
+ private Cell readClonedCell(String fileName, Cell search, boolean cacheMobBlocks)
+ throws IOException {
+ Tag tableNameTag = MobUtils.getTableNameTag(search);
+ if (tableNameTag == null) {
+ return null;
+ }
+ byte[] tableName = tableNameTag.getValue();
+ if (Bytes.equals(this.getTableName().getName(), tableName)) {
+ return null;
+ }
+ // the possible locations in the source table.
+ List<Path> locations = new ArrayList<Path>();
+ TableName tn = TableName.valueOf(tableName);
+ locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
+ locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
+ .getEncodedName(), family.getNameAsString()));
+ // read the cell from the source table.
+ return readCell(locations, fileName, search, cacheMobBlocks);
+ }
+
+ /**
+ * Gets the mob file path.
+ * @return The mob file path.
+ */
+ public Path getPath() {
+ return mobFamilyPath;
+ }
+
+ /**
+ * Compaction in a mob store.
+ * The cells in this store contain the paths of the mob files. There can be a race
+ * condition between a major compaction and the sweeping of mob files, so the two
+ * must be mutually exclusive; minor compactions are not affected.
+ * While a sweep is in progress, the major compaction is marked as
+ * retainDeleteMarkers.
+ */
+ @Override
- public List<StoreFile> compact(CompactionContext compaction) throws IOException {
++ public List<StoreFile> compact(CompactionContext compaction,
++ CompactionThroughputController throughputController) throws IOException {
+ // If this is a major compaction, check whether a sweeper is running;
+ // if so, mark the major compaction as retainDeleteMarkers.
+ if (compaction.getRequest().isAllFiles()) {
+ // Use ZooKeeper to coordinate.
+ // 1. Try to acquire an operation lock.
+ // 1.1. If the lock cannot be acquired, mark the major compaction as
+ // retainDeleteMarkers and continue the compaction.
+ // 1.2. If the lock is obtained, look for the sweeping node.
+ // 1.2.1. If the node exists, a sweep is in progress; mark the major
+ // compaction as retainDeleteMarkers and continue the compaction.
+ // 1.2.2. If the node does not exist, add a child to the major compaction
+ // node and run the compaction directly.
+ TableLock lock = null;
+ if (tableLockManager != null) {
+ lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
+ }
+ boolean tableLocked = false;
+ String tableName = getTableName().getNameAsString();
+ if (lock != null) {
+ try {
+ LOG.info("Start to acquire a read lock for the table[" + tableName
+ + "], ready to perform the major compaction");
+ lock.acquire();
+ tableLocked = true;
+ } catch (Exception e) {
+ LOG.error("Fail to lock the table " + tableName, e);
+ }
+ } else {
+ // If the tableLockManager is null, treat the table as locked.
+ tableLocked = true;
+ }
+ try {
+ if (!tableLocked) {
+ LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
+ + tableName + "], forcing the delete markers to be retained");
+ compaction.getRequest().forceRetainDeleteMarkers();
+ }
- return super.compact(compaction);
++ return super.compact(compaction, throughputController);
+ } finally {
+ if (tableLocked && lock != null) {
+ try {
+ lock.release();
+ } catch (IOException e) {
+ LOG.error("Fail to release the table lock " + tableName, e);
+ }
+ }
+ }
+ } else {
+ // If it's not a major compaction, continue the compaction.
- return super.compact(compaction);
++ return super.compact(compaction, throughputController);
+ }
+ }
+
+ public void updateMobCompactedIntoMobCellsCount(long count) {
+ mobCompactedIntoMobCellsCount += count;
+ }
+
+ public long getMobCompactedIntoMobCellsCount() {
+ return mobCompactedIntoMobCellsCount;
+ }
+
+ public void updateMobCompactedFromMobCellsCount(long count) {
+ mobCompactedFromMobCellsCount += count;
+ }
+
+ public long getMobCompactedFromMobCellsCount() {
+ return mobCompactedFromMobCellsCount;
+ }
+
+ public void updateMobCompactedIntoMobCellsSize(long size) {
+ mobCompactedIntoMobCellsSize += size;
+ }
+
+ public long getMobCompactedIntoMobCellsSize() {
+ return mobCompactedIntoMobCellsSize;
+ }
+
+ public void updateMobCompactedFromMobCellsSize(long size) {
+ mobCompactedFromMobCellsSize += size;
+ }
+
+ public long getMobCompactedFromMobCellsSize() {
+ return mobCompactedFromMobCellsSize;
+ }
+
+ public void updateMobFlushCount() {
+ mobFlushCount++;
+ }
+
+ public long getMobFlushCount() {
+ return mobFlushCount;
+ }
+
+ public void updateMobFlushedCellsCount(long count) {
+ mobFlushedCellsCount += count;
+ }
+
+ public long getMobFlushedCellsCount() {
+ return mobFlushedCellsCount;
+ }
+
+ public void updateMobFlushedCellsSize(long size) {
+ mobFlushedCellsSize += size;
+ }
+
+ public long getMobFlushedCellsSize() {
+ return mobFlushedCellsSize;
+ }
+
+ public void updateMobScanCellsCount(long count) {
+ mobScanCellsCount += count;
+ }
+
+ public long getMobScanCellsCount() {
+ return mobScanCellsCount;
+ }
+
+ public void updateMobScanCellsSize(long size) {
+ mobScanCellsSize += size;
+ }
+
+ public long getMobScanCellsSize() {
+ return mobScanCellsSize;
+ }
+}
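
For context on the class above: HRegion instantiates an HMobStore only when the column family descriptor carries the "IS_MOB" attribute mentioned in the class javadoc. Below is a minimal client-side sketch of declaring such a family; the "IS_MOB" key is taken from the javadoc, while the "MOB_THRESHOLD" key and its value are illustrative assumptions, not confirmed API of this branch.

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;

    public class MobTableExample {
      // Sketch: enable MOB on a column family so the region server creates an
      // HMobStore for it. "IS_MOB" is the attribute named in the javadoc above;
      // "MOB_THRESHOLD" and its value are hypothetical, for illustration only.
      static void createMobTable(Admin admin) throws java.io.IOException {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
        HColumnDescriptor hcd = new HColumnDescriptor("cf");
        hcd.setValue("IS_MOB", "true");
        hcd.setValue("MOB_THRESHOLD", "102400"); // hypothetical: cells above ~100KB go to mob files
        htd.addFamily(hcd);
        admin.createTable(htd);
      }
    }
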
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index df5c900,53e732a..ab0165d
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@@ -33,9 -34,9 +34,10 @@@ import java.util.HashSet
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
+ import java.util.RandomAccess;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e11aac2,c170a65..787828b
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@@ -347,9 -372,7 +373,9 @@@ public class HRegionServer extends HasT
private final RegionServerAccounting regionServerAccounting;
// Cache configuration and block cache reference
- final CacheConfig cacheConfig;
+ protected CacheConfig cacheConfig;
+ // Cache configuration for mob
+ final MobCacheConfig mobCacheConfig;
/** The health check chore. */
private HealthCheckChore healthCheckChore;
@@@ -831,10 -933,9 +938,10 @@@
}
}
// Send cache a shutdown.
- if (cacheConfig.isBlockCacheEnabled()) {
+ if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
cacheConfig.getBlockCache().shutdown();
}
+ mobCacheConfig.getMobFileCache().shutdown();
if (movedRegionsCleaner != null) {
movedRegionsCleaner.stop("Region Server stopping");
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index b9f4038,252e5e1..f5bb67a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@@ -55,10 -55,12 +55,13 @@@ import org.apache.hadoop.hbase.HColumnD
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
+ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
@@@ -133,11 -137,11 +138,11 @@@ public class HStore implements Store
protected final MemStore memstore;
// This stores directory in the filesystem.
- private final HRegion region;
+ protected final HRegion region;
private final HColumnDescriptor family;
private final HRegionFileSystem fs;
- protected final Configuration conf;
- private Configuration conf;
- private final CacheConfig cacheConf;
++ protected Configuration conf;
+ protected CacheConfig cacheConf;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 4384d87,5e5590d..159ec55
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@@ -32,10 -33,11 +33,13 @@@ import org.apache.hadoop.hbase.ServerNa
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.mob.MobFileCache;
+ import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.metrics2.MetricsExecutor;
/**
@@@ -50,11 -52,10 +54,11 @@@ class MetricsRegionServerWrapperImp
private final HRegionServer regionServer;
private BlockCache blockCache;
+ private MobFileCache mobFileCache;
private volatile long numStores = 0;
- private volatile long numHLogFiles = 0;
- private volatile long hlogFileSize = 0;
+ private volatile long numWALFiles = 0;
+ private volatile long walFileSize = 0;
private volatile long numStoreFiles = 0;
private volatile long memstoreSize = 0;
private volatile long storeFileSize = 0;
@@@ -75,20 -76,7 +79,21 @@@
private volatile long flushedCellsSize = 0;
private volatile long compactedCellsSize = 0;
private volatile long majorCompactedCellsSize = 0;
+ private volatile long mobCompactedIntoMobCellsCount = 0;
+ private volatile long mobCompactedFromMobCellsCount = 0;
+ private volatile long mobCompactedIntoMobCellsSize = 0;
+ private volatile long mobCompactedFromMobCellsSize = 0;
+ private volatile long mobFlushCount = 0;
+ private volatile long mobFlushedCellsCount = 0;
+ private volatile long mobFlushedCellsSize = 0;
+ private volatile long mobScanCellsCount = 0;
+ private volatile long mobScanCellsSize = 0;
+ private volatile long mobFileCacheAccessCount = 0;
+ private volatile long mobFileCacheMissCount = 0;
+ private volatile double mobFileCacheHitRatio = 0;
+ private volatile long mobFileCacheEvictedCount = 0;
+ private volatile long mobFileCacheCount = 0;
+ private volatile long blockedRequestsCount = 0L;
private CacheStats cacheStats;
private ScheduledExecutorService executor;
@@@ -526,15 -450,7 +549,16 @@@
long tempFlushedCellsSize = 0;
long tempCompactedCellsSize = 0;
long tempMajorCompactedCellsSize = 0;
+ long tempMobCompactedIntoMobCellsCount = 0;
+ long tempMobCompactedFromMobCellsCount = 0;
+ long tempMobCompactedIntoMobCellsSize = 0;
+ long tempMobCompactedFromMobCellsSize = 0;
+ long tempMobFlushCount = 0;
+ long tempMobFlushedCellsCount = 0;
+ long tempMobFlushedCellsSize = 0;
+ long tempMobScanCellsCount = 0;
+ long tempMobScanCellsSize = 0;
+ long tempBlockedRequestsCount = 0L;
for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get();
@@@ -631,20 -526,22 +646,36 @@@
flushedCellsSize = tempFlushedCellsSize;
compactedCellsSize = tempCompactedCellsSize;
majorCompactedCellsSize = tempMajorCompactedCellsSize;
+ mobCompactedIntoMobCellsCount = tempMobCompactedIntoMobCellsCount;
+ mobCompactedFromMobCellsCount = tempMobCompactedFromMobCellsCount;
+ mobCompactedIntoMobCellsSize = tempMobCompactedIntoMobCellsSize;
+ mobCompactedFromMobCellsSize = tempMobCompactedFromMobCellsSize;
+ mobFlushCount = tempMobFlushCount;
+ mobFlushedCellsCount = tempMobFlushedCellsCount;
+ mobFlushedCellsSize = tempMobFlushedCellsSize;
+ mobScanCellsCount = tempMobScanCellsCount;
+ mobScanCellsSize = tempMobScanCellsSize;
+ mobFileCacheAccessCount = mobFileCache.getAccessCount();
+ mobFileCacheMissCount = mobFileCache.getMissCount();
+ mobFileCacheHitRatio = mobFileCache.getHitRatio();
+ mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount();
+ mobFileCacheCount = mobFileCache.getCacheSize();
+ blockedRequestsCount = tempBlockedRequestsCount;
}
}
+
+ @Override
+ public long getHedgedReadOps() {
+ return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps();
+ }
+
+ @Override
+ public long getHedgedReadWins() {
+ return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
+ }
+
+ @Override
+ public long getBlockedRequestsCount() {
+ return blockedRequestsCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 13967c2,a92c17e..7870040
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@@ -43,10 -43,9 +43,12 @@@ import org.apache.hadoop.hbase.regionse
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.util.StringUtils;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
/**
* A compactor is a compaction algorithm associated a given policy. Base class also contains
@@@ -226,64 -220,87 +233,90 @@@ public abstract class Compactor
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
}
++ // TODO mob introduced the fd parameter; can we make this cleaner and easier to extend in future?
/**
* Performs the compaction.
- * @param fd File details
++ * @param fd The FileDetails of the cell sink writer.
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
+ * @param major Whether this is a major compaction.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
- protected boolean performCompaction(InternalScanner scanner, CellSink writer,
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
- long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
- int bytesWritten = 0;
+ long smallestReadPoint, boolean cleanSeqId,
- CompactionThroughputController throughputController) throws IOException {
++ CompactionThroughputController throughputController, boolean major) throws IOException {
+ long bytesWritten = 0;
+ long bytesWrittenProgress = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
- List<Cell> kvs = new ArrayList<Cell>();
- int closeCheckInterval = HStore.getCloseCheckInterval();
- long lastMillis;
+ List<Cell> cells = new ArrayList<Cell>();
+ long closeCheckInterval = HStore.getCloseCheckInterval();
+ long lastMillis = 0;
if (LOG.isDebugEnabled()) {
- lastMillis = System.currentTimeMillis();
- } else {
- lastMillis = 0;
+ lastMillis = EnvironmentEdgeManager.currentTime();
}
+ String compactionName =
+ store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
+ long now = 0;
boolean hasMore;
- do {
- hasMore = scanner.next(kvs, compactionKVMax);
- // output to writer:
- for (Cell c : kvs) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- resetSeqId(smallestReadPoint, cleanSeqId, kv);
- writer.append(kv);
- ++progress.currentCompactedKVs;
- progress.totalCompactedSize += kv.getLength();
-
- // check periodically to see if a system stop is requested
- if (closeCheckInterval > 0) {
- bytesWritten += kv.getLength();
- if (bytesWritten > closeCheckInterval) {
- // Log the progress of long running compactions every minute if
- // logging at DEBUG level
- if (LOG.isDebugEnabled()) {
- long now = System.currentTimeMillis();
- if ((now - lastMillis) >= 60 * 1000) {
- LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
- (bytesWritten / 1024.0) / ((now - lastMillis) / 1000.0)));
- lastMillis = now;
+ throughputController.start(compactionName);
+ try {
+ do {
+ hasMore = scanner.next(cells, compactionKVMax);
+ if (LOG.isDebugEnabled()) {
+ now = EnvironmentEdgeManager.currentTime();
+ }
+ // output to writer:
+ for (Cell c : cells) {
+ if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
+ CellUtil.setSequenceId(c, 0);
+ }
+ writer.append(c);
+ int len = KeyValueUtil.length(c);
+ ++progress.currentCompactedKVs;
+ progress.totalCompactedSize += len;
+ if (LOG.isDebugEnabled()) {
+ bytesWrittenProgress += len;
+ }
+ throughputController.control(compactionName, len);
+ // check periodically to see if a system stop is requested
+ if (closeCheckInterval > 0) {
+ bytesWritten += len;
+ if (bytesWritten > closeCheckInterval) {
+ bytesWritten = 0;
+ if (!store.areWritesEnabled()) {
+ progress.cancel();
+ return false;
}
}
- bytesWritten = 0;
- if (!store.areWritesEnabled()) {
- progress.cancel();
- return false;
- }
}
}
- }
- kvs.clear();
- } while (hasMore);
+ // Log the progress of long running compactions every minute if
+ // logging at DEBUG level
+ if (LOG.isDebugEnabled()) {
+ if ((now - lastMillis) >= 60 * 1000) {
+ LOG.debug("Compaction progress: "
+ + compactionName
+ + " "
+ + progress
+ + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
+ / ((now - lastMillis) / 1000.0)) + ", throughputController is "
+ + throughputController);
+ lastMillis = now;
+ bytesWrittenProgress = 0;
+ }
+ }
+ cells.clear();
+ } while (hasMore);
+ } catch (InterruptedException e) {
+ progress.cancel();
+ throw new InterruptedIOException("Interrupted while control throughput of compacting "
+ + compactionName);
+ } finally {
+ throughputController.finish(compactionName);
+ }
progress.complete();
return true;
}
@@@ -321,29 -338,4 +354,17 @@@
return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
+
+ /**
- * Resets the sequence id.
- * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
- * @param cleanSeqId Should clean the sequence id.
- * @param kv The current KeyValue.
- */
- protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) {
- if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
- kv.setSequenceId(0);
- }
- }
-
- /**
+ * Appends the metadata and closes the writer.
+ * @param writer The current store writer.
+ * @param fd The file details.
+ * @param isMajor Whether this is a major compaction.
+ * @throws IOException
+ */
+ protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
+ boolean isMajor) throws IOException {
+ writer.appendMetadata(fd.maxSeqId, isMajor);
+ writer.close();
+ }
}
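
The performCompaction() rewrite above threads a CompactionThroughputController through the write loop with a start/control/finish lifecycle. A condensed sketch of that pattern follows, using only the method names visible in the diff; the helper method and its signature are illustrative, and imports are assumed to match Compactor.java above.

    // Condensed sketch of the throughput-control lifecycle in performCompaction().
    // start()/control()/finish() and the InterruptedException handling mirror the
    // diff; everything else in the loop has been stripped for clarity.
    boolean writeThrottled(CompactionThroughputController throughputController,
        String compactionName, List<Cell> cells, CellSink writer) throws IOException {
      throughputController.start(compactionName);
      try {
        for (Cell c : cells) {
          writer.append(c);
          // May sleep to throttle the write rate; throws InterruptedException.
          throughputController.control(compactionName, KeyValueUtil.length(c));
        }
      } catch (InterruptedException e) {
        throw new InterruptedIOException(
            "Interrupted while controlling the throughput of compacting " + compactionName);
      } finally {
        throughputController.finish(compactionName);
      }
      return true;
    }
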
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index be859c0,5d712c1..090be8c
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@@ -76,9 -98,11 +98,10 @@@ public class DefaultCompactor extends C
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
-
- writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
- true, fd.maxTagsLength > 0);
+ writer = createTmpWriter(fd, smallestReadPoint);
- boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
- request.isAllFiles());
+ boolean finished =
- performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
++ performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, throughputController,
++ request.isAllFiles());
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
@@@ -102,22 -146,8 +145,22 @@@
}
/**
+ * Creates a writer for a new file in a temporary directory.
+ * @param fd The file details.
+ * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+ * @return Writer for a new StoreFile in the tmp dir.
+ * @throws IOException
+ */
+ protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
+ throws IOException {
+ StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+ true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+ return writer;
+ }
+
+ /**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
- * {@link #compact(CompactionRequest)};
+ * {@link #compact(CompactionRequest, CompactionThroughputController)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
* the generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
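
The protected createTmpWriter(...) extracted above gives subclasses a hook to configure the writer used for compaction output. A hypothetical override is sketched below to illustrate the extension point; the class name and the always-include-tags choice are assumptions for illustration, not this branch's actual mob compactor.

    // Hypothetical subclass using the new createTmpWriter hook. Imports are
    // assumed to match DefaultCompactor.java; this exact class does not exist
    // in the branch.
    public class TagRetainingCompactor extends DefaultCompactor {
      public TagRetainingCompactor(Configuration conf, Store store) {
        super(conf, store);
      }

      @Override
      protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
          throws IOException {
        // Always include tags in the output, e.g. because reference cells
        // carry extra tags that must survive compaction.
        return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
            true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
      }
    }
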
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 3109015,b957e16..fab4c2f
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@@ -127,8 -130,8 +130,9 @@@ public class StripeCompactor extends Co
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
- finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId,
- request.isMajor());
+ finished =
- performCompaction(scanner, mw, smallestReadPoint, cleanSeqId, throughputController);
++ performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, throughputController,
++ request.isMajor());
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index 0d7efe7,2655e2b..841bc04
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@@ -52,21 -51,16 +51,20 @@@ import org.apache.hadoop.hbase.HConstan
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
- import org.apache.hadoop.hbase.io.HLogLink;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.JobUtil;
+ import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
- import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
+ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@@ -423,10 -433,10 +437,10 @@@ public class ExportSnapshot extends Con
switch (fileInfo.getType()) {
case HFILE:
Path inputPath = new Path(fileInfo.getHfile());
- link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
+ link = getFileLink(inputPath, conf);
break;
case WAL:
- link = new HLogLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
+ link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
break;
default:
throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
@@@ -442,16 -452,6 +456,16 @@@
}
}
+ private FileLink getFileLink(Path path, Configuration conf) throws IOException {
+ String regionName = HFileLink.getReferencedRegionName(path.getName());
+ TableName tableName = HFileLink.getReferencedTableName(path.getName());
+ if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
- return new HFileLink(MobUtils.getQualifiedMobRootDir(conf),
++ return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
+ HFileArchiveUtil.getArchivePath(conf), path);
+ }
- return new HFileLink(inputRoot, inputArchive, path);
++ return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
+ }
+
private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
try {
return fs.getFileChecksum(path);
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
index 6f7d847,330ead4..9d3407a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@@ -34,17 -34,12 +34,18 @@@ import org.apache.hadoop.hbase.classifi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
index 2a599d3,d1f787a..50b5c9a
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@@ -39,14 -39,11 +39,14 @@@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
- import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+ import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
/**
* Utility methods for interacting with the snapshot referenced files.
@@@ -299,15 -296,7 +299,15 @@@ public final class SnapshotReferenceUti
}
// check if the linked file exists (in the archive, or in the table dir)
- HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, linkPath);
+ HFileLink link = null;
+ if (MobUtils.isMobRegionInfo(regionInfo)) {
+ // for mob region
- link = new HFileLink(MobUtils.getQualifiedMobRootDir(conf),
++ link = HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
+ HFileArchiveUtil.getArchivePath(conf), linkPath);
+ } else {
+ // not mob region
- link = new HFileLink(conf, linkPath);
++ link = HFileLink.buildFromHFileLinkPattern(conf, linkPath);
+ }
try {
FileStatus fstat = link.getFileStatus(fs);
if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
index a939422,0000000..27d53ba
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
@@@ -1,250 -1,0 +1,251 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.NamespaceNotFoundException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
++import org.apache.hadoop.hbase.testclassification.ClientTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test clone snapshots from the client
+ */
- @Category(LargeTests.class)
++@Category({LargeTests.class, ClientTests.class})
+public class TestMobCloneSnapshotFromClient {
+ final Log LOG = LogFactory.getLog(getClass());
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private final byte[] FAMILY = Bytes.toBytes("cf");
+
+ private byte[] emptySnapshot;
+ private byte[] snapshotName0;
+ private byte[] snapshotName1;
+ private byte[] snapshotName2;
+ private int snapshot0Rows;
+ private int snapshot1Rows;
+ private TableName tableName;
+ private Admin admin;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
+ TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ TEST_UTIL.getConfiguration().setBoolean(
+ "hbase.master.enabletable.roundrobin", true);
+ TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Initialize the tests with a table filled with some data
+ * and two snapshots (snapshotName0, snapshotName1) of different states.
+ * The tableName, snapshotNames and the number of rows in the snapshot are initialized.
+ */
+ @Before
+ public void setup() throws Exception {
+ this.admin = TEST_UTIL.getHBaseAdmin();
+
+ long tid = System.currentTimeMillis();
+ tableName = TableName.valueOf("testtb-" + tid);
+ emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
+ snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
+ snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
+ snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
+
+ // create Table and disable it
+ MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
+ admin.disableTable(tableName);
+
+ // take an empty snapshot
+ admin.snapshot(emptySnapshot, tableName);
+
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+ try {
+ // enable table and insert data
+ admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
+ admin.disableTable(tableName);
+
+ // take a snapshot
+ admin.snapshot(snapshotName0, tableName);
+
+ // enable table and insert more data
+ admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
+ admin.disableTable(tableName);
+
+ // take a snapshot of the updated table
+ admin.snapshot(snapshotName1, tableName);
+
+ // re-enable table
+ admin.enableTable(tableName);
+ } finally {
+ table.close();
+ }
+ }
+
+ protected int getNumReplicas() {
+ return 1;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (admin.tableExists(tableName)) {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ SnapshotTestingUtils.deleteAllSnapshots(admin);
+ SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+ }
+
+ @Test(expected=SnapshotDoesNotExistException.class)
+ public void testCloneNonExistentSnapshot() throws IOException, InterruptedException {
+ String snapshotName = "random-snapshot-" + System.currentTimeMillis();
+ TableName tableName = TableName.valueOf("random-table-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName, tableName);
+ }
+
+ @Test(expected = NamespaceNotFoundException.class)
+ public void testCloneOnMissingNamespace() throws IOException, InterruptedException {
+ TableName clonedTableName = TableName.valueOf("unknownNS:clonetb");
+ admin.cloneSnapshot(snapshotName1, clonedTableName);
+ }
+
+ @Test
+ public void testCloneSnapshot() throws IOException, InterruptedException {
+ TableName clonedTableName = TableName.valueOf("clonedtb-" + System.currentTimeMillis());
+ testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
+ testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
+ testCloneSnapshot(clonedTableName, emptySnapshot, 0);
+ }
+
+ private void testCloneSnapshot(final TableName tableName, final byte[] snapshotName,
+ int snapshotRows) throws IOException, InterruptedException {
+ // create a new table from snapshot
+ admin.cloneSnapshot(snapshotName, tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshotRows);
+
+ verifyReplicasCameOnline(tableName);
+ TEST_UTIL.deleteTable(tableName);
+ }
+
+ protected void verifyReplicasCameOnline(TableName tableName) throws IOException {
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ }
+
+ @Test
+ public void testCloneSnapshotCrossNamespace() throws IOException, InterruptedException {
+ String nsName = "testCloneSnapshotCrossNamespace";
+ admin.createNamespace(NamespaceDescriptor.create(nsName).build());
+ TableName clonedTableName =
+ TableName.valueOf(nsName, "clonedtb-" + System.currentTimeMillis());
+ testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
+ testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
+ testCloneSnapshot(clonedTableName, emptySnapshot, 0);
+ }
+
+ /**
+ * Verify that tables created from the snapshot are still alive after source table deletion.
+ */
+ @Test
+ public void testCloneLinksAfterDelete() throws IOException, InterruptedException {
+ // Clone a table from the first snapshot
+ TableName clonedTableName = TableName.valueOf("clonedtb1-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName0, clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+
+ // Take a snapshot of this cloned table.
+ admin.disableTable(clonedTableName);
+ admin.snapshot(snapshotName2, clonedTableName);
+
+ // Clone the snapshot of the cloned table
+ TableName clonedTableName2 = TableName.valueOf("clonedtb2-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName2, clonedTableName2);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
+ admin.disableTable(clonedTableName2);
+
+ // Remove the original table
+ TEST_UTIL.deleteTable(tableName);
+ waitCleanerRun();
+
+ // Verify the first cloned table
+ admin.enableTable(clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+
+ // Verify the second cloned table
+ admin.enableTable(clonedTableName2);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
+ admin.disableTable(clonedTableName2);
+
+ // Delete the first cloned table
+ TEST_UTIL.deleteTable(clonedTableName);
+ waitCleanerRun();
+
+ // Verify the second cloned table
+ admin.enableTable(clonedTableName2);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
+
+ // Clone a new table from cloned
+ TableName clonedTableName3 = TableName.valueOf("clonedtb3-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName2, clonedTableName3);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName3, snapshot0Rows);
+
+ // Delete the cloned tables
+ TEST_UTIL.deleteTable(clonedTableName2);
+ TEST_UTIL.deleteTable(clonedTableName3);
+ admin.deleteSnapshot(snapshotName2);
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+
+ private void waitCleanerRun() throws InterruptedException {
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+ }
+}
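
Every clone test above takes its snapshots offline through the same
disable/snapshot/enable cycle. A minimal sketch of that cycle as a standalone helper,
assuming only the Admin calls already used in this diff (takeOfflineSnapshot is a
hypothetical name, not part of this commit):

    // Sketch: the offline-snapshot cycle repeated throughout the tests above.
    private void takeOfflineSnapshot(Admin admin, byte[] snapshotName, TableName tableName)
        throws IOException {
      admin.disableTable(tableName);        // snapshots here are taken on a disabled table
      try {
        admin.snapshot(snapshotName, tableName);
      } finally {
        admin.enableTable(tableName);       // bring the table back online either way
      }
    }
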
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
index c75e006,0000000..0bb498d
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
@@@ -1,303 -1,0 +1,304 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
++import org.apache.hadoop.hbase.testclassification.ClientTests;
++import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test restoring snapshots from the client.
+ */
- @Category(LargeTests.class)
++@Category({ClientTests.class, LargeTests.class})
+public class TestMobRestoreSnapshotFromClient {
+ final Log LOG = LogFactory.getLog(getClass());
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private final byte[] FAMILY = Bytes.toBytes("cf");
+
+ private byte[] emptySnapshot;
+ private byte[] snapshotName0;
+ private byte[] snapshotName1;
+ private byte[] snapshotName2;
+ private int snapshot0Rows;
+ private int snapshot1Rows;
+ private TableName tableName;
+ private Admin admin;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
+ TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ TEST_UTIL.getConfiguration().setBoolean(
+ "hbase.master.enabletable.roundrobin", true);
+ TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Initialize the tests with a table filled with some data
+ * and two snapshots (snapshotName0, snapshotName1) taken at different states.
+ * The table name, the snapshot names and the row counts of the snapshots are initialized.
+ */
+ @Before
+ public void setup() throws Exception {
+ this.admin = TEST_UTIL.getHBaseAdmin();
+
+ long tid = System.currentTimeMillis();
+ tableName =
+ TableName.valueOf("testtb-" + tid);
+ emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
+ snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
+ snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
+ snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
+
+ // create the table and disable it
+ MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
+
+ admin.disableTable(tableName);
+
+ // take an empty snapshot
+ admin.snapshot(emptySnapshot, tableName);
+
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+ // enable table and insert data
+ admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
+ admin.disableTable(tableName);
+
+ // take a snapshot
+ admin.snapshot(snapshotName0, tableName);
+
+ // enable table and insert more data
+ admin.enableTable(tableName);
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY);
++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
+ table.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.deleteTable(tableName);
+ SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+ SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+ }
+
+ @Test
+ public void testRestoreSnapshot() throws IOException {
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
+ admin.disableTable(tableName);
+ admin.snapshot(snapshotName1, tableName);
+ // Restore from snapshot-0
+ admin.restoreSnapshot(snapshotName0);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+
+ // Restore from emptySnapshot
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(emptySnapshot);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, 0);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+
+ // Restore from snapshot-1
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(snapshotName1);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+
+ // Delete the table, then restore from snapshot-1 (recreates the table)
+ TEST_UTIL.deleteTable(tableName);
+ admin.restoreSnapshot(snapshotName1);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ }
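
testRestoreSnapshot repeats one sequence for each snapshot: restoreSnapshot requires the
table to be offline, so every restore is bracketed by disableTable/enableTable. A minimal
sketch of that bracket (restoreTo is a hypothetical helper, not part of this commit; the
Admin calls match the test):

    // Sketch: the disable/restore/enable bracket used for each restore above.
    private void restoreTo(Admin admin, TableName tableName, byte[] snapshotName)
        throws IOException {
      admin.disableTable(tableName);    // restoreSnapshot requires an offline table
      admin.restoreSnapshot(snapshotName);
      admin.enableTable(tableName);
    }
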
+
+ protected int getNumReplicas() {
+ return 1;
+ }
+
+ @Test
+ public void testRestoreSchemaChange() throws Exception {
+ byte[] TEST_FAMILY2 = Bytes.toBytes("cf2");
+
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+
+ // Add one column family and put some data in it
+ admin.disableTable(tableName);
+ HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
+ hcd.setMobEnabled(true);
+ hcd.setMobThreshold(3L);
+ admin.addColumn(tableName, hcd);
+ admin.enableTable(tableName);
+ assertEquals(2, table.getTableDescriptor().getFamilies().size());
+ HTableDescriptor htd = admin.getTableDescriptor(tableName);
+ assertEquals(2, htd.getFamilies().size());
- SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2);
++ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2);
+ long snapshot2Rows = snapshot1Rows + 500;
+ assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
+ assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
+ Set<String> fsFamilies = getFamiliesFromFS(tableName);
+ assertEquals(2, fsFamilies.size());
+
+ // Take a snapshot
+ admin.disableTable(tableName);
+ admin.snapshot(snapshotName2, tableName);
+
+ // Restore the snapshot (without the cf)
+ admin.restoreSnapshot(snapshotName0);
+ admin.enableTable(tableName);
+ assertEquals(1, table.getTableDescriptor().getFamilies().size());
+ try {
+ MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2);
+ fail("family '" + Bytes.toString(TEST_FAMILY2) + "' should not exists");
+ } catch (NoSuchColumnFamilyException e) {
+ // expected
+ }
+ assertEquals(snapshot0Rows, MobSnapshotTestingUtils.countMobRows(table));
+ htd = admin.getTableDescriptor(tableName);
+ assertEquals(1, htd.getFamilies().size());
+ fsFamilies = getFamiliesFromFS(tableName);
+ assertEquals(1, fsFamilies.size());
+
+ // Restore back the snapshot (with the cf)
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(snapshotName2);
+ admin.enableTable(tableName);
+ htd = admin.getTableDescriptor(tableName);
+ assertEquals(2, htd.getFamilies().size());
+ assertEquals(2, table.getTableDescriptor().getFamilies().size());
+ assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
+ assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
+ fsFamilies = getFamiliesFromFS(tableName);
+ assertEquals(2, fsFamilies.size());
+ table.close();
+ }
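
The schema-change test depends on the MOB column-family descriptor API. A minimal sketch
of declaring a MOB-enabled family at table-creation time, using the same
setMobEnabled/setMobThreshold setters the test calls (the table and family names here are
illustrative, not from this commit):

    // Sketch: a MOB-enabled family; values larger than the threshold (in bytes)
    // are written to separate MOB files instead of ordinary store files.
    HColumnDescriptor mobFamily = new HColumnDescriptor(Bytes.toBytes("cf2"));
    mobFamily.setMobEnabled(true);
    mobFamily.setMobThreshold(3L);        // same 3-byte threshold as the test
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("mobtable"));
    desc.addFamily(mobFamily);
    admin.createTable(desc);
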
+
+ @Test
+ public void testCloneSnapshotOfCloned() throws IOException, InterruptedException {
+ TableName clonedTableName =
+ TableName.valueOf("clonedtb-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName0, clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
+ admin.disableTable(clonedTableName);
+ admin.snapshot(snapshotName2, clonedTableName);
+ TEST_UTIL.deleteTable(clonedTableName);
+ waitCleanerRun();
+
+ admin.cloneSnapshot(snapshotName2, clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
+ TEST_UTIL.deleteTable(clonedTableName);
+ }
+
+ @Test
+ public void testCloneAndRestoreSnapshot() throws IOException, InterruptedException {
+ TEST_UTIL.deleteTable(tableName);
+ waitCleanerRun();
+
+ admin.cloneSnapshot(snapshotName0, tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ waitCleanerRun();
+
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(snapshotName0);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ }
+
+ @Test
+ public void testCorruptedSnapshot() throws IOException, InterruptedException {
+ SnapshotTestingUtils.corruptSnapshot(TEST_UTIL, Bytes.toString(snapshotName0));
+ TableName cloneName = TableName.valueOf("corruptedClone-" + System.currentTimeMillis());
+ try {
+ admin.cloneSnapshot(snapshotName0, cloneName);
+ fail("Expected CorruptedSnapshotException, got succeeded cloneSnapshot()");
+ } catch (CorruptedSnapshotException e) {
+ // Got the expected corruption exception.
+ // Verify that no remnants of the cloned table were left behind.
+ assertFalse(admin.tableExists(cloneName));
+ } catch (Exception e) {
+ fail("Expected CorruptedSnapshotException got: " + e);
+ }
+ }
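
The corruption test encodes a client-side contract worth making explicit: a failed clone
must not leave a partial table behind. A minimal sketch of a defensive clone built on
that contract (cloneOrCleanup is a hypothetical helper, not part of this commit; the
Admin calls appear in the test above):

    // Sketch: clone defensively; on a corrupted snapshot, verify/clean the target.
    private boolean cloneOrCleanup(Admin admin, byte[] snapshotName, TableName target)
        throws IOException, InterruptedException {
      try {
        admin.cloneSnapshot(snapshotName, target);
        return true;                       // clone succeeded
      } catch (CorruptedSnapshotException e) {
        if (admin.tableExists(target)) {   // per the test, this should be false
          admin.disableTable(target);
          admin.deleteTable(target);       // remove any partially created table
        }
        return false;
      }
    }
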
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ private void waitCleanerRun() throws InterruptedException {
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+ }
+
+ private Set<String> getFamiliesFromFS(final TableName tableName) throws IOException {
+ MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+ Set<String> families = new HashSet<String>();
+ Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
+ for (Path regionDir: FSUtils.getRegionDirs(mfs.getFileSystem(), tableDir)) {
+ for (Path familyDir: FSUtils.getFamilyDirs(mfs.getFileSystem(), regionDir)) {
+ families.add(familyDir.getName());
+ }
+ }
+ return families;
+ }
+}