You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/05/01 17:28:34 UTC
[48/50] [abbrv] hbase git commit: Merge branch 'apache/master'
(4/16/15) into hbase-11339
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
index d286b72,0000000..37d4461
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@@ -1,180 -1,0 +1,182 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
++import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
++import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+
+/**
+ * The wrapper of a DefaultMemStore.
+ * This wrapper is used in the sweep reducer to buffer and sort the cells written from
+ * the invalid and small mob files.
+ * It's flushed when it's full, the mob data are written to the mob files, and their file names
+ * are written back to store files of HBase.
+ * This memStore is used to sort the cells in mob files.
+ * In a reducer of sweep tool, the mob files are grouped by the same prefix (start key and date),
+ * in each group, the reducer iterates the files and read the cells to a new and bigger mob file.
+ * The cells in the same mob file are ordered, but cells across mob files are not.
+ * So we need this MemStoreWrapper to sort those cells that come from different mob files before
+ * flushing them to the disk, when the memStore is big enough it's flushed as a new mob file.
+ */
+@InterfaceAudience.Private
+public class MemStoreWrapper {
+
+ private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+
+ private MemStore memstore;
+ private long flushSize;
+ private SweepPartitionId partitionId;
+ private Context context;
+ private Configuration conf;
- private HTable table;
++ private BufferedMutator table;
+ private HColumnDescriptor hcd;
+ private Path mobFamilyDir;
+ private FileSystem fs;
+ private CacheConfig cacheConfig;
+
- public MemStoreWrapper(Context context, FileSystem fs, HTable table, HColumnDescriptor hcd,
++ public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table, HColumnDescriptor hcd,
+ MemStore memstore, CacheConfig cacheConfig) throws IOException {
+ this.memstore = memstore;
+ this.context = context;
+ this.fs = fs;
+ this.table = table;
+ this.hcd = hcd;
+ this.conf = context.getConfiguration();
+ this.cacheConfig = cacheConfig;
+ flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
+ MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
+ mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
+ }
+
+ public void setPartitionId(SweepPartitionId partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ /**
+ * Flushes the memstore if the size is large enough.
+ * @throws IOException
+ */
+ private void flushMemStoreIfNecessary() throws IOException {
+ if (memstore.heapSize() >= flushSize) {
+ flushMemStore();
+ }
+ }
+
+ /**
+ * Flushes the memstore anyway.
+ * @throws IOException
+ */
+ public void flushMemStore() throws IOException {
+ MemStoreSnapshot snapshot = memstore.snapshot();
+ internalFlushCache(snapshot);
+ memstore.clearSnapshot(snapshot.getId());
+ }
+
+ /**
+ * Flushes the snapshot of the memstore.
+ * Flushes the mob data to the mob files, and flushes the name of these mob files to HBase.
+ * @param snapshot The snapshot of the memstore.
+ * @throws IOException
+ */
+ private void internalFlushCache(final MemStoreSnapshot snapshot)
+ throws IOException {
+ if (snapshot.getCellsCount() == 0) {
+ return;
+ }
+ // generate the files into a temp directory.
+ String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
+ StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
+ partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(),
+ hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig);
+
+ String relativePath = mobFileWriter.getPath().getName();
+ LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
+
+ byte[] referenceValue = Bytes.toBytes(relativePath);
+ KeyValueScanner scanner = snapshot.getScanner();
+ Cell cell = null;
+ while (null != (cell = scanner.next())) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ mobFileWriter.append(kv);
+ }
+ scanner.close();
+ // Write out the log sequence number that corresponds to this output
+ // hfile. The hfile is current up to and including logCacheFlushId.
+ mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
+ mobFileWriter.close();
+
+ MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
+ context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+ // write reference/fileName back to the store files of HBase.
+ scanner = snapshot.getScanner();
+ scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+ cell = null;
- Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, this.table.getTableName());
++ Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName().toString()));
+ while (null != (cell = scanner.next())) {
+ KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
+ Put put =
+ new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
+ put.add(reference);
- table.put(put);
++ table.mutate(put);
+ context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
+ }
- table.flushCommits();
++ table.flush();
+ scanner.close();
+ }
+
+ /**
+ * Adds a KeyValue into the memstore.
+ * @param kv The KeyValue to be added.
+ * @throws IOException
+ */
+ public void addToMemstore(KeyValue kv) throws IOException {
+ memstore.add(kv);
+ // flush the memstore if it's full.
+ flushMemStoreIfNecessary();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
index 73ca1a2,0000000..cbefd8a
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@@ -1,512 -1,0 +1,509 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
++import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * The reducer of a sweep job.
+ * This reducer merges the small mob files into bigger ones, and write visited
+ * names of mob files to a sequence file which is used by the sweep job to delete
+ * the unused mob files.
+ * The key of the input is a file name, the value is a collection of KeyValue where
+ * the KeyValue is the actual cell (its format is valueLength + fileName) in HBase.
+ * In this reducer, we could know how many cells exist in HBase for a mob file.
+ * If the existCellSize/mobFileSize < compactionRatio, this mob
+ * file needs to be merged.
+ */
+@InterfaceAudience.Private
+public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
+
+ private static final Log LOG = LogFactory.getLog(SweepReducer.class);
+
+ private SequenceFile.Writer writer = null;
+ private MemStoreWrapper memstore;
+ private Configuration conf;
+ private FileSystem fs;
+
+ private Path familyDir;
+ private CacheConfig cacheConfig;
+ private long compactionBegin;
- private HTable table;
++ private BufferedMutator table;
+ private HColumnDescriptor family;
+ private long mobCompactionDelay;
+ private Path mobTableDir;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ this.conf = context.getConfiguration();
++ Connection c = ConnectionFactory.createConnection(this.conf);
+ this.fs = FileSystem.get(conf);
+ // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
+ mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
+ String tableName = conf.get(TableInputFormat.INPUT_TABLE);
+ String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
+ TableName tn = TableName.valueOf(tableName);
+ this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
- HBaseAdmin admin = new HBaseAdmin(this.conf);
++ Admin admin = c.getAdmin();
+ try {
+ family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
+ if (family == null) {
+ // this column family might be removed, directly return.
+ throw new InvalidFamilyOperationException("Column family '" + familyName
+ + "' does not exist. It might be removed.");
+ }
+ } finally {
+ try {
+ admin.close();
+ } catch (IOException e) {
+ LOG.warn("Fail to close the HBaseAdmin", e);
+ }
+ }
+ // disable the block cache.
+ Configuration copyOfConf = new Configuration(conf);
+ copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+ this.cacheConfig = new CacheConfig(copyOfConf);
+
- table = new HTable(this.conf, Bytes.toBytes(tableName));
- table.setAutoFlush(false, false);
-
- table.setWriteBufferSize(1 * 1024 * 1024); // 1MB
++ table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024));
+ memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
+
+ // The start time of the sweep tool.
+ // Only the mob files whose creation time is older than startTime-oneDay will be handled by the
+ // reducer since it brings inconsistency to handle the latest mob files.
+ this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
+ mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
+ }
+
+ private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
+ return new SweepPartition(id, context);
+ }
+
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
+ String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+ String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
+ new DummyMobAbortable());
+ FSDataOutputStream fout = null;
+ try {
+ SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
+ tracker.start();
+ setup(context);
+ // create a sequence file that contains all the visited file names in this reducer.
+ String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
+ Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
+ .replace("-", MobConstants.EMPTY_STRING));
+ fout = fs.create(nameFilePath, true);
+ writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
+ String.class, CompressionType.NONE, null);
+ SweepPartitionId id;
+ SweepPartition partition = null;
+ // the mob files which have the same start key and date are in the same partition.
+ while (context.nextKey()) {
+ Text key = context.getCurrentKey();
+ String keyString = key.toString();
+ id = SweepPartitionId.create(keyString);
+ if (null == partition || !id.equals(partition.getId())) {
+ // It's the first mob file in the current partition.
+ if (null != partition) {
+ // this mob file is in a different partition from the previous mob file.
+ // directly close.
+ partition.close();
+ }
+ // create a new one
+ partition = createPartition(id, context);
+ }
+ if (partition != null) {
+ // run the partition
+ partition.execute(key, context.getValues());
+ }
+ }
+ if (null != partition) {
+ partition.close();
+ }
+ writer.hflush();
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ } finally {
+ cleanup(context);
+ zkw.close();
+ if (writer != null) {
+ IOUtils.closeStream(writer);
+ }
+ if (fout != null) {
+ IOUtils.closeStream(fout);
+ }
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * The mob files which have the same start key and date are in the same partition.
+ * The files in the same partition are merged together into bigger ones.
+ */
+ public class SweepPartition {
+
+ private final SweepPartitionId id;
+ private final Context context;
+ private boolean memstoreUpdated = false;
+ private boolean mergeSmall = false;
+ private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
+ private final List<Path> toBeDeleted = new ArrayList<Path>();
+
+ public SweepPartition(SweepPartitionId id, Context context) throws IOException {
+ this.id = id;
+ this.context = context;
+ memstore.setPartitionId(id);
+ init();
+ }
+
+ public SweepPartitionId getId() {
+ return this.id;
+ }
+
+ /**
+ * Prepares the map of files.
+ *
+ * @throws IOException
+ */
+ private void init() throws IOException {
+ FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
+ if (null == fileStats) {
+ return;
+ }
+
+ int smallFileCount = 0;
+ float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
+ MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
+ long compactionMergeableSize = conf.getLong(
+ MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
+ MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
+ // list the files. Just merge the hfiles, don't merge the hfile links.
+ // prepare the map of mob files. The key is the file name, the value is the file status.
+ for (FileStatus fileStat : fileStats) {
+ MobFileStatus mobFileStatus = null;
+ if (!HFileLink.isHFileLink(fileStat.getPath())) {
+ mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
+ if (mobFileStatus.needMerge()) {
+ smallFileCount++;
+ }
+ // key is file name (not hfile name), value is hfile status.
+ fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
+ }
+ }
+ if (smallFileCount >= 2) {
+ // merge the files only when there is more than one small file in the same partition.
+ this.mergeSmall = true;
+ }
+ }
+
+ /**
+ * Flushes the data into mob files and store files, and archives the small
+ * files after they're merged.
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ if (null == id) {
+ return;
+ }
+ // flush the remaining key values into mob files
+ if (memstoreUpdated) {
+ memstore.flushMemStore();
+ }
+ List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
+ // delete small files after compaction
+ for (Path path : toBeDeleted) {
+ LOG.info("[In Partition close] Delete the file " + path + " in partition close");
+ storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
+ }
+ if (!storeFiles.isEmpty()) {
+ try {
+ MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
+ storeFiles);
+ context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
+ } catch (IOException e) {
+ LOG.error("Fail to archive the store files " + storeFiles, e);
+ }
+ storeFiles.clear();
+ }
+ fileStatusMap.clear();
+ }
+
+ /**
+ * Merges the small mob files into bigger ones.
+ * @param fileName The current mob file name.
+ * @param values The collection of KeyValues in this mob file.
+ * @throws IOException
+ */
+ public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
+ if (null == values) {
+ return;
+ }
+ MobFileName mobFileName = MobFileName.create(fileName.toString());
+ LOG.info("[In reducer] The file name: " + fileName.toString());
+ MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
+ if (null == mobFileStat) {
+ LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
+ return;
+ }
+ // only handle the files that are older than one day.
+ if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
+ <= mobCompactionDelay) {
+ return;
+ }
+ // write the hfile name
+ writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
+ Set<KeyValue> kvs = new HashSet<KeyValue>();
+ for (KeyValue kv : values) {
+ if (kv.getValueLength() > Bytes.SIZEOF_INT) {
+ mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
+ Bytes.SIZEOF_INT));
+ }
+ kvs.add(kv.createKeyOnly(false));
+ }
+ // If the mob file is an invalid one or a small one, merge it into new/bigger ones.
+ if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
+ context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
+ MobFile file = MobFile.create(fs,
+ new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
+ StoreFileScanner scanner = null;
+ try {
+ scanner = file.getScanner();
+ scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
+ Cell cell;
+ while (null != (cell = scanner.next())) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ KeyValue keyOnly = kv.createKeyOnly(false);
+ if (kvs.contains(keyOnly)) {
+ // write the KeyValue existing in HBase to the memstore.
+ memstore.addToMemstore(kv);
+ memstoreUpdated = true;
+ }
+ }
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ toBeDeleted.add(mobFileStat.getFileStatus().getPath());
+ }
+ }
+
+ /**
+ * Lists the files with the same prefix.
+ * @param p The file path.
+ * @param prefix The prefix.
+ * @return The files with the same prefix.
+ * @throws IOException
+ */
+ private FileStatus[] listStatus(Path p, String prefix) throws IOException {
+ return fs.listStatus(p, new PathPrefixFilter(prefix));
+ }
+ }
+
+ static class PathPrefixFilter implements PathFilter {
+
+ private final String prefix;
+
+ public PathPrefixFilter(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public boolean accept(Path path) {
+ return path.getName().startsWith(prefix, 0);
+ }
+
+ }
+
+ /**
+ * The sweep partition id.
+ * It consists of the start key and date.
+ * The start key is a hex string of the checksum of a region start key.
+ * The date is the latest timestamp of cells in a mob file.
+ */
+ public static class SweepPartitionId {
+ private String date;
+ private String startKey;
+
+ public SweepPartitionId(MobFileName fileName) {
+ this.date = fileName.getDate();
+ this.startKey = fileName.getStartKey();
+ }
+
+ public SweepPartitionId(String date, String startKey) {
+ this.date = date;
+ this.startKey = startKey;
+ }
+
+ public static SweepPartitionId create(String key) {
+ return new SweepPartitionId(MobFileName.create(key));
+ }
+
+ @Override
+ public boolean equals(Object anObject) {
+ if (this == anObject) {
+ return true;
+ }
+ if (anObject instanceof SweepPartitionId) {
+ SweepPartitionId another = (SweepPartitionId) anObject;
+ if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public String getDate() {
+ return this.date;
+ }
+
+ public String getStartKey() {
+ return this.startKey;
+ }
+
+ public void setDate(String date) {
+ this.date = date;
+ }
+
+ public void setStartKey(String startKey) {
+ this.startKey = startKey;
+ }
+ }
+
+ /**
+ * The mob file status used in the sweep reducer.
+ */
+ private static class MobFileStatus {
+ private FileStatus fileStatus;
+ private int validSize;
+ private long size;
+
+ private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
+ private long compactionMergeableSize =
+ MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
+
+ /**
+ * @param fileStatus The current FileStatus.
+ * @param compactionRatio The invalid ratio.
+ * If there're too many cells deleted in a mob file, it's regarded as invalid,
+ * and needs to be written to a new one.
+ * If existingCellSize/fileSize < compactionRatio, it's regarded as an invalid one.
+ * @param compactionMergeableSize If the size of a mob file is less
+ * than this value, it's regarded as a small file and needs to be merged
+ */
+ public MobFileStatus(FileStatus fileStatus, float compactionRatio,
+ long compactionMergeableSize) {
+ this.fileStatus = fileStatus;
+ this.size = fileStatus.getLen();
+ validSize = 0;
+ this.compactionRatio = compactionRatio;
+ this.compactionMergeableSize = compactionMergeableSize;
+ }
+
+ /**
+ * Add size to this file.
+ * @param size The size to be added.
+ */
+ public void addValidSize(int size) {
+ this.validSize += size;
+ }
+
+ /**
+ * Whether the mob files need to be cleaned.
+ * If there're too many cells deleted in this mob file, it needs to be cleaned.
+ * @return True if it needs to be cleaned.
+ */
+ public boolean needClean() {
+ return validSize < compactionRatio * size;
+ }
+
+ /**
+ * Whether the mob files need to be merged.
+ * If this mob file is too small, it needs to be merged.
+ * @return True if it needs to be merged.
+ */
+ public boolean needMerge() {
+ return this.size < compactionMergeableSize;
+ }
+
+ /**
+ * Gets the file status.
+ * @return The file status.
+ */
+ public FileStatus getFileStatus() {
+ return fileStatus;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 73b8cb9,73b8cb9..8ff4840
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@@ -85,7 -85,7 +85,7 @@@ public class DefaultStoreFlusher extend
scanner.close();
}
LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
-- + StringUtils.humanReadableInt(snapshot.getSize()) +
++ + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
", hasBloomFilter=" + writer.hasGeneralBloom() +
", into tmp file " + writer.getPath());
result.add(writer.getPath());
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ab0165d,e082698..6684309
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@@ -3276,34 -3421,12 +3422,30 @@@ public class HRegion implements HeapSiz
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
- snapshotDir, desc, exnSnare);
+ snapshotDir, desc, exnSnare);
manifest.addRegion(this);
+
+ // The regionserver holding the first region of the table is responsible for taking the
+ // manifest of the mob dir.
- if (!Bytes.equals(getStartKey(), HConstants.EMPTY_START_ROW))
++ if (!Bytes.equals(getRegionInfo().getStartKey(), HConstants.EMPTY_START_ROW))
+ return;
+
+ // if any column family is mob enabled, add the "mob region" to the manifest.
- Map<byte[], Store> stores = getStores();
- for (Entry<byte[], Store> store : stores.entrySet()) {
- boolean hasMobStore = store.getValue().getFamily().isMobEnabled();
++ List<Store> stores = getStores();
++ for (Store store : stores) {
++ boolean hasMobStore = store.getFamily().isMobEnabled();
+ if (hasMobStore) {
+ // use the .mob as the start key and 0 as the regionid
+ HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(this.getTableDesc().getTableName());
+ mobRegionInfo.setOffline(true);
+ manifest.addMobRegion(mobRegionInfo, this.getTableDesc().getColumnFamilies());
+ return;
+ }
+ }
}
- /**
- * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
- * provided current timestamp.
- * @throws IOException
- */
- void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
+ @Override
+ public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
throws IOException {
for (List<Cell> cells: cellItr) {
if (cells == null) continue;
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 159ec55,8f7dee4..ea9558f
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@@ -549,27 -461,19 +560,28 @@@ class MetricsRegionServerWrapperImp
long tempFlushedCellsSize = 0;
long tempCompactedCellsSize = 0;
long tempMajorCompactedCellsSize = 0;
+ long tempMobCompactedIntoMobCellsCount = 0;
+ long tempMobCompactedFromMobCellsCount = 0;
+ long tempMobCompactedIntoMobCellsSize = 0;
+ long tempMobCompactedFromMobCellsSize = 0;
+ long tempMobFlushCount = 0;
+ long tempMobFlushedCellsCount = 0;
+ long tempMobFlushedCellsSize = 0;
+ long tempMobScanCellsCount = 0;
+ long tempMobScanCellsSize = 0;
long tempBlockedRequestsCount = 0L;
- for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
- tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get();
- tempDataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
- tempReadRequestsCount += r.readRequestsCount.get();
- tempWriteRequestsCount += r.writeRequestsCount.get();
- tempCheckAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
- tempCheckAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
+ for (Region r : regionServer.getOnlineRegionsLocalContext()) {
+ tempNumMutationsWithoutWAL += r.getNumMutationsWithoutWAL();
+ tempDataInMemoryWithoutWAL += r.getDataInMemoryWithoutWAL();
+ tempReadRequestsCount += r.getReadRequestsCount();
+ tempWriteRequestsCount += r.getWriteRequestsCount();
+ tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
+ tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
tempBlockedRequestsCount += r.getBlockedRequestsCount();
- tempNumStores += r.stores.size();
- for (Store store : r.stores.values()) {
+ List<Store> storeList = r.getStores();
+ tempNumStores += storeList.size();
+ for (Store store : storeList) {
tempNumStoreFiles += store.getStorefilesCount();
tempMemstoreSize += store.getMemStoreSize();
tempStoreFileSize += store.getStorefilesSize();
@@@ -582,21 -486,13 +594,25 @@@
tempFlushedCellsSize += store.getFlushedCellsSize();
tempCompactedCellsSize += store.getCompactedCellsSize();
tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize();
+ if (store instanceof HMobStore) {
+ HMobStore mobStore = (HMobStore) store;
+ tempMobCompactedIntoMobCellsCount += mobStore.getMobCompactedIntoMobCellsCount();
+ tempMobCompactedFromMobCellsCount += mobStore.getMobCompactedFromMobCellsCount();
+ tempMobCompactedIntoMobCellsSize += mobStore.getMobCompactedIntoMobCellsSize();
+ tempMobCompactedFromMobCellsSize += mobStore.getMobCompactedFromMobCellsSize();
+ tempMobFlushCount += mobStore.getMobFlushCount();
+ tempMobFlushedCellsCount += mobStore.getMobFlushedCellsCount();
+ tempMobFlushedCellsSize += mobStore.getMobFlushedCellsSize();
+ tempMobScanCellsCount += mobStore.getMobScanCellsCount();
+ tempMobScanCellsSize += mobStore.getMobScanCellsSize();
+ }
}
- hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
+ HDFSBlocksDistribution distro = r.getHDFSBlocksDistribution();
+ hdfsBlocksDistribution.add(distro);
+ if (r.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ hdfsBlocksDistributionSecondaryRegions.add(distro);
+ }
}
float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
index f7f0acd,0000000..5739df1
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@@ -1,80 -1,0 +1,80 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * Scanner scans both the memstore and the MOB Store. Coalesces the KeyValue stream into
+ * {@code List<KeyValue>} for a single row.
+ *
+ */
+@InterfaceAudience.Private
+public class MobStoreScanner extends StoreScanner {
+
+ private boolean cacheMobBlocks = false;
+ private final HMobStore mobStore;
+
+ public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+ final NavigableSet<byte[]> columns, long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+ if (!(store instanceof HMobStore)) {
+ throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
+ }
+ mobStore = (HMobStore) store;
+ }
+
+ /**
+ * First reads the cells from HBase. If a cell is a reference cell (which has the
+ * reference tag), the scanner needs to seek this cell in the mob file, and uses the cell found
+ * in the mob file as the result.
+ */
+ @Override
- public boolean next(List<Cell> outResult, int limit) throws IOException {
- boolean result = super.next(outResult, limit);
++ public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException {
++ boolean result = super.next(outResult, ctx);
+ if (!MobUtils.isRawMobScan(scan)) {
+ // retrieve the mob data
+ if (outResult.isEmpty()) {
+ return result;
+ }
+ long mobKVCount = 0;
+ long mobKVSize = 0;
+ for (int i = 0; i < outResult.size(); i++) {
+ Cell cell = outResult.get(i);
+ if (MobUtils.isMobReferenceCell(cell)) {
+ Cell mobCell = mobStore.resolve(cell, cacheMobBlocks);
+ mobKVCount++;
+ mobKVSize += mobCell.getValueLength();
+ outResult.set(i, mobCell);
+ }
+ }
+ mobStore.updateMobScanCellsCount(mobKVCount);
+ mobStore.updateMobScanCellsSize(mobKVSize);
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
index 4c46218,0000000..85be382
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@@ -1,80 -1,0 +1,80 @@@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * ReversedMobStoreScanner extends ReversedStoreScanner, and is used to support
+ * reversed scanning in both the memstore and the MOB store.
+ *
+ */
+@InterfaceAudience.Private
+public class ReversedMobStoreScanner extends ReversedStoreScanner {
+
+ private boolean cacheMobBlocks = false;
+ protected final HMobStore mobStore;
+
+ ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+ long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+ if (!(store instanceof HMobStore)) {
+ throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
+ }
+ mobStore = (HMobStore) store;
+ }
+
+ /**
+ * First reads the cells from HBase. If a cell is a reference cell (which has the
+ * reference tag), the scanner needs to seek this cell in the mob file, and uses the cell found
+ * in the mob file as the result.
+ */
+ @Override
- public boolean next(List<Cell> outResult, int limit) throws IOException {
- boolean result = super.next(outResult, limit);
++ public boolean next(List<Cell> outResult, ScannerContext ctx) throws IOException {
++ boolean result = super.next(outResult, ctx);
+ if (!MobUtils.isRawMobScan(scan)) {
+ // retrieve the mob data
+ if (outResult.isEmpty()) {
+ return result;
+ }
+ long mobKVCount = 0;
+ long mobKVSize = 0;
+ for (int i = 0; i < outResult.size(); i++) {
+ Cell cell = outResult.get(i);
+ if (MobUtils.isMobReferenceCell(cell)) {
+ Cell mobCell = mobStore.resolve(cell, cacheMobBlocks);
+ mobKVCount++;
+ mobKVSize += mobCell.getValueLength();
+ outResult.set(i, mobCell);
+ }
+ }
+ mobStore.updateMobScanCellsCount(mobKVCount);
+ mobStore.updateMobScanCellsSize(mobKVSize);
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
index 27d53ba,0000000..60fc0ff
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
@@@ -1,251 -1,0 +1,252 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.NamespaceNotFoundException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test clone snapshots from the client
+ */
+@Category({LargeTests.class, ClientTests.class})
+public class TestMobCloneSnapshotFromClient {
+ final Log LOG = LogFactory.getLog(getClass());
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private final byte[] FAMILY = Bytes.toBytes("cf");
+
+ private byte[] emptySnapshot;
+ private byte[] snapshotName0;
+ private byte[] snapshotName1;
+ private byte[] snapshotName2;
+ private int snapshot0Rows;
+ private int snapshot1Rows;
+ private TableName tableName;
+ private Admin admin;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
+ TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ TEST_UTIL.getConfiguration().setBoolean(
+ "hbase.master.enabletable.roundrobin", true);
+ TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Initialize the tests with a table filled with some data
+ * and two snapshots (snapshotName0, snapshotName1) of different states.
+ * The tableName, snapshotNames and the number of rows in the snapshot are initialized.
+ */
+ @Before
+ public void setup() throws Exception {
+ this.admin = TEST_UTIL.getHBaseAdmin();
+
+ long tid = System.currentTimeMillis();
+ tableName = TableName.valueOf("testtb-" + tid);
+ emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
+ snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
+ snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
+ snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
+
+ // create Table and disable it
+ MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
+ admin.disableTable(tableName);
+
+ // take an empty snapshot
+ admin.snapshot(emptySnapshot, tableName);
+
- HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++ Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
++ Table table = c.getTable(tableName);
+ try {
+ // enable table and insert data
+ admin.enableTable(tableName);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
+ admin.disableTable(tableName);
+
+ // take a snapshot
+ admin.snapshot(snapshotName0, tableName);
+
+ // enable table and insert more data
+ admin.enableTable(tableName);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
+ admin.disableTable(tableName);
+
+ // take a snapshot of the updated table
+ admin.snapshot(snapshotName1, tableName);
+
+ // re-enable table
+ admin.enableTable(tableName);
+ } finally {
+ table.close();
+ }
+ }
+
+ protected int getNumReplicas() {
+ return 1;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (admin.tableExists(tableName)) {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ SnapshotTestingUtils.deleteAllSnapshots(admin);
+ SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+ }
+
+ @Test(expected=SnapshotDoesNotExistException.class)
+ public void testCloneNonExistentSnapshot() throws IOException, InterruptedException {
+ String snapshotName = "random-snapshot-" + System.currentTimeMillis();
+ TableName tableName = TableName.valueOf("random-table-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName, tableName);
+ }
+
+ @Test(expected = NamespaceNotFoundException.class)
+ public void testCloneOnMissingNamespace() throws IOException, InterruptedException {
+ TableName clonedTableName = TableName.valueOf("unknownNS:clonetb");
+ admin.cloneSnapshot(snapshotName1, clonedTableName);
+ }
+
+ @Test
+ public void testCloneSnapshot() throws IOException, InterruptedException {
+ TableName clonedTableName = TableName.valueOf("clonedtb-" + System.currentTimeMillis());
+ testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
+ testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
+ testCloneSnapshot(clonedTableName, emptySnapshot, 0);
+ }
+
+ private void testCloneSnapshot(final TableName tableName, final byte[] snapshotName,
+ int snapshotRows) throws IOException, InterruptedException {
+ // create a new table from snapshot
+ admin.cloneSnapshot(snapshotName, tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshotRows);
+
+ verifyReplicasCameOnline(tableName);
+ TEST_UTIL.deleteTable(tableName);
+ }
+
+ protected void verifyReplicasCameOnline(TableName tableName) throws IOException {
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ }
+
+ @Test
+ public void testCloneSnapshotCrossNamespace() throws IOException, InterruptedException {
+ String nsName = "testCloneSnapshotCrossNamespace";
+ admin.createNamespace(NamespaceDescriptor.create(nsName).build());
+ TableName clonedTableName =
+ TableName.valueOf(nsName, "clonedtb-" + System.currentTimeMillis());
+ testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
+ testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
+ testCloneSnapshot(clonedTableName, emptySnapshot, 0);
+ }
+
+ /**
+ * Verify that tables created from the snapshot are still alive after source table deletion.
+ */
+ @Test
+ public void testCloneLinksAfterDelete() throws IOException, InterruptedException {
+ // Clone a table from the first snapshot
+ TableName clonedTableName = TableName.valueOf("clonedtb1-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName0, clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+
+ // Take a snapshot of this cloned table.
+ admin.disableTable(clonedTableName);
+ admin.snapshot(snapshotName2, clonedTableName);
+
+ // Clone the snapshot of the cloned table
+ TableName clonedTableName2 = TableName.valueOf("clonedtb2-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName2, clonedTableName2);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
+ admin.disableTable(clonedTableName2);
+
+ // Remove the original table
+ TEST_UTIL.deleteTable(tableName);
+ waitCleanerRun();
+
+ // Verify the first cloned table
+ admin.enableTable(clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+
+ // Verify the second cloned table
+ admin.enableTable(clonedTableName2);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
+ admin.disableTable(clonedTableName2);
+
+ // Delete the first cloned table
+ TEST_UTIL.deleteTable(clonedTableName);
+ waitCleanerRun();
+
+ // Verify the second cloned table
+ admin.enableTable(clonedTableName2);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName2, snapshot0Rows);
+
+ // Clone a new table from the snapshot of the cloned table
+ TableName clonedTableName3 = TableName.valueOf("clonedtb3-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName2, clonedTableName3);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName3, snapshot0Rows);
+
+ // Delete the cloned tables
+ TEST_UTIL.deleteTable(clonedTableName2);
+ TEST_UTIL.deleteTable(clonedTableName3);
+ admin.deleteSnapshot(snapshotName2);
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+
+ private void waitCleanerRun() throws InterruptedException {
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
index 0bb498d,0000000..6fc2d28
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobRestoreSnapshotFromClient.java
@@@ -1,304 -1,0 +1,306 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test restore snapshots from the client
+ */
+@Category({ClientTests.class, LargeTests.class})
+public class TestMobRestoreSnapshotFromClient {
+ final Log LOG = LogFactory.getLog(getClass());
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private final byte[] FAMILY = Bytes.toBytes("cf");
+
+ private byte[] emptySnapshot;
+ private byte[] snapshotName0;
+ private byte[] snapshotName1;
+ private byte[] snapshotName2;
+ private int snapshot0Rows;
+ private int snapshot1Rows;
+ private TableName tableName;
+ private Admin admin;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
+ TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ TEST_UTIL.getConfiguration().setBoolean(
+ "hbase.master.enabletable.roundrobin", true);
+ TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Initialize the tests with a table filled with some data
+ * and two snapshots (snapshotName0, snapshotName1) of different states.
+ * The tableName, snapshotNames and the number of rows in the snapshot are initialized.
+ */
+ @Before
+ public void setup() throws Exception {
+ this.admin = TEST_UTIL.getHBaseAdmin();
+
+ long tid = System.currentTimeMillis();
+ tableName =
+ TableName.valueOf("testtb-" + tid);
+ emptySnapshot = Bytes.toBytes("emptySnaptb-" + tid);
+ snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
+ snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
+ snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
+
+ // create Table and disable it
+ MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
+
+ admin.disableTable(tableName);
+
+ // take an empty snapshot
+ admin.snapshot(emptySnapshot, tableName);
+
- HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++ Table table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++ .getTable(tableName);
+ // enable table and insert data
+ admin.enableTable(tableName);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
+ admin.disableTable(tableName);
+
+ // take a snapshot
+ admin.snapshot(snapshotName0, tableName);
+
+ // enable table and insert more data
+ admin.enableTable(tableName);
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+ snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
+ table.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.deleteTable(tableName);
+ SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+ SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+ }
+
+ @Test
+ public void testRestoreSnapshot() throws IOException {
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
+ admin.disableTable(tableName);
+ admin.snapshot(snapshotName1, tableName);
+ // Restore from snapshot-0
+ admin.restoreSnapshot(snapshotName0);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+
+ // Restore from emptySnapshot
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(emptySnapshot);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, 0);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+
+ // Restore from snapshot-1
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(snapshotName1);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+
+ // Restore from snapshot-1 after deleting the table
+ TEST_UTIL.deleteTable(tableName);
+ admin.restoreSnapshot(snapshotName1);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot1Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ }
+
+ protected int getNumReplicas() {
+ return 1;
+ }
+
+ @Test
+ public void testRestoreSchemaChange() throws Exception {
+ byte[] TEST_FAMILY2 = Bytes.toBytes("cf2");
+
- HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
++ Table table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
++ .getTable(tableName);
+
+ // Add one column family and put some data in it
+ admin.disableTable(tableName);
+ HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
+ hcd.setMobEnabled(true);
+ hcd.setMobThreshold(3L);
+ admin.addColumn(tableName, hcd);
+ admin.enableTable(tableName);
+ assertEquals(2, table.getTableDescriptor().getFamilies().size());
+ HTableDescriptor htd = admin.getTableDescriptor(tableName);
+ assertEquals(2, htd.getFamilies().size());
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2);
+ long snapshot2Rows = snapshot1Rows + 500;
+ assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
+ assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
+ Set<String> fsFamilies = getFamiliesFromFS(tableName);
+ assertEquals(2, fsFamilies.size());
+
+ // Take a snapshot
+ admin.disableTable(tableName);
+ admin.snapshot(snapshotName2, tableName);
+
+ // Restore the snapshot (without the cf)
+ admin.restoreSnapshot(snapshotName0);
+ admin.enableTable(tableName);
+ assertEquals(1, table.getTableDescriptor().getFamilies().size());
+ try {
+ MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2);
+ fail("family '" + Bytes.toString(TEST_FAMILY2) + "' should not exists");
+ } catch (NoSuchColumnFamilyException e) {
+ // expected
+ }
+ assertEquals(snapshot0Rows, MobSnapshotTestingUtils.countMobRows(table));
+ htd = admin.getTableDescriptor(tableName);
+ assertEquals(1, htd.getFamilies().size());
+ fsFamilies = getFamiliesFromFS(tableName);
+ assertEquals(1, fsFamilies.size());
+
+ // Restore back the snapshot (with the cf)
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(snapshotName2);
+ admin.enableTable(tableName);
+ htd = admin.getTableDescriptor(tableName);
+ assertEquals(2, htd.getFamilies().size());
+ assertEquals(2, table.getTableDescriptor().getFamilies().size());
+ assertEquals(500, MobSnapshotTestingUtils.countMobRows(table, TEST_FAMILY2));
+ assertEquals(snapshot2Rows, MobSnapshotTestingUtils.countMobRows(table));
+ fsFamilies = getFamiliesFromFS(tableName);
+ assertEquals(2, fsFamilies.size());
+ table.close();
+ }
+
+ @Test
+ public void testCloneSnapshotOfCloned() throws IOException, InterruptedException {
+ TableName clonedTableName =
+ TableName.valueOf("clonedtb-" + System.currentTimeMillis());
+ admin.cloneSnapshot(snapshotName0, clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
+ admin.disableTable(clonedTableName);
+ admin.snapshot(snapshotName2, clonedTableName);
+ TEST_UTIL.deleteTable(clonedTableName);
+ waitCleanerRun();
+
+ admin.cloneSnapshot(snapshotName2, clonedTableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, clonedTableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(clonedTableName, admin, getNumReplicas());
+ TEST_UTIL.deleteTable(clonedTableName);
+ }
+
+ @Test
+ public void testCloneAndRestoreSnapshot() throws IOException, InterruptedException {
+ TEST_UTIL.deleteTable(tableName);
+ waitCleanerRun();
+
+ admin.cloneSnapshot(snapshotName0, tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ waitCleanerRun();
+
+ admin.disableTable(tableName);
+ admin.restoreSnapshot(snapshotName0);
+ admin.enableTable(tableName);
+ MobSnapshotTestingUtils.verifyMobRowCount(TEST_UTIL, tableName, snapshot0Rows);
+ SnapshotTestingUtils.verifyReplicasCameOnline(tableName, admin, getNumReplicas());
+ }
+
+ @Test
+ public void testCorruptedSnapshot() throws IOException, InterruptedException {
+ SnapshotTestingUtils.corruptSnapshot(TEST_UTIL, Bytes.toString(snapshotName0));
+ TableName cloneName = TableName.valueOf("corruptedClone-" + System.currentTimeMillis());
+ try {
+ admin.cloneSnapshot(snapshotName0, cloneName);
+ fail("Expected CorruptedSnapshotException, got succeeded cloneSnapshot()");
+ } catch (CorruptedSnapshotException e) {
+ // Got the expected corruption exception.
+ // check for no references of the cloned table.
+ assertFalse(admin.tableExists(cloneName));
+ } catch (Exception e) {
+ fail("Expected CorruptedSnapshotException got: " + e);
+ }
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ private void waitCleanerRun() throws InterruptedException {
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+ }
+
+ private Set<String> getFamiliesFromFS(final TableName tableName) throws IOException {
+ MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+ Set<String> families = new HashSet<String>();
+ Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
+ for (Path regionDir: FSUtils.getRegionDirs(mfs.getFileSystem(), tableDir)) {
+ for (Path familyDir: FSUtils.getFamilyDirs(mfs.getFileSystem(), regionDir)) {
+ families.add(familyDir.getName());
+ }
+ }
+ return families;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
----------------------------------------------------------------------
diff --cc hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
index 612b98a,0000000..a2cd51c
mode 100644,000000..100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
@@@ -1,376 -1,0 +1,375 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to verify that the cloned table is independent of the table from which it was cloned
+ */
+@Category(LargeTests.class)
+public class TestMobSnapshotCloneIndependence {
+ // Logger for this test class.
+ private static final Log LOG = LogFactory.getLog(TestMobSnapshotCloneIndependence.class);
+
+ // Testing utility that owns the mini cluster shared by all tests in this class.
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ // Number of region servers the mini cluster is started with.
+ private static final int NUM_RS = 2;
+ // Name of the table created before and deleted after every test.
+ private static final String STRING_TABLE_NAME = "test";
+ // Column family used by the test tables, as a string and as bytes.
+ private static final String TEST_FAM_STR = "fam";
+ private static final byte[] TEST_FAM = Bytes.toBytes(TEST_FAM_STR);
+ private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
+
+ /**
+ * Setup the config for the cluster and start it
+ * @throws Exception on failure
+ */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // Apply the test settings to the shared configuration before the cluster boots.
+ Configuration clusterConf = UTIL.getConfiguration();
+ setupConf(clusterConf);
+ UTIL.startMiniCluster(NUM_RS);
+ }
+
+ /**
+ * Applies the settings this test needs to the given cluster configuration.
+ * @param conf configuration the mini cluster is started with
+ */
+ private static void setupConf(Configuration conf) {
+ // enable snapshot support
+ conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ // disable the region server web UI
+ conf.setInt("hbase.regionserver.info.port", -1);
+ // change the flush size to a small amount, regulating number of store files
+ conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+ // so make sure we get a compaction when doing a load, but keep around
+ // some files in the store
+ conf.setInt("hbase.hstore.compaction.min", 10);
+ conf.setInt("hbase.hstore.compactionThreshold", 10);
+ // block writes if we get to 12 store files
+ conf.setInt("hbase.hstore.blockingStoreFiles", 12);
+ conf.setInt("hbase.regionserver.msginterval", 100);
+ conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+ // Avoid potentially aggressive splitting which would cause snapshot to fail
+ conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+ ConstantSizeRegionSplitPolicy.class.getName());
+ // set the MOB file cache size to zero
+ conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ // Every test starts with a fresh table created through the MOB test helper.
+ TableName tableName = TableName.valueOf(STRING_TABLE_NAME);
+ MobSnapshotTestingUtils.createMobTable(UTIL, tableName, TEST_FAM);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Drop the per-test table first, then clear out snapshots and the archive directory.
+ UTIL.deleteTable(TABLE_NAME);
+ Admin admin = UTIL.getHBaseAdmin();
+ SnapshotTestingUtils.deleteAllSnapshots(admin);
+ SnapshotTestingUtils.deleteArchiveDirectory(UTIL);
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ // A failed shutdown is only logged so that it does not fail the test class.
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception shutdownFailure) {
+ LOG.warn("failure shutting down cluster", shutdownFailure);
+ }
+ }
+
+ /**
+ * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+ * it is taken as an online snapshot.
+ */
+ @Test (timeout=300000)
+ public void testOnlineSnapshotAppendIndependent() throws Exception {
+ // Snapshot is taken while the table is online.
+ final boolean online = true;
+ runTestSnapshotAppendIndependent(online);
+ }
+
+ /**
+ * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+ * it is taken as an offline snapshot.
+ */
+ @Test (timeout=300000)
+ public void testOfflineSnapshotAppendIndependent() throws Exception {
+ // Snapshot is taken in offline mode.
+ final boolean online = false;
+ runTestSnapshotAppendIndependent(online);
+ }
+
+ /**
+ * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
+ * when it is taken as an online snapshot.
+ */
+ @Test (timeout=300000)
+ public void testOnlineSnapshotMetadataChangesIndependent() throws Exception {
+ // Snapshot is taken while the table is online.
+ final boolean online = true;
+ runTestSnapshotMetadataChangesIndependent(online);
+ }
+
+ /**
+ * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
+ * when it is taken as an offline snapshot.
+ */
+ @Test (timeout=300000)
+ public void testOfflineSnapshotMetadataChangesIndependent() throws Exception {
+ // Snapshot is taken in offline mode.
+ final boolean online = false;
+ runTestSnapshotMetadataChangesIndependent(online);
+ }
+
+ /**
+ * Verify that region operations, in this case splitting a region, are independent between the
+ * cloned table and the original.
+ */
+ @Test (timeout=300000)
+ public void testOfflineSnapshotRegionOperationsIndependent() throws Exception {
+ // Snapshot is taken in offline mode.
+ final boolean online = false;
+ runTestRegionOperationsIndependent(online);
+ }
+
+ /**
+ * Verify that region operations, in this case splitting a region, are independent between the
+ * cloned table and the original.
+ */
+ @Test (timeout=300000)
+ public void testOnlineSnapshotRegionOperationsIndependent() throws Exception {
+ // Snapshot is taken while the table is online.
+ final boolean online = true;
+ runTestRegionOperationsIndependent(online);
+ }
+
+ /**
+ * Polls the region count of the given table until it exceeds {@code originalCount}.
+ * Sleeps 50 ms before each check and gives up after 200 checks.
+ * @param t table whose region locations are polled
+ * @param originalCount region count observed before the split was requested
+ * @throws InterruptedException if the waiting thread is interrupted
+ * @throws Exception if the region count has not grown after all checks
+ */
+ private static void waitOnSplit(final HTable t, int originalCount) throws Exception {
+ for (int i = 0; i < 200; i++) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status and stop waiting: carrying on would make every
+ // later sleep() fail immediately and turn this loop into a busy spin.
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ if (t.getRegionLocations().size() > originalCount) {
+ return;
+ }
+ }
+ throw new Exception("Split did not increase the number of regions");
+ }
+
+ /*
+ * Take a snapshot of a table, add data, and verify that this only
+ * affects one table
+ * @param online - Whether the table is online or not during the snapshot
+ */
+ private void runTestSnapshotAppendIndependent(boolean online) throws Exception {
+ FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+ Admin admin = UTIL.getHBaseAdmin();
+ // The start time is appended to the table name to keep it distinct per run.
+ final long startTime = System.currentTimeMillis();
+ final TableName localTableName =
+ TableName.valueOf(STRING_TABLE_NAME + startTime);
+
- HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++ Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+ try {
+
+ SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+ final int origTableRowCount = MobSnapshotTestingUtils.countMobRows(original);
+
+ // Take a snapshot
+ final String snapshotNameAsString = "snapshot_" + localTableName;
+ byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+ snapshotNameAsString, rootDir, fs, online);
+
+ // NOTE(review): presumably the offline snapshot helper leaves the table disabled -- confirm
+ if (!online) {
+ admin.enableTable(localTableName);
+ }
+ TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+ admin.cloneSnapshot(snapshotName, cloneTableName);
+
- HTable clonedTable = new HTable(UTIL.getConfiguration(), cloneTableName);
+ // Keep a handle on the connection so that it is closed together with the table.
++ Connection cloneConnection = ConnectionFactory.createConnection(UTIL.getConfiguration());
+ try {
++ Table clonedTable = cloneConnection.getTable(cloneTableName);
+
+ try {
+ final int clonedTableRowCount = MobSnapshotTestingUtils.countMobRows(clonedTable);
+
+ Assert.assertEquals(
+ "The line counts of original and cloned tables do not match after clone. ",
+ origTableRowCount, clonedTableRowCount);
+
+ // Add a new row to the original table
+ final String rowKey = "new-row-" + System.currentTimeMillis();
+
+ Put p = new Put(Bytes.toBytes(rowKey));
+ p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+ original.put(p);
- original.flushCommits();
+
+ // Verify that the new row is counted in the original table but not in the clone
+ Assert.assertEquals("The row count of the original table was not modified by the put",
+ origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
+ Assert.assertEquals(
+ "The row count of the cloned table changed as a result of addition to the original",
+ clonedTableRowCount, MobSnapshotTestingUtils.countMobRows(clonedTable));
+
+ // Now write the same row key to the clone
+ p = new Put(Bytes.toBytes(rowKey));
- p.add(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
++ p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+ clonedTable.put(p);
- clonedTable.flushCommits();
+
+ // Verify that the put to the clone changed only the clone's row count
+ Assert.assertEquals(
+ "The row count of the original table was modified by the put to the clone",
+ origTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(original));
+ Assert.assertEquals("The row count of the cloned table was not modified by the put",
+ clonedTableRowCount + 1, MobSnapshotTestingUtils.countMobRows(clonedTable));
+ } finally {
+
+ clonedTable.close();
+ }
+ } finally {
+ cloneConnection.close();
+ }
+ } finally {
+
+ original.close();
+ }
+ }
+
+ /*
+ * Take a snapshot of a table, do a split, and verify that this only affects one table
+ * @param online - Whether the table is online or not during the snapshot
+ */
+ private void runTestRegionOperationsIndependent(boolean online) throws Exception {
+ FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+ // Create a table
+ Admin admin = UTIL.getHBaseAdmin();
+ final long startTime = System.currentTimeMillis();
+ final TableName localTableName =
+ TableName.valueOf(STRING_TABLE_NAME + startTime);
- HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++ Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+ // Release the table handle even when an assertion below fails.
+ try {
+ SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+ final int loadedTableCount = MobSnapshotTestingUtils.countMobRows(original);
+ LOG.info("Original table has: " + loadedTableCount + " rows");
+
+ final String snapshotNameAsString = "snapshot_" + localTableName;
+
+ // Create a snapshot
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+ snapshotNameAsString, rootDir, fs, online);
+
+ if (!online) {
+ admin.enableTable(localTableName);
+ }
+
+ TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+
+ // Clone the snapshot
+ byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+ admin.cloneSnapshot(snapshotName, cloneTableName);
+
+ // Verify that region information is the same pre-split
- original.clearRegionCache();
++ ((HTable)original).clearRegionCache();
+ List<HRegionInfo> originalTableHRegions = admin.getTableRegions(localTableName);
+
+ final int originalRegionCount = originalTableHRegions.size();
+ final int cloneTableRegionCount = admin.getTableRegions(cloneTableName).size();
+ Assert.assertEquals(
+ "The number of regions in the cloned table is different than in the original table.",
+ originalRegionCount, cloneTableRegionCount);
+
+ // Split a region on the parent table
+ admin.splitRegion(originalTableHRegions.get(0).getRegionName());
- waitOnSplit(original, originalRegionCount);
++ waitOnSplit((HTable)original, originalRegionCount);
+
+ // Verify that the cloned table region is not split
+ final int cloneTableRegionCount2 = admin.getTableRegions(cloneTableName).size();
+ Assert.assertEquals(
+ "The number of regions in the cloned table changed though none of its regions were split.",
+ cloneTableRegionCount, cloneTableRegionCount2);
+ } finally {
+ original.close();
+ }
+ }
+
+ /*
+ * Take a snapshot of a table, add metadata, and verify that this only
+ * affects one table
+ * @param online - Whether the table is online or not during the snapshot
+ */
+ private void runTestSnapshotMetadataChangesIndependent(boolean online) throws Exception {
+ FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+ // Create a table
+ Admin admin = UTIL.getHBaseAdmin();
+ final long startTime = System.currentTimeMillis();
+ final TableName localTableName =
+ TableName.valueOf(STRING_TABLE_NAME + startTime);
- HTable original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
++ Table original = MobSnapshotTestingUtils.createMobTable(UTIL, localTableName, TEST_FAM);
+ // Release the table handle even when an assertion below fails.
+ try {
+ SnapshotTestingUtils.loadData(UTIL, localTableName, 500, TEST_FAM);
+
+ final String snapshotNameAsString = "snapshot_" + localTableName;
+
+ // Create a snapshot
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+ snapshotNameAsString, rootDir, fs, online);
+
+ if (!online) {
+ admin.enableTable(localTableName);
+ }
+ TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+
+ // Clone the snapshot
+ byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+ admin.cloneSnapshot(snapshotName, cloneTableName);
+
+ // Add a new column family to the original table
+ byte[] TEST_FAM_2 = Bytes.toBytes("fam2");
+ HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM_2);
+
+ admin.disableTable(localTableName);
+ admin.addColumn(localTableName, hcd);
+
+ // Re-enable the original table now that the family has been added
+ admin.enableTable(localTableName);
+
+ // Fetch both descriptors: the original must hold both families, while the
+ // clone must still hold only the family it was created with.
+ HTableDescriptor originalTableDescriptor = original.getTableDescriptor();
+ HTableDescriptor clonedTableDescriptor = admin.getTableDescriptor(cloneTableName);
+
+ Assert.assertTrue("The original family was not found. There is something wrong. ",
+ originalTableDescriptor.hasFamily(TEST_FAM));
+ Assert.assertTrue("The original family was not found in the clone. There is something wrong. ",
+ clonedTableDescriptor.hasFamily(TEST_FAM));
+
+ Assert.assertTrue("The new family was not found. ",
+ originalTableDescriptor.hasFamily(TEST_FAM_2));
+ Assert.assertFalse("The new family was unexpectedly found in the clone. ",
+ clonedTableDescriptor.hasFamily(TEST_FAM_2));
+ } finally {
+ original.close();
+ }
+ }
+}