You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/07/29 23:52:03 UTC

[25/47] hbase git commit: HBASE-15968 New behavior of versions considering mvcc and ts rather than ts only

HBASE-15968 New behavior of versions considering mvcc and ts rather than ts only


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1ac4152b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1ac4152b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1ac4152b

Branch: refs/heads/HBASE-18426
Commit: 1ac4152b1952202f99e216265c9c046e09ecd02e
Parents: fabab8c
Author: Phil Yang <ya...@apache.org>
Authored: Tue Jul 18 11:37:28 2017 +0800
Committer: Phil Yang <ya...@apache.org>
Committed: Tue Jul 25 15:00:36 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HColumnDescriptor.java  |  17 +
 .../hbase/client/ColumnFamilyDescriptor.java    |   7 +
 .../client/ColumnFamilyDescriptorBuilder.java   |  19 +
 .../hadoop/hbase/regionserver/ScanInfo.java     |  13 +-
 .../regionserver/compactions/Compactor.java     |   3 +-
 .../querymatcher/ColumnTracker.java             |   6 +-
 .../CompactionScanQueryMatcher.java             |  30 +-
 .../querymatcher/DeleteTracker.java             |   4 +-
 .../DropDeletesCompactionScanQueryMatcher.java  |   5 +-
 .../IncludeAllCompactionQueryMatcher.java       |  42 +++
 .../querymatcher/LegacyScanQueryMatcher.java    |  29 +-
 .../MajorCompactionScanQueryMatcher.java        |   5 +-
 .../MinorCompactionScanQueryMatcher.java        |   4 +-
 .../querymatcher/NewVersionBehaviorTracker.java | 376 +++++++++++++++++++
 .../NormalUserScanQueryMatcher.java             |   8 +-
 .../querymatcher/ScanQueryMatcher.java          |  55 ++-
 .../StripeCompactionScanQueryMatcher.java       |   6 +-
 .../querymatcher/UserScanQueryMatcher.java      |  29 +-
 .../VisibilityNewVersionBehaivorTracker.java    | 202 ++++++++++
 .../hadoop/hbase/PerformanceEvaluation.java     |   3 +-
 .../regionserver/TestCompactingMemStore.java    |   2 +-
 .../hbase/regionserver/TestCompaction.java      |   2 +-
 .../TestDefaultCompactSelection.java            |   2 +-
 .../hbase/regionserver/TestDefaultMemStore.java |   4 +-
 .../hbase/regionserver/TestMajorCompaction.java |   2 +-
 .../TestNewVersionBehaviorFromClientSide.java   | 356 ++++++++++++++++++
 .../regionserver/TestReversibleScanners.java    |   2 +-
 .../hbase/regionserver/TestStoreScanner.java    |  10 +-
 .../TestCompactionScanQueryMatcher.java         |   2 +-
 .../TestNewVersionBehaviorTracker.java          | 262 +++++++++++++
 .../querymatcher/TestUserScanQueryMatcher.java  |  10 +-
 ...sibilityLabelsOnNewVersionBehaviorTable.java |  38 ++
 .../TestVisibilityLabelsWithDeletes.java        |  42 ++-
 .../hbase/util/TestCoprocessorScanPolicy.java   |   6 +-
 hbase-shell/src/main/ruby/hbase/admin.rb        |   1 +
 35 files changed, 1484 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 6764975..5fe85cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -97,6 +97,8 @@ public class HColumnDescriptor implements ColumnFamilyDescriptor, Comparable<HCo
   public static final boolean DEFAULT_EVICT_BLOCKS_ON_CLOSE = ColumnFamilyDescriptorBuilder.DEFAULT_EVICT_BLOCKS_ON_CLOSE;
   public static final boolean DEFAULT_COMPRESS_TAGS = ColumnFamilyDescriptorBuilder.DEFAULT_COMPRESS_TAGS;
   public static final boolean DEFAULT_PREFETCH_BLOCKS_ON_OPEN = ColumnFamilyDescriptorBuilder.DEFAULT_PREFETCH_BLOCKS_ON_OPEN;
+  public static final String NEW_VERSION_BEHAVIOR = ColumnFamilyDescriptorBuilder.NEW_VERSION_BEHAVIOR;
+  public static final boolean DEFAULT_NEW_VERSION_BEHAVIOR = ColumnFamilyDescriptorBuilder.DEFAULT_NEW_VERSION_BEHAVIOR;
   protected final ModifyableColumnFamilyDescriptor delegatee;
   /**
    * Construct a column descriptor specifying only the family name
@@ -411,6 +413,21 @@ public class HColumnDescriptor implements ColumnFamilyDescriptor, Comparable<HCo
     return this;
   }
 
+  /**
+   * By default, HBase only consider timestamp in versions. So a previous Delete with higher ts
+   * will mask a later Put with lower ts. Set this to true to enable new semantics of versions.
+   * We will also consider mvcc in versions. See HBASE-15968 for details.
+   */
+  public boolean isNewVersionBehavior() {
+    return delegatee.isNewVersionBehavior();
+  }
+
+  public HColumnDescriptor setNewVersionBehavior(boolean newVersionBehavior) {
+    getDelegateeForModification().setNewVersionBehavior(newVersionBehavior);
+    return this;
+  }
+
+
   @Override
   public int getTimeToLive() {
     return delegatee.getTimeToLive();

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
index 507de93..76b5333 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
@@ -215,4 +215,11 @@ public interface ColumnFamilyDescriptor {
    * @return Column family descriptor with only the customized attributes.
    */
   String toStringCustomizedValues();
+
+  /**
+   * By default, HBase only consider timestamp in versions. So a previous Delete with higher ts
+   * will mask a later Put with lower ts. Set this to true to enable new semantics of versions.
+   * We will also consider mvcc in versions. See HBASE-15968 for details.
+   */
+  boolean isNewVersionBehavior();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
index 613faa9..b3abaca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
@@ -179,6 +180,9 @@ public class ColumnFamilyDescriptorBuilder {
   public static final String STORAGE_POLICY = "STORAGE_POLICY";
   private static final Bytes STORAGE_POLICY_BYTES = new Bytes(Bytes.toBytes(STORAGE_POLICY));
 
+  public static final String NEW_VERSION_BEHAVIOR = "NEW_VERSION_BEHAVIOR";
+  private static final Bytes NEW_VERSION_BEHAVIOR_BYTES = new Bytes(Bytes.toBytes(NEW_VERSION_BEHAVIOR));
+  public static final boolean DEFAULT_NEW_VERSION_BEHAVIOR = false;
   /**
    * Default compression type.
    */
@@ -308,6 +312,7 @@ public class ColumnFamilyDescriptorBuilder {
     DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE));
     DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE));
     DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN));
+    DEFAULT_VALUES.put(NEW_VERSION_BEHAVIOR, String.valueOf(DEFAULT_NEW_VERSION_BEHAVIOR));
     DEFAULT_VALUES.keySet().forEach(s -> RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))));
     RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION)));
     RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
