You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2015/06/10 14:35:40 UTC
hbase git commit: HBASE-13836 Do not reset the mvcc for bulk loaded
mob reference cells in reading. (Jingcheng)
Repository: hbase
Updated Branches:
refs/heads/hbase-11339 ba348cf5a -> 26893aa45
HBASE-13836 Do not reset the mvcc for bulk loaded mob reference cells in reading. (Jingcheng)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26893aa4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26893aa4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26893aa4
Branch: refs/heads/hbase-11339
Commit: 26893aa451215ef0395b7df16f129414b7b86c86
Parents: ba348cf
Author: anoopsjohn <an...@gmail.com>
Authored: Wed Jun 10 18:05:10 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Wed Jun 10 18:05:10 2015 +0530
----------------------------------------------------------------------
.../compactions/PartitionedMobCompactor.java | 1 +
.../hadoop/hbase/regionserver/StoreFile.java | 34 ++++++++++
.../hbase/regionserver/StoreFileScanner.java | 2 +-
.../hbase/mob/compactions/TestMobCompactor.java | 70 ++++++++++++++++++--
4 files changed, 101 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 065787e..8cda746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -588,6 +588,7 @@ public class PartitionedMobCompactor extends MobCompactor {
if (writer != null) {
writer.appendMetadata(maxSeqId, false);
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+ writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
try {
writer.close();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 7001acb..009ba0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -165,6 +165,13 @@ public class StoreFile {
private final BloomType cfBloomType;
/**
+ * Key for skipping resetting sequence id in metadata.
+ * For bulk loaded hfiles, the scanner resets the cell seqId to the latest one;
+ * if this metadata is set to true, the reset is skipped.
+ */
+ public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
+
+ /**
* Constructor, loads a reader and it's indices, etc. May allocate a
* substantial amount of ram depending on the underlying files (10-20MB?).
*
@@ -407,6 +414,12 @@ public class StoreFile {
this.sequenceid += 1;
}
}
+ // SKIP_RESET_SEQ_ID only takes effect for bulk loaded files.
+ // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
+ // loaded into hbase; these cells have the same seqIds as the old ones. We do not want
+ // to assign new seqIds to them, since doing so could confuse the visibility of cells that
+ // have the same row key but different seqIds.
+ this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
this.reader.setBulkLoaded(true);
}
this.reader.setSequenceID(this.sequenceid);
@@ -536,6 +549,18 @@ public class StoreFile {
return sb.toString();
}
+ /**
+ * Gets whether to skip resetting the sequence id for cells.
+ * @param skipResetSeqId The boolean value serialized as a byte array.
+ * @return Whether to skip resetting the sequence id.
+ */
+ private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
+ if (skipResetSeqId != null && skipResetSeqId.length == 1) {
+ return Bytes.toBoolean(skipResetSeqId);
+ }
+ return false;
+ }
+
public static class WriterBuilder {
private final Configuration conf;
private final CacheConfig cacheConf;
@@ -1068,6 +1093,7 @@ public class StoreFile {
private long deleteFamilyCnt = -1;
private boolean bulkLoadResult = false;
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
+ private boolean skipResetSeqId = true;
public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
throws IOException {
@@ -1594,6 +1620,14 @@ public class StoreFile {
public long getMaxTimestamp() {
return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
}
+
+ boolean isSkipResetSeqId() {
+ return skipResetSeqId;
+ }
+
+ void setSkipResetSeqId(boolean skipResetSeqId) {
+ this.skipResetSeqId = skipResetSeqId;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 42a378d..1111a61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -208,7 +208,7 @@ public class StoreFileScanner implements KeyValueScanner {
protected void setCurrentCell(Cell newVal) throws IOException {
this.cur = newVal;
- if (this.cur != null && this.reader.isBulkLoaded()) {
+ if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
CellUtil.setSequenceId(cur, this.reader.getSequenceID());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index d63bb95..380ebac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.security.Key;
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -177,11 +179,11 @@ public class TestMobCompactor {
TableName tableName = TableName.valueOf(tableNameAsString);
HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
hcd1.setMobEnabled(true);
- hcd1.setMobThreshold(0L);
+ hcd1.setMobThreshold(5);
hcd1.setMaxVersions(4);
HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
hcd2.setMobEnabled(true);
- hcd2.setMobThreshold(0L);
+ hcd2.setMobThreshold(5);
hcd2.setMaxVersions(4);
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(hcd1);
@@ -570,7 +572,7 @@ public class TestMobCompactor {
// do the mob compaction
admin.compactMob(tableName, hcd1.getName());
- waitUntilCompactionFinished(tableName);
+ waitUntilMobCompactionFinished(tableName);
assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
countMobRows(hTable));
assertEquals("After compaction: mob cells count", regionNum
@@ -618,7 +620,7 @@ public class TestMobCompactor {
// do the major mob compaction, it will force all files to compaction
admin.majorCompactMob(tableName, hcd1.getName());
- waitUntilCompactionFinished(tableName);
+ waitUntilMobCompactionFinished(tableName);
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("After compaction: mob cells count",
@@ -633,7 +635,58 @@ public class TestMobCompactor {
countFiles(tableName, false, family2));
}
- private void waitUntilCompactionFinished(TableName tableName) throws IOException,
+ @Test
+ public void testScannerOnBulkLoadRefHFiles() throws Exception {
+ long ts = EnvironmentEdgeManager.currentTime();
+ byte[] key0 = Bytes.toBytes("k0");
+ byte[] key1 = Bytes.toBytes("k1");
+ String value0 = "mobValue0";
+ String value1 = "mobValue1";
+ String newValue0 = "new";
+ Put put0 = new Put(key0);
+ put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0));
+ loadData(admin, bufMut, tableName, new Put[] { put0 });
+ put0 = new Put(key0);
+ put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0));
+ Put put1 = new Put(key1);
+ put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1));
+ loadData(admin, bufMut, tableName, new Put[] { put0, put1 });
+ // read the latest cell of key0.
+ Get get = new Get(key0);
+ Result result = hTable.get(get);
+ Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+ assertEquals("Before compaction: mob value of k0", "new",
+ Bytes.toString(CellUtil.cloneValue(cell)));
+ admin.majorCompactMob(tableName, hcd1.getName());
+ waitUntilMobCompactionFinished(tableName);
+ // Read the latest cell of key0. The cell seqId in the bulk loaded file is not reset in the
+ // scanner, so the cell that has the "new" value is still visible.
+ result = hTable.get(get);
+ cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+ assertEquals("After compaction: mob value of k0", "new",
+ Bytes.toString(CellUtil.cloneValue(cell)));
+ // read the ref cell, not read further to the mob cell.
+ get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
+ result = hTable.get(get);
+ cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+ // the ref name is the new file
+ Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+ tableName), hcd1.getNameAsString());
+ List<Path> paths = new ArrayList<Path>();
+ if (fs.exists(mobFamilyPath)) {
+ FileStatus[] files = fs.listStatus(mobFamilyPath);
+ for (FileStatus file : files) {
+ if (!StoreFileInfo.isDelFile(file.getPath())) {
+ paths.add(file.getPath());
+ }
+ }
+ }
+ assertEquals("After compaction: number of mob files:", 1, paths.size());
+ assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0)
+ .getName());
+ }
+
+ private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
InterruptedException {
long finished = EnvironmentEdgeManager.currentTime() + 60000;
CompactionState state = admin.getMobCompactionState(tableName);
@@ -804,6 +857,13 @@ public class TestMobCompactor {
}
}
+ private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts)
+ throws IOException {
+ table.mutate(Arrays.asList(puts));
+ table.flush();
+ admin.flush(tableName);
+ }
+
/**
* delete the row, family and cell to create the del file
*/