@@ -889,6 +894,20 @@ public class ColumnFamilyDescriptorBuilder {
       return setValue(KEEP_DELETED_CELLS_BYTES, keepDeletedCells.name());
     }
 
+    /**
+     * By default, HBase only consider timestamp in versions. So a previous Delete with higher ts
+     * will mask a later Put with lower ts. Set this to true to enable new semantics of versions.
+     * We will also consider mvcc in versions. See HBASE-15968 for details.
+     */
+    public boolean isNewVersionBehavior() {
+      return getStringOrDefault(NEW_VERSION_BEHAVIOR_BYTES,
+          Boolean::parseBoolean, DEFAULT_NEW_VERSION_BEHAVIOR);
+    }
+
+    public ModifyableColumnFamilyDescriptor setNewVersionBehavior(boolean newVersionBehavior) {
+      return setValue(NEW_VERSION_BEHAVIOR_BYTES, Boolean.toString(newVersionBehavior));
+    }
+
     @Override
     public int getTimeToLive() {
       return getStringOrDefault(TTL_BYTES, Integer::parseInt, DEFAULT_TTL);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 15baec6..da52f80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -50,10 +50,11 @@ public class ScanInfo {
   private boolean parallelSeekEnabled;
   private final long preadMaxBytes;
   private final Configuration conf;
+  private final boolean newVersionBehavior;
 
   public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
       + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
-      + (4 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_BOOLEAN));
+      + (4 * Bytes.SIZEOF_LONG) + (4 * Bytes.SIZEOF_BOOLEAN));
 
   /**
    * @param conf
@@ -66,7 +67,7 @@ public class ScanInfo {
   public ScanInfo(final Configuration conf, final ColumnFamilyDescriptor family, final long ttl,
       final long timeToPurgeDeletes, final CellComparator comparator) {
     this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl,
-        family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator);
+        family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator, family.isNewVersionBehavior());
   }
 
   /**
@@ -83,7 +84,8 @@ public class ScanInfo {
    */
   public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
       final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
-      final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator) {
+      final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator,
+      final boolean newVersionBehavior) {
     this.family = family;
     this.minVersions = minVersions;
     this.maxVersions = maxVersions;
@@ -103,6 +105,7 @@ public class ScanInfo {
       conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
     this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize);
     this.conf = conf;
+    this.newVersionBehavior = newVersionBehavior;
   }
 
   public Configuration getConfiguration() {
@@ -156,4 +159,8 @@ public class ScanInfo {
   long getPreadMaxBytes() {
     return preadMaxBytes;
   }
+
+  public boolean isNewVersionBehavior() {
+    return newVersionBehavior;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 736fb9a..d43a75b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -300,7 +300,8 @@ public abstract class Compactor<T extends CellSink> {
         return new ArrayList<>();
       }
       boolean cleanSeqId = false;
-      if (fd.minSeqIdToKeep > 0) {
+      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
+        // For mvcc-sensitive family, we never set mvcc to 0.
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
index 7a2a1e2..7616df9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
@@ -78,7 +78,7 @@ public interface ColumnTracker extends ShipperListener {
    * the {@link #checkColumn(Cell, byte)} method and perform all the operations in this
    * checkVersions method.
    * @param cell
-   * @param ttl The timeToLive to enforce.
+   * @param timestamp The timestamp of the cell.
    * @param type the type of the key value (Put/Delete)
    * @param ignoreCount indicates if the KV needs to be excluded while counting (used during
    *          compactions. We only count KV's that are older than all the scanners' read points.)
@@ -86,8 +86,8 @@ public interface ColumnTracker extends ShipperListener {
    * @throws IOException in case there is an internal consistency problem caused by a data
    *           corruption.
    */
-  ScanQueryMatcher.MatchCode checkVersions(Cell cell, long ttl, byte type, boolean ignoreCount)
-      throws IOException;
+  ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type,
+      boolean ignoreCount) throws IOException;
   /**
    * Resets the Matcher
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
index b3c14d7..8e68c76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Query matcher for compaction.
@@ -45,10 +46,8 @@ public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher {
   protected final KeepDeletedCells keepDeletedCells;
 
   protected CompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
-      long readPointToUse, long oldestUnexpiredTS, long now) {
-    super(createStartKeyFromRow(EMPTY_START_ROW, scanInfo), scanInfo,
-        new ScanWildcardColumnTracker(scanInfo.getMinVersions(), scanInfo.getMaxVersions(),
-            oldestUnexpiredTS),
+      ColumnTracker columnTracker, long readPointToUse, long oldestUnexpiredTS, long now) {
+    super(createStartKeyFromRow(EMPTY_START_ROW, scanInfo), scanInfo, columnTracker,
         oldestUnexpiredTS, now);
     this.maxReadPointToTrackVersions = readPointToUse;
     this.deletes = deletes;
@@ -109,18 +108,27 @@ public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher {
       long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now,
       byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
       RegionCoprocessorHost regionCoprocessorHost) throws IOException {
-    DeleteTracker deleteTracker = instantiateDeleteTracker(regionCoprocessorHost);
+    Pair<DeleteTracker, ColumnTracker> trackers = getTrackers(regionCoprocessorHost, null,
+        scanInfo,oldestUnexpiredTS, null);
+    DeleteTracker deleteTracker = trackers.getFirst();
+    ColumnTracker columnTracker = trackers.getSecond();
     if (dropDeletesFromRow == null) {
       if (scanType == ScanType.COMPACT_RETAIN_DELETES) {
-        return new MinorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
-            oldestUnexpiredTS, now);
+        if (scanInfo.isNewVersionBehavior()) {
+          return new IncludeAllCompactionQueryMatcher(scanInfo, deleteTracker, columnTracker,
+              readPointToUse, oldestUnexpiredTS, now);
+        } else {
+          return new MinorCompactionScanQueryMatcher(scanInfo, deleteTracker, columnTracker,
+              readPointToUse, oldestUnexpiredTS, now);
+        }
       } else {
-        return new MajorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
-            earliestPutTs, oldestUnexpiredTS, now);
+        return new MajorCompactionScanQueryMatcher(scanInfo, deleteTracker, columnTracker,
+            readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
       }
     } else {
-      return new StripeCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
-          earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow);
+      return new StripeCompactionScanQueryMatcher(scanInfo, deleteTracker, columnTracker,
+          readPointToUse, earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow,
+          dropDeletesToRow);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
index 45b170e..7c63b12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
@@ -96,7 +96,9 @@ public interface DeleteTracker extends ShipperListener {
     FAMILY_VERSION_DELETED, // The KeyValue is deleted by a delete family version.
     COLUMN_DELETED, // The KeyValue is deleted by a delete column.
     VERSION_DELETED, // The KeyValue is deleted by a version delete.
-    NOT_DELETED
+    NOT_DELETED,
+    VERSION_MASKED  // The KeyValue is masked by max number of versions which is considered as
+                    // deleted in strong semantics of versions(See MvccTracker)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java
index 89725fe..15762be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java
@@ -53,8 +53,9 @@ public abstract class DropDeletesCompactionScanQueryMatcher extends CompactionSc
   protected final long earliestPutTs;
 
   protected DropDeletesCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
-      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now) {
-    super(scanInfo, deletes, readPointToUse, oldestUnexpiredTS, now);
+      ColumnTracker columns, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
+      long now) {
+    super(scanInfo, deletes, columns, readPointToUse, oldestUnexpiredTS, now);
     this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
     this.earliestPutTs = earliestPutTs;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java
new file mode 100644
index 0000000..6937626
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/IncludeAllCompactionQueryMatcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * A compaction query matcher that always return INCLUDE and drops nothing.
+ */
+@InterfaceAudience.Private
+public class IncludeAllCompactionQueryMatcher extends MinorCompactionScanQueryMatcher{
+
+  public IncludeAllCompactionQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
+      ColumnTracker columns, long readPointToUse, long oldestUnexpiredTS, long now) {
+    super(scanInfo, deletes, columns, readPointToUse, oldestUnexpiredTS, now);
+  }
+
+  @Override
+  public MatchCode match(Cell cell) throws IOException {
+    return MatchCode.INCLUDE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
index 8cdcd40..07fcb08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * The old query matcher implementation. Used to keep compatibility for coprocessor that could
@@ -365,29 +366,17 @@ public class LegacyScanQueryMatcher extends ScanQueryMatcher {
       NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs,
       long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
       RegionCoprocessorHost regionCoprocessorHost) throws IOException {
-    int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
-    boolean hasNullColumn;
-    ColumnTracker columnTracker;
-    if (columns == null || columns.isEmpty()) {
-      // there is always a null column in the wildcard column query.
-      hasNullColumn = true;
-      // use a specialized scan for wildcard column tracker.
-      columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions,
-          oldestUnexpiredTS);
-    } else {
-      // We can share the ExplicitColumnTracker, diff is we reset
-      // between rows, not between storefiles.
-      // whether there is null column in the explicit column query
-      hasNullColumn = columns.first().length == 0;
-      columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
-          oldestUnexpiredTS);
-    }
-    DeleteTracker deletes = instantiateDeleteTracker(regionCoprocessorHost);
+    boolean hasNullColumn =
+        !(columns != null && columns.size() != 0 && columns.first().length != 0);
+    Pair<DeleteTracker, ColumnTracker> trackers = getTrackers(regionCoprocessorHost, null,
+        scanInfo, oldestUnexpiredTS, scan);
+    DeleteTracker deleteTracker = trackers.getFirst();
+    ColumnTracker columnTracker = trackers.getSecond();
     if (dropDeletesFromRow == null) {
-      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deletes,
+      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker,
           scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
     } else {
-      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deletes,
+      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker,
           scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow,
           dropDeletesToRow);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java
index 67e40ed..fda35dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java
@@ -31,8 +31,9 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 public class MajorCompactionScanQueryMatcher extends DropDeletesCompactionScanQueryMatcher {
 
   public MajorCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
-      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now) {
-    super(scanInfo, deletes, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
+      ColumnTracker columns, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
+      long now) {
+    super(scanInfo, deletes, columns, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java
index cf36366..1e5d99b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 public class MinorCompactionScanQueryMatcher extends CompactionScanQueryMatcher {
 
   public MinorCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
-      long readPointToUse, long oldestUnexpiredTS, long now) {
-    super(scanInfo, deletes, readPointToUse, oldestUnexpiredTS, now);
+      ColumnTracker columns, long readPointToUse, long oldestUnexpiredTS, long now) {
+    super(scanInfo, deletes, columns, readPointToUse, oldestUnexpiredTS, now);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NewVersionBehaviorTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NewVersionBehaviorTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NewVersionBehaviorTracker.java
new file mode 100644
index 0000000..9625a1c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NewVersionBehaviorTracker.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A tracker both implementing ColumnTracker and DeleteTracker, used for mvcc-sensitive scanning.
+ * We should make sure in one QueryMatcher the ColumnTracker and DeleteTracker is the same instance.
+ */
+@InterfaceAudience.Private
+public class NewVersionBehaviorTracker implements ColumnTracker, DeleteTracker {
+
+  private byte[] lastCqArray;
+  private int lastCqLength;
+  private int lastCqOffset;
+  private long lastCqTs;
+  private long lastCqMvcc;
+  private byte lastCqType;
+  private int columnIndex;
+  private int countCurrentCol;
+
+  protected int maxVersions;
+  private int resultMaxVersions;
+  private byte[][] columns;
+  private int minVersions;
+  private long oldestStamp;
+
+  // These two maps have same structure.
+  // Each node is a versions deletion (DeleteFamily or DeleteColumn). Key is the mvcc of the marker,
+  // value is a data structure which contains infos we need that happens before this node's mvcc and
+  // after the previous node's mvcc. The last node is a special node whose key is max_long that
+  // saves infos after last deletion. See DeleteVersionsNode's comments for details.
+  // The delColMap is constructed and used for each cq, and thedelFamMap is constructed when cq is
+  // null and saving family-level delete markers. Each time the cq is changed, we should
+  // reconstruct delColMap as a deep copy of delFamMap.
+  protected NavigableMap<Long, DeleteVersionsNode> delColMap = new TreeMap<>();
+  protected NavigableMap<Long, DeleteVersionsNode> delFamMap = new TreeMap<>();
+
+  /**
+   * Note maxVersion and minVersion must set according to cf's conf, not user's scan parameter.
+   *
+   * @param columns           columns specified user in query
+   * @param minVersion        The minimum number of versions to keep(used when TTL is set).
+   * @param maxVersion        The maximum number of versions in CF's conf
+   * @param resultMaxVersions maximum versions to return per column, which may be different from
+   *                          maxVersion
+   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+   */
+  public NewVersionBehaviorTracker(NavigableSet<byte[]> columns, int minVersion, int maxVersion,
+      int resultMaxVersions, long oldestUnexpiredTS) {
+    this.maxVersions = maxVersion;
+    this.minVersions = minVersion;
+    this.resultMaxVersions = resultMaxVersions;
+    this.oldestStamp = oldestUnexpiredTS;
+    if (columns != null && columns.size() > 0) {
+      this.columns = new byte[columns.size()][];
+      int i = 0;
+      for (byte[] column : columns) {
+        this.columns[i++] = column;
+      }
+    }
+    reset();
+  }
+
+  @Override
+  public void beforeShipped() throws IOException {
+    // Do nothing
+  }
+
+  /**
+   * A data structure which contains infos we need that happens before this node's mvcc and
+   * after the previous node's mvcc. A node means there is a version deletion at the mvcc and ts.
+   */
+  protected class DeleteVersionsNode {
+    public long ts;
+    public long mvcc;
+
+    // <timestamp, set<mvcc>>
+    // Key is ts of version deletes, value is its mvccs.
+    // We may delete more than one time for a version.
+    private Map<Long, SortedSet<Long>> deletesMap = new HashMap<>();
+
+    // <mvcc, set<mvcc>>
+    // Key is mvcc of version deletes, value is mvcc of visible puts before the delete effect.
+    private NavigableMap<Long, SortedSet<Long>> mvccCountingMap = new TreeMap<>();
+
+    protected DeleteVersionsNode(long ts, long mvcc) {
+      this.ts = ts;
+      this.mvcc = mvcc;
+      mvccCountingMap.put(Long.MAX_VALUE, new TreeSet<Long>());
+    }
+
+    protected DeleteVersionsNode() {
+      this(Long.MIN_VALUE, Long.MAX_VALUE);
+    }
+
+    public void addVersionDelete(Cell cell) {
+      SortedSet<Long> set = deletesMap.get(cell.getTimestamp());
+      if (set == null) {
+        set = new TreeSet<>();
+        deletesMap.put(cell.getTimestamp(), set);
+      }
+      set.add(cell.getSequenceId());
+      // The init set should be the puts whose mvcc is smaller than this Delete. Because
+      // there may be some Puts masked by them. The Puts whose mvcc is larger than this Delete can
+      // not be copied to this node because we may delete one version and the oldest put may not be
+      // masked.
+      SortedSet<Long> nextValue = mvccCountingMap.ceilingEntry(cell.getSequenceId()).getValue();
+      SortedSet<Long> thisValue = new TreeSet<>(nextValue.headSet(cell.getSequenceId()));
+      mvccCountingMap.put(cell.getSequenceId(), thisValue);
+    }
+
+    protected DeleteVersionsNode getDeepCopy() {
+      DeleteVersionsNode node = new DeleteVersionsNode(ts, mvcc);
+      for (Map.Entry<Long, SortedSet<Long>> e : deletesMap.entrySet()) {
+        node.deletesMap.put(e.getKey(), new TreeSet<>(e.getValue()));
+      }
+      for (Map.Entry<Long, SortedSet<Long>> e : mvccCountingMap.entrySet()) {
+        node.mvccCountingMap.put(e.getKey(), new TreeSet<>(e.getValue()));
+      }
+      return node;
+    }
+  }
+
+  /**
+   * Reset the map if it is different with the last Cell.
+   * Save the cq array/offset/length for next Cell.
+   *
+   * @return If this put has duplicate ts with last cell, return the mvcc of last cell.
+   * Else return MAX_VALUE.
+   */
+  protected long prepare(Cell cell) {
+    boolean matchCq = CellUtil.matchingQualifier(cell, lastCqArray, lastCqOffset, lastCqLength);
+    if (!matchCq) {
+      // The last cell is family-level delete and this is not, or the cq is changed,
+      // we should construct delColMap as a deep copy of delFamMap.
+      delColMap.clear();
+      for (Map.Entry<Long, DeleteVersionsNode> e : delFamMap.entrySet()) {
+        delColMap.put(e.getKey(), e.getValue().getDeepCopy());
+      }
+      countCurrentCol = 0;
+    }
+    if (matchCq && !CellUtil.isDelete(lastCqType) && lastCqType == cell.getTypeByte()
+        && lastCqTs == cell.getTimestamp()) {
+      // Put with duplicate timestamp, ignore.
+      return lastCqMvcc;
+    }
+    lastCqArray = cell.getQualifierArray();
+    lastCqOffset = cell.getQualifierOffset();
+    lastCqLength = cell.getQualifierLength();
+    lastCqTs = cell.getTimestamp();
+    lastCqMvcc = cell.getSequenceId();
+    lastCqType = cell.getTypeByte();
+    return Long.MAX_VALUE;
+  }
+
+  // DeleteTracker
+  @Override
+  public void add(Cell cell) {
+    prepare(cell);
+    byte type = cell.getTypeByte();
+    switch (Type.codeToType(type)) {
+    // By the order of seen. We put null cq at first.
+    case DeleteFamily: // Delete all versions of all columns of the specified family
+      delFamMap.put(cell.getSequenceId(),
+          new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId()));
+      break;
+    case DeleteFamilyVersion: // Delete all columns of the specified family and specified version
+      delFamMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
+      break;
+
+    // These two kinds of markers are mix with Puts.
+    case DeleteColumn: // Delete all versions of the specified column
+      delColMap.put(cell.getSequenceId(),
+          new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId()));
+      break;
+    case Delete: // Delete the specified version of the specified column.
+      delColMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
+      break;
+    default:
+      throw new AssertionError("Unknown delete marker type for " + cell);
+    }
+  }
+
+  /**
+   * This method is not idempotent, we will save some info to judge VERSION_MASKED.
+   * @param cell - current cell to check if deleted by a previously seen delete
+   * @return We don't distinguish DeleteColumn and DeleteFamily. We only return code for column.
+   */
+  @Override
+  public DeleteResult isDeleted(Cell cell) {
+    long duplicateMvcc = prepare(cell);
+
+    for (Map.Entry<Long, DeleteVersionsNode> e : delColMap.tailMap(cell.getSequenceId())
+        .entrySet()) {
+      DeleteVersionsNode node = e.getValue();
+      long deleteMvcc = Long.MAX_VALUE;
+      SortedSet<Long> deleteVersionMvccs = node.deletesMap.get(cell.getTimestamp());
+      if (deleteVersionMvccs != null) {
+        SortedSet<Long> tail = deleteVersionMvccs.tailSet(cell.getSequenceId());
+        if (!tail.isEmpty()) {
+          deleteMvcc = tail.first();
+        }
+      }
+      SortedMap<Long, SortedSet<Long>> subMap =
+          node.mvccCountingMap
+              .subMap(cell.getSequenceId(), true, Math.min(duplicateMvcc, deleteMvcc), true);
+      for (Map.Entry<Long, SortedSet<Long>> seg : subMap.entrySet()) {
+        if (seg.getValue().size() >= maxVersions) {
+          return DeleteResult.VERSION_MASKED;
+        }
+        seg.getValue().add(cell.getSequenceId());
+      }
+      if (deleteMvcc < Long.MAX_VALUE) {
+        return DeleteResult.VERSION_DELETED;
+      }
+
+      if (cell.getTimestamp() <= node.ts) {
+        return DeleteResult.COLUMN_DELETED;
+      }
+    }
+    if (duplicateMvcc < Long.MAX_VALUE) {
+      return DeleteResult.VERSION_MASKED;
+    }
+    return DeleteResult.NOT_DELETED;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return delColMap.size() == 1 && delColMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1
+        && delFamMap.size() == 1 && delFamMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1;
+  }
+
+  @Override
+  public void update() {
+    // ignore
+  }
+
+  //ColumnTracker
+
+  @Override
+  public MatchCode checkColumn(Cell cell, byte type) throws IOException {
+    if (done()) {
+      // No more columns left, we are done with this query
+      return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
+    }
+    if (columns != null) {
+      while (columnIndex < columns.length) {
+        int c = Bytes.compareTo(columns[columnIndex], 0, columns[columnIndex].length,
+            cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+        if (c < 0) {
+          columnIndex++;
+        } else if (c == 0) {
+          // We drop old version in #isDeleted, so here we must return INCLUDE.
+          return MatchCode.INCLUDE;
+        } else {
+          return MatchCode.SEEK_NEXT_COL;
+        }
+      }
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+    return MatchCode.INCLUDE;
+  }
+
+  @Override
+  public MatchCode checkVersions(Cell cell, long timestamp, byte type,
+      boolean ignoreCount) throws IOException {
+    assert !CellUtil.isDelete(type);
+    // We drop old version in #isDeleted, so here we won't SKIP because of versioning. But we should
+    // consider TTL.
+    if (ignoreCount) {
+      return MatchCode.INCLUDE;
+    }
+    countCurrentCol++;
+    if (timestamp < this.oldestStamp) {
+      if (countCurrentCol == minVersions) {
+        return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
+      }
+      if (countCurrentCol > minVersions) {
+        // This may not be reached, only for safety.
+        return MatchCode.SEEK_NEXT_COL;
+      }
+    }
+
+    if (countCurrentCol == resultMaxVersions) {
+      // We have enough number of versions for user's requirement.
+      return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
+    }
+    if (countCurrentCol > resultMaxVersions) {
+      // This may not be reached, only for safety
+      return MatchCode.SEEK_NEXT_COL;
+    }
+    return MatchCode.INCLUDE;
+  }
+
+  @Override
+  public void reset() {
+    delColMap.clear();
+    delFamMap.clear();
+    lastCqArray = null;
+    lastCqLength = 0;
+    lastCqOffset = 0;
+    lastCqTs = Long.MIN_VALUE;
+    lastCqMvcc = 0;
+    lastCqType = 0;
+    columnIndex = 0;
+    countCurrentCol = 0;
+    resetInternal();
+  }
+
+  protected void resetInternal(){
+    delFamMap.put(Long.MAX_VALUE, new DeleteVersionsNode());
+  }
+
+  @Override
+  public boolean done() {
+    // lastCq* have been updated to this cell.
+    return !(columns == null || lastCqArray == null) && Bytes
+        .compareTo(lastCqArray, lastCqOffset, lastCqLength, columns[columnIndex], 0,
+            columns[columnIndex].length) > 0;
+  }
+
+  @Override
+  public ColumnCount getColumnHint() {
+    if (columns != null) {
+      if (columnIndex < columns.length) {
+        return new ColumnCount(columns[columnIndex]);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public MatchCode getNextRowOrNextColumn(Cell cell) {
+    // TODO maybe we can optimize.
+    return MatchCode.SEEK_NEXT_COL;
+  }
+
+  @Override
+  public boolean isDone(long timestamp) {
+    // We can not skip Cells with small ts.
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java
index d5fda54..b168034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 
 /**
@@ -40,7 +39,7 @@ public abstract class NormalUserScanQueryMatcher extends UserScanQueryMatcher {
   private final boolean get;
 
   /** whether time range queries can see rows "behind" a delete */
-  private final boolean seePastDeleteMarkers;
+  protected final boolean seePastDeleteMarkers;
 
   protected NormalUserScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
       boolean hasNullColumn, DeleteTracker deletes, long oldestUnexpiredTS, long now) {
@@ -93,9 +92,8 @@ public abstract class NormalUserScanQueryMatcher extends UserScanQueryMatcher {
   }
 
   public static NormalUserScanQueryMatcher create(Scan scan, ScanInfo scanInfo,
-      ColumnTracker columns, boolean hasNullColumn, long oldestUnexpiredTS, long now,
-      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
-    DeleteTracker deletes = instantiateDeleteTracker(regionCoprocessorHost);
+      ColumnTracker columns, DeleteTracker deletes, boolean hasNullColumn, long oldestUnexpiredTS,
+      long now) throws IOException {
     if (scan.isReversed()) {
       if (scan.includeStopRow()) {
         return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes,

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
index e508a9a..8bdab08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -31,13 +32,16 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
+import org.apache.hadoop.hbase.security.visibility.VisibilityNewVersionBehaivorTracker;
+import org.apache.hadoop.hbase.security.visibility.VisibilityScanDeleteTracker;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A query matcher that is specifically designed for the scan case.
@@ -198,16 +202,21 @@ public abstract class ScanQueryMatcher implements ShipperListener {
   }
 
   protected final MatchCode checkDeleted(DeleteTracker deletes, Cell cell) {
-    if (deletes.isEmpty()) {
+    if (deletes.isEmpty() && !(deletes instanceof NewVersionBehaviorTracker)) {
       return null;
     }
+    // MvccSensitiveTracker always need check all cells to save some infos.
     DeleteResult deleteResult = deletes.isDeleted(cell);
     switch (deleteResult) {
       case FAMILY_DELETED:
       case COLUMN_DELETED:
-        return columns.getNextRowOrNextColumn(cell);
+        if (!(deletes instanceof NewVersionBehaviorTracker)) {
+          // MvccSensitive can not seek to next because the Put with lower ts may have higher mvcc
+          return columns.getNextRowOrNextColumn(cell);
+        }
       case VERSION_DELETED:
       case FAMILY_VERSION_DELETED:
+      case VERSION_MASKED:
         return MatchCode.SKIP;
       case NOT_DELETED:
         return null;
@@ -216,6 +225,7 @@ public abstract class ScanQueryMatcher implements ShipperListener {
     }
   }
 
+
   /**
    * Determines if the caller should do one of several things:
    * <ul>
@@ -341,13 +351,44 @@ public abstract class ScanQueryMatcher implements ShipperListener {
     return CellUtil.createFirstDeleteFamilyCellOnRow(startRow, scanInfo.getFamily());
   }
 
-  protected static DeleteTracker instantiateDeleteTracker(RegionCoprocessorHost host)
+  protected static Pair<DeleteTracker, ColumnTracker> getTrackers(RegionCoprocessorHost host,
+      NavigableSet<byte[]> columns, ScanInfo scanInfo, long oldestUnexpiredTS, Scan userScan)
       throws IOException {
-    DeleteTracker tracker = new ScanDeleteTracker();
+    int resultMaxVersion = scanInfo.getMaxVersions();
+    if (userScan != null) {
+      if (userScan.isRaw()) {
+        resultMaxVersion = userScan.getMaxVersions();
+      } else {
+        resultMaxVersion = Math.min(userScan.getMaxVersions(), scanInfo.getMaxVersions());
+      }
+    }
+    DeleteTracker deleteTracker;
+    if (scanInfo.isNewVersionBehavior() && (userScan == null || !userScan.isRaw())) {
+      deleteTracker = new NewVersionBehaviorTracker(columns, scanInfo.getMinVersions(),
+          scanInfo.getMaxVersions(), resultMaxVersion, oldestUnexpiredTS);
+    } else {
+      deleteTracker = new ScanDeleteTracker();
+    }
     if (host != null) {
-      tracker = host.postInstantiateDeleteTracker(tracker);
+      deleteTracker = host.postInstantiateDeleteTracker(deleteTracker);
+      if (deleteTracker instanceof VisibilityScanDeleteTracker && scanInfo.isNewVersionBehavior()) {
+        deleteTracker = new VisibilityNewVersionBehaivorTracker(columns, scanInfo.getMinVersions(),
+            scanInfo.getMaxVersions(), resultMaxVersion, oldestUnexpiredTS);
+      }
+    }
+
+    ColumnTracker columnTracker;
+
+    if (deleteTracker instanceof NewVersionBehaviorTracker) {
+      columnTracker = (NewVersionBehaviorTracker) deleteTracker;
+    } else if (columns == null || columns.size() == 0) {
+      columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), resultMaxVersion,
+          oldestUnexpiredTS);
+    } else {
+      columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(),
+          resultMaxVersion, oldestUnexpiredTS);
     }
-    return tracker;
+    return new Pair<>(deleteTracker, columnTracker);
   }
 
   // Used only for testing purposes

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java
index 1ba08f7..152fb9d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java
@@ -41,9 +41,9 @@ public class StripeCompactionScanQueryMatcher extends DropDeletesCompactionScanQ
   private DropDeletesInOutput dropDeletesInOutput = DropDeletesInOutput.BEFORE;
 
   public StripeCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
-      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now,
-      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
-    super(scanInfo, deletes, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
+      ColumnTracker columns, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
+      long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
+    super(scanInfo, deletes, columns, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
     this.dropDeletesFromRow = dropDeletesFromRow;
     this.dropDeletesToRow = dropDeletesToRow;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
index 95563b5..250a4a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Query matcher for user scan.
@@ -184,30 +185,18 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
   public static UserScanQueryMatcher create(Scan scan, ScanInfo scanInfo,
       NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now,
       RegionCoprocessorHost regionCoprocessorHost) throws IOException {
-    int maxVersions = scan.isRaw() ? scan.getMaxVersions()
-        : Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
-    boolean hasNullColumn;
-    ColumnTracker columnTracker;
-    if (columns == null || columns.isEmpty()) {
-      // there is always a null column in the wildcard column query.
-      hasNullColumn = true;
-      // use a specialized scan for wildcard column tracker.
-      columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions,
-          oldestUnexpiredTS);
-    } else {
-      // We can share the ExplicitColumnTracker, diff is we reset
-      // between rows, not between storefiles.
-      // whether there is null column in the explicit column query
-      hasNullColumn = columns.first().length == 0;
-      columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
-          oldestUnexpiredTS);
-    }
+    boolean hasNullColumn =
+        !(columns != null && columns.size() != 0 && columns.first().length != 0);
+    Pair<DeleteTracker, ColumnTracker> trackers = getTrackers(regionCoprocessorHost, columns,
+        scanInfo, oldestUnexpiredTS, scan);
+    DeleteTracker deleteTracker = trackers.getFirst();
+    ColumnTracker columnTracker = trackers.getSecond();
     if (scan.isRaw()) {
       return RawScanQueryMatcher.create(scan, scanInfo, columnTracker, hasNullColumn,
         oldestUnexpiredTS, now);
     } else {
-      return NormalUserScanQueryMatcher.create(scan, scanInfo, columnTracker, hasNullColumn,
-        oldestUnexpiredTS, now, regionCoprocessorHost);
+      return NormalUserScanQueryMatcher.create(scan, scanInfo, columnTracker, deleteTracker,
+          hasNullColumn, oldestUnexpiredTS, now);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityNewVersionBehaivorTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityNewVersionBehaivorTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityNewVersionBehaivorTracker.java
new file mode 100644
index 0000000..d3fcb6e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityNewVersionBehaivorTracker.java
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.regionserver.querymatcher.NewVersionBehaviorTracker;
+
+/**
+ * Similar to MvccSensitiveTracker but tracks the visibility expression also before
+ * deciding if a Cell can be considered deleted
+ */
+public class VisibilityNewVersionBehaivorTracker extends NewVersionBehaviorTracker {
+
+  private static final Log LOG = LogFactory.getLog(VisibilityNewVersionBehaivorTracker.class);
+
+  public VisibilityNewVersionBehaivorTracker(NavigableSet<byte[]> columns, int minVersion,
+      int maxVersion,
+      int resultMaxVersions, long oldestUnexpiredTS) {
+    super(columns, minVersion, maxVersion, resultMaxVersions, oldestUnexpiredTS);
+  }
+
+  private static class TagInfo {
+    List<Tag> tags;
+    Byte format;
+
+    private TagInfo(Cell c) {
+      tags = new ArrayList<>();
+      format = VisibilityUtils.extractVisibilityTags(c, tags);
+    }
+
+    private TagInfo() {
+      tags = new ArrayList<>();
+    }
+  }
+
+  private class VisibilityDeleteVersionsNode extends DeleteVersionsNode {
+    private TagInfo tagInfo;
+
+    // <timestamp, set<mvcc>>
+    // Key is ts of version deletes, value is its mvccs.
+    // We may delete more than one time for a version.
+    private Map<Long, SortedMap<Long, TagInfo>> deletesMap = new HashMap<>();
+
+    // <mvcc, set<mvcc>>
+    // Key is mvcc of version deletes, value is mvcc of visible puts before the delete effect.
+    private NavigableMap<Long, SortedSet<Long>> mvccCountingMap = new TreeMap<>();
+
+    protected VisibilityDeleteVersionsNode(long ts, long mvcc, TagInfo tagInfo) {
+      this.tagInfo = tagInfo;
+      this.ts = ts;
+      this.mvcc = mvcc;
+      mvccCountingMap.put(Long.MAX_VALUE, new TreeSet<Long>());
+    }
+
+    protected VisibilityDeleteVersionsNode getDeepCopy() {
+      VisibilityDeleteVersionsNode node = new VisibilityDeleteVersionsNode(ts, mvcc, tagInfo);
+      for (Map.Entry<Long, SortedMap<Long, TagInfo>> e : deletesMap.entrySet()) {
+        node.deletesMap.put(e.getKey(), new TreeMap<>(e.getValue()));
+      }
+      for (Map.Entry<Long, SortedSet<Long>> e : mvccCountingMap.entrySet()) {
+        node.mvccCountingMap.put(e.getKey(), new TreeSet<>(e.getValue()));
+      }
+      return node;
+    }
+
+    public void addVersionDelete(Cell cell) {
+      SortedMap<Long, TagInfo> set = deletesMap.get(cell.getTimestamp());
+      if (set == null) {
+        set = new TreeMap<>();
+        deletesMap.put(cell.getTimestamp(), set);
+      }
+      set.put(cell.getSequenceId(), new TagInfo(cell));
+      // The init set should be the puts whose mvcc is smaller than this Delete. Because
+      // there may be some Puts masked by them. The Puts whose mvcc is larger than this Delete can
+      // not be copied to this node because we may delete one version and the oldest put may not be
+      // masked.
+      SortedSet<Long> nextValue = mvccCountingMap.ceilingEntry(cell.getSequenceId()).getValue();
+      SortedSet<Long> thisValue = new TreeSet<>(nextValue.headSet(cell.getSequenceId()));
+      mvccCountingMap.put(cell.getSequenceId(), thisValue);
+    }
+
+  }
+
+  @Override
+  public void add(Cell cell) {
+    prepare(cell);
+    byte type = cell.getTypeByte();
+    switch (KeyValue.Type.codeToType(type)) {
+    // By the order of seen. We put null cq at first.
+    case DeleteFamily: // Delete all versions of all columns of the specified family
+      delFamMap.put(cell.getSequenceId(),
+          new VisibilityDeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId(),
+              new TagInfo(cell)));
+      break;
+    case DeleteFamilyVersion: // Delete all columns of the specified family and specified version
+      delFamMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
+      break;
+
+    // These two kinds of markers are mix with Puts.
+    case DeleteColumn: // Delete all versions of the specified column
+      delColMap.put(cell.getSequenceId(),
+          new VisibilityDeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId(),
+              new TagInfo(cell)));
+      break;
+    case Delete: // Delete the specified version of the specified column.
+      delColMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
+      break;
+    default:
+      throw new AssertionError("Unknown delete marker type for " + cell);
+    }
+  }
+
+  private boolean tagMatched(Cell put, TagInfo delInfo) throws IOException {
+    List<Tag> putVisTags = new ArrayList<>();
+    Byte putCellVisTagsFormat = VisibilityUtils.extractVisibilityTags(put, putVisTags);
+    return putVisTags.isEmpty() == delInfo.tags.isEmpty() && (
+        putVisTags.isEmpty() && delInfo.tags.isEmpty() || VisibilityLabelServiceManager
+            .getInstance().getVisibilityLabelService()
+            .matchVisibility(putVisTags, putCellVisTagsFormat, delInfo.tags, delInfo.format));
+  }
+
+  @Override
+  public DeleteResult isDeleted(Cell cell) {
+    try {
+      long duplicateMvcc = prepare(cell);
+
+      for (Map.Entry<Long, DeleteVersionsNode> e : delColMap.tailMap(cell.getSequenceId())
+          .entrySet()) {
+        VisibilityDeleteVersionsNode node = (VisibilityDeleteVersionsNode) e.getValue();
+        long deleteMvcc = Long.MAX_VALUE;
+        SortedMap<Long, TagInfo> deleteVersionMvccs = node.deletesMap.get(cell.getTimestamp());
+        if (deleteVersionMvccs != null) {
+          SortedMap<Long, TagInfo> tail = deleteVersionMvccs.tailMap(cell.getSequenceId());
+          for (Map.Entry<Long, TagInfo> entry : tail.entrySet()) {
+            if (tagMatched(cell, entry.getValue())) {
+              deleteMvcc = tail.firstKey();
+              break;
+            }
+          }
+        }
+        SortedMap<Long, SortedSet<Long>> subMap = node.mvccCountingMap
+            .subMap(cell.getSequenceId(), true, Math.min(duplicateMvcc, deleteMvcc), true);
+        for (Map.Entry<Long, SortedSet<Long>> seg : subMap.entrySet()) {
+          if (seg.getValue().size() >= maxVersions) {
+            return DeleteResult.VERSION_MASKED;
+          }
+          seg.getValue().add(cell.getSequenceId());
+        }
+        if (deleteMvcc < Long.MAX_VALUE) {
+          return DeleteResult.VERSION_DELETED;
+        }
+
+        if (cell.getTimestamp() <= node.ts && tagMatched(cell, node.tagInfo)) {
+          return DeleteResult.COLUMN_DELETED;
+        }
+      }
+      if (duplicateMvcc < Long.MAX_VALUE) {
+        return DeleteResult.VERSION_MASKED;
+      }
+    } catch (IOException e) {
+      LOG.error("Error in isDeleted() check! Will treat cell as not deleted", e);
+    }
+    return DeleteResult.NOT_DELETED;
+  }
+
+  protected void resetInternal() {
+    delFamMap.put(Long.MAX_VALUE,
+        new VisibilityDeleteVersionsNode(Long.MIN_VALUE, Long.MAX_VALUE, new TagInfo()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index cabbdb4..af0956a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -1064,7 +1064,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     }
 
     void updateValueSize(final Result r) throws IOException {
-      if (r == null || !isRandomValueSize()) return;
+      if (r == null ) return;
       int size = 0;
       for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
         size += scanner.current().getValueLength();
@@ -1073,7 +1073,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
     }
 
     void updateValueSize(final int valueSize) {
-      if (!isRandomValueSize()) return;
       this.valueSizeHistogram.update(valueSize);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index b0eadb5..8118e41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -193,7 +193,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     Configuration conf = HBaseConfiguration.create();
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
       ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
-          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
+          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
       ScanType scanType = ScanType.USER_SCAN;
       InternalScanner scanner = new StoreScanner(new Scan(
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 747fd54..be078f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -214,7 +214,7 @@ public class TestCompaction {
         ScanInfo old = store.getScanInfo();
         ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
             old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0,
-            old.getComparator());
+            old.getComparator(), old.isNewVersionBehavior());
         store.setScanInfo(si);
       }
       Thread.sleep(ttl);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
index 584285b..059b850 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
@@ -161,7 +161,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
     ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
         oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
         oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(),
-        oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator());
+        oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator(), oldScanInfo.isNewVersionBehavior());
     store.setScanInfo(newScanInfo);
     // Do not compact empty store file
     List<StoreFile> candidates = sfCreate(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 26c894b..0b1638b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -163,7 +163,7 @@ public class TestDefaultMemStore {
     List<Cell> result = new ArrayList<>();
     Configuration conf = HBaseConfiguration.create();
     ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP,
-        KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
+        KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
     ScanType scanType = ScanType.USER_SCAN;
     StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     int count = 0;
@@ -601,7 +601,7 @@ public class TestDefaultMemStore {
     Configuration conf = HBaseConfiguration.create();
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
       ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
-          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
+          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
       ScanType scanType = ScanType.USER_SCAN;
       try (InternalScanner scanner = new StoreScanner(new Scan(
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 0b35f95..f45c76c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -294,7 +294,7 @@ public class TestMajorCompaction {
       ScanInfo old = store.getScanInfo();
       ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
           old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0,
-          old.getComparator());
+          old.getComparator(), old.isNewVersionBehavior());
       store.setScanInfo(si);
     }
     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ac4152b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestNewVersionBehaviorFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestNewVersionBehaviorFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestNewVersionBehaviorFromClientSide.java
new file mode 100644
index 0000000..ebd2f01
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestNewVersionBehaviorFromClientSide.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestNewVersionBehaviorFromClientSide {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final byte[] ROW = Bytes.toBytes("r1");
+  private static final byte[] ROW2 = Bytes.toBytes("r2");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] value = Bytes.toBytes("value");
+  private static final byte[] col1 = Bytes.toBytes("col1");
+  private static final byte[] col2 = Bytes.toBytes("col2");
+  private static final byte[] col3 = Bytes.toBytes("col3");
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void setDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private Table createTable() throws IOException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    HTableDescriptor table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
+    fam.setNewVersionBehavior(true);
+    fam.setMaxVersions(3);
+    table.addFamily(fam);
+    TEST_UTIL.getHBaseAdmin().createTable(table);
+    return TEST_UTIL.getConnection().getTable(tableName);
+  }
+
+  @Test
+  public void testPutAndDeleteVersions() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+      t.delete(new Delete(ROW).addColumns(FAMILY, col1, 2000000));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000000, value));
+      TEST_UTIL.getAdmin().flush(t.getName());
+      Result r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(1, r.size());
+      assertEquals(1000000, r.rawCells()[0].getTimestamp());
+    }
+  }
+
+  @Test
+  public void testPutMasked() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000003));
+
+      Result r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(2, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000002, r.rawCells()[1].getTimestamp());
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(2, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000002, r.rawCells()[1].getTimestamp());
+    }
+  }
+
+  @Test
+  public void testPutMasked2() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000003));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+
+      Result r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(3, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000003, r.rawCells()[1].getTimestamp());
+      assertEquals(1000002, r.rawCells()[2].getTimestamp());
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(3, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000003, r.rawCells()[1].getTimestamp());
+      assertEquals(1000002, r.rawCells()[2].getTimestamp());
+    }
+  }
+
+  @Test
+  public void testPutMaskedAndUserMaxVersion() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000004));
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000003));
+
+      Result r = t.get(new Get(ROW).setMaxVersions(1));
+      assertEquals(1, r.size());
+      assertEquals(1000002, r.rawCells()[0].getTimestamp());
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(1));
+      assertEquals(1, r.size());
+      assertEquals(1000002, r.rawCells()[0].getTimestamp());
+    }
+  }
+
+  @Test
+  public void testSameTs() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+
+      Result r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(3, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000003, r.rawCells()[1].getTimestamp());
+      assertEquals(1000002, r.rawCells()[2].getTimestamp());
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(3, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000003, r.rawCells()[1].getTimestamp());
+      assertEquals(1000002, r.rawCells()[2].getTimestamp());
+    }
+  }
+
+  @Test
+  public void testSameTsAndDelete() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000003));
+
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+
+      Result r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(3, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000002, r.rawCells()[1].getTimestamp());
+      assertEquals(1000001, r.rawCells()[2].getTimestamp());
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(3, r.size());
+      assertEquals(1000004, r.rawCells()[0].getTimestamp());
+      assertEquals(1000002, r.rawCells()[1].getTimestamp());
+      assertEquals(1000001, r.rawCells()[2].getTimestamp());
+    }
+  }
+
+  @Test
+  public void testDeleteFamily() throws IOException {
+    try (Table t = createTable()) {
+
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col2, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col3, 1000001, value));
+
+      t.delete(new Delete(ROW).addFamily(FAMILY, 2000000));
+
+      t.put(new Put(ROW).addColumn(FAMILY, col3, 1500002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col2, 1500001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1500001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1500002, value));
+      TEST_UTIL.getAdmin().flush(t.getName());
+      Result r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(4, r.size());
+      assertEquals(1500002, r.rawCells()[0].getTimestamp());
+      assertEquals(1500001, r.rawCells()[1].getTimestamp());
+      assertEquals(1500001, r.rawCells()[2].getTimestamp());
+      assertEquals(1500002, r.rawCells()[3].getTimestamp());
+
+      t.delete(new Delete(ROW).addFamilyVersion(FAMILY, 1500001));
+
+      r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(2, r.size());
+      assertEquals(1500002, r.rawCells()[0].getTimestamp());
+      assertEquals(1500002, r.rawCells()[1].getTimestamp());
+
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col2, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col3, 1000001, value));
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(3));
+      assertEquals(6, r.size());
+      assertEquals(1500002, r.rawCells()[0].getTimestamp());
+      assertEquals(1000002, r.rawCells()[1].getTimestamp());
+      assertEquals(1000001, r.rawCells()[2].getTimestamp());
+      assertEquals(1000002, r.rawCells()[3].getTimestamp());
+      assertEquals(1500002, r.rawCells()[4].getTimestamp());
+      assertEquals(1000001, r.rawCells()[5].getTimestamp());
+    }
+  }
+
+  @Test
+  public void testTimeRange() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000005, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000006, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000007, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000008, value));
+      Result r = t.get(new Get(ROW).setMaxVersions(3).setTimeRange(0, 1000005));
+      assertEquals(0, r.size());
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(3).setTimeRange(0, 1000005));
+      assertEquals(0, r.size());
+    }
+  }
+
+  @Test
+  public void testExplicitColum() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col2, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col2, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col2, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col2, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col3, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col3, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col3, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col3, value));
+      Result r = t.get(new Get(ROW).setMaxVersions(3).addColumn(FAMILY, col2));
+      assertEquals(3, r.size());
+      TEST_UTIL.getAdmin().flush(t.getName());
+      r = t.get(new Get(ROW).setMaxVersions(3).addColumn(FAMILY, col2));
+      assertEquals(3, r.size());
+      TEST_UTIL.getAdmin().flush(t.getName());
+    }
+  }
+
+  @Test
+  public void testgetColumnHint() throws IOException {
+    try (Table t = createTable()) {
+      t.setOperationTimeout(10000);
+      t.setRpcTimeout(10000);
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 100, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 101, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 102, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 103, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 104, value));
+      t.put(new Put(ROW2).addColumn(FAMILY, col1, 104, value));
+      TEST_UTIL.getAdmin().flush(t.getName());
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1));
+    }
+  }
+
+  @Test
+  public void testRawScanAndMajorCompaction() throws IOException {
+    try (Table t = createTable()) {
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000001, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000002, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000003, value));
+      t.put(new Put(ROW).addColumn(FAMILY, col1, 1000004, value));
+
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000004));
+      t.delete(new Delete(ROW).addColumn(FAMILY, col1, 1000003));
+
+      try (ResultScanner scannner = t.getScanner(new Scan().setRaw(true).setMaxVersions())) {
+        Result r = scannner.next();
+        assertNull(scannner.next());
+        assertEquals(6, r.size());
+      }
+      TEST_UTIL.getAdmin().flush(t.getName());
+      try (ResultScanner scannner = t.getScanner(new Scan().setRaw(true).setMaxVersions())) {
+        Result r = scannner.next();
+        assertNull(scannner.next());
+        assertEquals(6, r.size());
+      }
+      TEST_UTIL.getAdmin().majorCompact(t.getName());
+      Threads.sleep(5000);
+      try (ResultScanner scannner = t.getScanner(new Scan().setRaw(true).setMaxVersions())) {
+        Result r = scannner.next();
+        assertNull(scannner.next());
+        assertEquals(1, r.size());
+        assertEquals(1000002, r.rawCells()[0].getTimestamp());
+      }
+    }
+  }
+
+}