You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/02/04 14:04:46 UTC

[hive] branch master updated: HIVE-27000: Improve the modularity of the *ColumnStatsMerger classes (#3997). (Alessandro Solimando, reviewed by Ayush Saxena)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 35b151876fc HIVE-27000: Improve the modularity of the *ColumnStatsMerger classes (#3997). (Alessandro Solimando, reviewed by Ayush Saxena)
35b151876fc is described below

commit 35b151876fc87cc4a6b757b9172dad557e13904c
Author: Alessandro Solimando <al...@gmail.com>
AuthorDate: Sat Feb 4 15:04:35 2023 +0100

    HIVE-27000: Improve the modularity of the *ColumnStatsMerger classes (#3997). (Alessandro Solimando, reviewed by Ayush Saxena)
---
 .../aggr/DateColumnStatsAggregator.java            |  10 +-
 .../aggr/DecimalColumnStatsAggregator.java         |  10 +-
 .../aggr/DoubleColumnStatsAggregator.java          |  10 +-
 .../aggr/LongColumnStatsAggregator.java            |  10 +-
 .../aggr/TimestampColumnStatsAggregator.java       |  10 +-
 .../columnstats/merge/BinaryColumnStatsMerger.java |   9 +-
 .../merge/BooleanColumnStatsMerger.java            |   5 +-
 .../columnstats/merge/ColumnStatsMerger.java       |  62 ++++-
 .../merge/ColumnStatsMergerFactory.java            |   6 +-
 .../columnstats/merge/DateColumnStatsMerger.java   |  94 ++++---
 .../merge/DecimalColumnStatsMerger.java            |  95 ++++---
 .../columnstats/merge/DoubleColumnStatsMerger.java |  94 ++++---
 .../columnstats/merge/LongColumnStatsMerger.java   |  94 ++++---
 .../columnstats/merge/StringColumnStatsMerger.java |  36 ++-
 .../merge/TimestampColumnStatsMerger.java          |  94 ++++---
 .../hive/metastore/utils/MetaStoreServerUtils.java |   2 +-
 .../merge/BinaryColumnStatsMergerTest.java         |  64 +++++
 .../merge/BooleanColumnStatsMergerTest.java        |  64 +++++
 .../columnstats/merge/ColumnStatsMergerTest.java   | 119 ++++++++-
 .../merge/DateColumnStatsMergerTest.java           | 242 ++++++++++++++----
 .../merge/DecimalColumnStatsMergerTest.java        | 272 ++++++++++-----------
 .../merge/DoubleColumnStatsMergerTest.java         | 240 ++++++++++++++++++
 .../merge/LongColumnStatsMergerTest.java           | 240 ++++++++++++++++++
 .../merge/TimestampColumnStatsMergerTest.java      | 241 ++++++++++++++++++
 24 files changed, 1628 insertions(+), 495 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
index 9318a05596c..211bd2e597f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java
@@ -112,11 +112,13 @@ public class DateColumnStatsAggregator extends ColumnStatsAggregator implements
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
         } else {
-          merger.setLowValue(aggregateData, newData);
-          merger.setHighValue(aggregateData, newData);
+          aggregateData.setLowValue(merger.mergeLowValue(
+              merger.getLowValue(aggregateData), merger.getLowValue(newData)));
+          aggregateData.setHighValue(merger.mergeHighValue(
+              merger.getHighValue(aggregateData), merger.getHighValue(newData)));
 
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+          aggregateData.setNumNulls(merger.mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+          aggregateData.setNumDVs(merger.mergeNumDVs(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
       if (areAllNDVEstimatorsMergeable && ndvEstimator != null) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
index 7d7e6251c64..0854ca41395 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java
@@ -115,11 +115,13 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
         } else {
-          merger.setLowValue(aggregateData, newData);
-          merger.setHighValue(aggregateData, newData);
+          aggregateData.setLowValue(merger.mergeLowValue(
+              merger.getLowValue(aggregateData), merger.getLowValue(newData)));
+          aggregateData.setHighValue(merger.mergeHighValue(
+              merger.getHighValue(aggregateData), merger.getHighValue(newData)));
 
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+          aggregateData.setNumNulls(merger.mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+          aggregateData.setNumDVs(merger.mergeNumDVs(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
       if (areAllNDVEstimatorsMergeable && ndvEstimator != null) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
index 2ce2c7281aa..878a6e18082 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java
@@ -110,11 +110,13 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
         } else {
-          merger.setLowValue(aggregateData, newData);
-          merger.setHighValue(aggregateData, newData);
+          aggregateData.setLowValue(merger.mergeLowValue(
+              merger.getLowValue(aggregateData), merger.getLowValue(newData)));
+          aggregateData.setHighValue(merger.mergeHighValue(
+              merger.getHighValue(aggregateData), merger.getHighValue(newData)));
 
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+          aggregateData.setNumNulls(merger.mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+          aggregateData.setNumDVs(merger.mergeNumDVs(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
       if (areAllNDVEstimatorsMergeable && ndvEstimator != null) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
index d2999054a53..87077cbf7e0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java
@@ -109,11 +109,13 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
         } else {
-          merger.setLowValue(aggregateData, newData);
-          merger.setHighValue(aggregateData, newData);
+          aggregateData.setLowValue(merger.mergeLowValue(
+              merger.getLowValue(aggregateData), merger.getLowValue(newData)));
+          aggregateData.setHighValue(merger.mergeHighValue(
+              merger.getHighValue(aggregateData), merger.getHighValue(newData)));
 
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+          aggregateData.setNumNulls(merger.mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+          aggregateData.setNumDVs(merger.mergeNumDVs(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
       if (areAllNDVEstimatorsMergeable && ndvEstimator != null) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java
index 5962792cb5b..5e8e51c8900 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/TimestampColumnStatsAggregator.java
@@ -113,11 +113,13 @@ public class TimestampColumnStatsAggregator extends ColumnStatsAggregator implem
         if (aggregateData == null) {
           aggregateData = newData.deepCopy();
         } else {
-          merger.setLowValue(aggregateData, newData);
-          merger.setHighValue(aggregateData, newData);
+          aggregateData.setLowValue(merger.mergeLowValue(
+              merger.getLowValue(aggregateData), merger.getLowValue(newData)));
+          aggregateData.setHighValue(merger.mergeHighValue(
+              merger.getHighValue(aggregateData), merger.getHighValue(newData)));
 
-          aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-          aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+          aggregateData.setNumNulls(merger.mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+          aggregateData.setNumDVs(merger.mergeNumDVs(aggregateData.getNumDVs(), newData.getNumDVs()));
         }
       }
       if (areAllNDVEstimatorsMergeable && ndvEstimator != null) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java
index 1756db81e8e..0ab43a6dcc3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMerger.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BinaryColumnStatsMerger extends ColumnStatsMerger {
+public class BinaryColumnStatsMerger extends ColumnStatsMerger<byte []> {
 
   private static final Logger LOG = LoggerFactory.getLogger(BinaryColumnStatsMerger.class);
 
@@ -34,8 +34,9 @@ public class BinaryColumnStatsMerger extends ColumnStatsMerger {
 
     BinaryColumnStatsData aggregateData = aggregateColStats.getStatsData().getBinaryStats();
     BinaryColumnStatsData newData = newColStats.getStatsData().getBinaryStats();
-    aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
-    aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+
+    aggregateData.setMaxColLen(mergeMaxColLen(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+    aggregateData.setAvgColLen(mergeAvgColLen(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java
index e33573c4840..86639ca2072 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMerger.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BooleanColumnStatsMerger extends ColumnStatsMerger {
+public class BooleanColumnStatsMerger extends ColumnStatsMerger<Boolean> {
 
   private static final Logger LOG = LoggerFactory.getLogger(BooleanColumnStatsMerger.class);
 
@@ -34,8 +34,9 @@ public class BooleanColumnStatsMerger extends ColumnStatsMerger {
 
     BooleanColumnStatsData aggregateData = aggregateColStats.getStatsData().getBooleanStats();
     BooleanColumnStatsData newData = newColStats.getStatsData().getBooleanStats();
+
     aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
     aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java
index 8d4da8a5ab3..218757c8e18 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMerger.java
@@ -20,11 +20,15 @@
 package org.apache.hadoop.hive.metastore.columnstats.merge;
 
 import org.apache.hadoop.hive.common.histogram.KllHistogramEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class ColumnStatsMerger {
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class ColumnStatsMerger<T> {
 
   private static final Logger LOG = LoggerFactory.getLogger(ColumnStatsMerger.class);
 
@@ -46,4 +50,60 @@ public abstract class ColumnStatsMerger {
     }
     return oldEst;
   }
+
+  protected long mergeNumDistinctValueEstimator(String columnName, List<NumDistinctValueEstimator> estimators,
+      long oldNumDVs, long newNumDVs) {
+    if (estimators == null || estimators.size() != 2) {
+      throw new IllegalArgumentException("NDV estimators list must be set and contain exactly two elements, " +
+          "found " + (estimators == null ? "null" :
+          estimators.stream().map(NumDistinctValueEstimator::toString).collect(Collectors.joining(", "))));
+    }
+
+    NumDistinctValueEstimator oldEst = estimators.get(0);
+    NumDistinctValueEstimator newEst = estimators.get(1);
+    if (oldEst == null && newEst == null) {
+      return mergeNumDVs(oldNumDVs, newNumDVs);
+    }
+
+    if (oldEst == null) {
+      estimators.set(0, newEst);
+      return mergeNumDVs(oldNumDVs, newEst.estimateNumDistinctValues());
+    }
+
+    final long ndv;
+    if (oldEst.canMerge(newEst)) {
+      oldEst.mergeEstimators(newEst);
+      ndv = oldEst.estimateNumDistinctValues();
+      return ndv;
+    } else {
+      ndv = mergeNumDVs(oldNumDVs, newNumDVs);
+    }
+    LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}", columnName,
+        oldNumDVs, newNumDVs, ndv);
+    return ndv;
+  }
+
+  public T mergeLowValue(T oldValue, T newValue) {
+    throw new UnsupportedOperationException("This operation is not supported");
+  }
+
+  public T mergeHighValue(T oldValue, T newValue) {
+    throw new UnsupportedOperationException("This operation is not supported");
+  }
+
+  public long mergeNumDVs(long oldValue, long newValue) {
+    return Math.max(oldValue, newValue);
+  }
+
+  public long mergeNumNulls(long oldValue, long newValue) {
+    return oldValue + newValue;
+  }
+
+  public long mergeMaxColLen(long oldValue, long newValue) {
+    return Math.max(oldValue, newValue);
+  }
+
+  public double mergeAvgColLen(double oldValue, double newValue) {
+    return Math.max(oldValue, newValue);
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java
index 04a264942be..273c9a69929 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerFactory.java
@@ -50,10 +50,10 @@ public class ColumnStatsMergerFactory {
    *           two different types or if they are of an unknown type
    * @throws NullPointerException if statistics object is {@code null}
    */
-  public static ColumnStatsMerger getColumnStatsMerger(final ColumnStatisticsObj statsObjNew,
+  public static ColumnStatsMerger<?> getColumnStatsMerger(final ColumnStatisticsObj statsObjNew,
       final ColumnStatisticsObj statsObjOld) {
-    Objects.requireNonNull(statsObjNew, "Column 1 statistcs cannot be null");
-    Objects.requireNonNull(statsObjOld, "Column 2 statistcs cannot be null");
+    Objects.requireNonNull(statsObjNew, "Column 1 statistics cannot be null");
+    Objects.requireNonNull(statsObjOld, "Column 2 statistics cannot be null");
 
     final _Fields typeNew = statsObjNew.getStatsData().getSetField();
     final _Fields typeOld = statsObjOld.getStatsData().getSetField();
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
index 12c50354843..2f51af81f1d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
@@ -32,7 +32,10 @@ import com.google.common.base.MoreObjects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DateColumnStatsMerger extends ColumnStatsMerger {
+import java.util.Arrays;
+import java.util.List;
+
+public class DateColumnStatsMerger extends ColumnStatsMerger<Date> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsMerger.class);
 
@@ -43,64 +46,57 @@ public class DateColumnStatsMerger extends ColumnStatsMerger {
     DateColumnStatsDataInspector aggregateData = dateInspectorFromStats(aggregateColStats);
     DateColumnStatsDataInspector newData = dateInspectorFromStats(newColStats);
 
-    setLowValue(aggregateData, newData);
-    setHighValue(aggregateData, newData);
-
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (aggregateData.getNdvEstimator() == null || newData.getNdvEstimator() == null) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-    } else {
-      NumDistinctValueEstimator oldEst = aggregateData.getNdvEstimator();
-      NumDistinctValueEstimator newEst = newData.getNdvEstimator();
-      final long ndv;
-      if (oldEst.canMerge(newEst)) {
-        oldEst.mergeEstimators(newEst);
-        ndv = oldEst.estimateNumDistinctValues();
-        aggregateData.setNdvEstimator(oldEst);
-      } else {
-        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
-      }
-      LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}", aggregateColStats.getColName(),
-          aggregateData.getNumDVs(), newData.getNumDVs(), ndv);
-      aggregateData.setNumDVs(ndv);
+    Date lowValue = mergeLowValue(getLowValue(aggregateData), getLowValue(newData));
+    if (lowValue != null) {
+      aggregateData.setLowValue(lowValue);
+    }
+    Date highValue = mergeHighValue(getHighValue(aggregateData), getHighValue(newData));
+    if (highValue != null) {
+      aggregateData.setHighValue(highValue);
     }
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+
+    NumDistinctValueEstimator oldNDVEst = aggregateData.getNdvEstimator();
+    NumDistinctValueEstimator newNDVEst = newData.getNdvEstimator();
+    List<NumDistinctValueEstimator> ndvEstimatorsList = Arrays.asList(oldNDVEst, newNDVEst);
+    aggregateData.setNumDVs(mergeNumDistinctValueEstimator(aggregateColStats.getColName(),
+        ndvEstimatorsList, aggregateData.getNumDVs(), newData.getNumDVs()));
+    aggregateData.setNdvEstimator(ndvEstimatorsList.get(0));
 
-    KllHistogramEstimator oldEst = aggregateData.getHistogramEstimator();
-    KllHistogramEstimator newEst = newData.getHistogramEstimator();
-    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldEst, newEst));
+    KllHistogramEstimator oldKllEst = aggregateData.getHistogramEstimator();
+    KllHistogramEstimator newKllEst = newData.getHistogramEstimator();
+    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldKllEst, newKllEst));
 
     aggregateColStats.getStatsData().setDateStats(aggregateData);
   }
 
-  public void setLowValue(DateColumnStatsDataInspector aggregateData, DateColumnStatsDataInspector newData) {
-    final Date aggregateLowValue = aggregateData.getLowValue();
-    final Date newLowValue = newData.getLowValue();
-
-    final Date mergedLowValue;
-    if (!aggregateData.isSetLowValue() && !newData.isSetLowValue()) {
-      return;
-    } else if (aggregateData.isSetLowValue() && newData.isSetLowValue()) {
-      mergedLowValue = ObjectUtils.min(aggregateLowValue, newLowValue);
-    } else {
-      mergedLowValue = MoreObjects.firstNonNull(aggregateLowValue, newLowValue);
-    }
+  public Date getLowValue(DateColumnStatsDataInspector data) {
+    return data.isSetLowValue() ? data.getLowValue() : null;
+  }
 
-    aggregateData.setLowValue(mergedLowValue);
+  public Date getHighValue(DateColumnStatsDataInspector data) {
+    return data.isSetHighValue() ? data.getHighValue() : null;
   }
 
-  public void setHighValue(DateColumnStatsDataInspector aggregateData, DateColumnStatsDataInspector newData) {
-    final Date aggregateHighValue = aggregateData.getHighValue();
-    final Date newHighValue = newData.getHighValue();
-
-    final Date mergedHighValue;
-    if (!aggregateData.isSetHighValue() && !newData.isSetHighValue()) {
-      return;
-    } else if (aggregateData.isSetHighValue() && newData.isSetHighValue()) {
-      mergedHighValue = ObjectUtils.max(newHighValue, aggregateHighValue);
-    } else {
-      mergedHighValue = MoreObjects.firstNonNull(aggregateHighValue, newHighValue);
+  @Override
+  public Date mergeLowValue(Date oldValue, Date newValue) {
+    if (oldValue != null && newValue != null) {
+      return ObjectUtils.min(oldValue, newValue);
     }
+    if (oldValue != null || newValue != null) {
+      return MoreObjects.firstNonNull(oldValue, newValue);
+    }
+    return null;
+  }
 
-    aggregateData.setHighValue(mergedHighValue);
+  @Override
+  public Date mergeHighValue(Date oldValue, Date newValue) {
+    if (oldValue != null && newValue != null) {
+      return ObjectUtils.max(oldValue, newValue);
+    }
+    if (oldValue != null || newValue != null) {
+      return MoreObjects.firstNonNull(oldValue, newValue);
+    }
+    return null;
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java
index e3737f7a2d5..523f848ba44 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java
@@ -33,7 +33,10 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DecimalColumnStatsMerger extends ColumnStatsMerger {
+import java.util.Arrays;
+import java.util.List;
+
+public class DecimalColumnStatsMerger extends ColumnStatsMerger<Decimal> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DecimalColumnStatsMerger.class);
 
@@ -44,65 +47,57 @@ public class DecimalColumnStatsMerger extends ColumnStatsMerger {
     DecimalColumnStatsDataInspector aggregateData = decimalInspectorFromStats(aggregateColStats);
     DecimalColumnStatsDataInspector newData = decimalInspectorFromStats(newColStats);
 
-    setLowValue(aggregateData, newData);
-    setHighValue(aggregateData, newData);
-
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-
-    if (aggregateData.getNdvEstimator() == null || newData.getNdvEstimator() == null) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-    } else {
-      NumDistinctValueEstimator oldEst = aggregateData.getNdvEstimator();
-      NumDistinctValueEstimator newEst = newData.getNdvEstimator();
-      final long ndv;
-      if (oldEst.canMerge(newEst)) {
-        oldEst.mergeEstimators(newEst);
-        ndv = oldEst.estimateNumDistinctValues();
-        aggregateData.setNdvEstimator(oldEst);
-      } else {
-        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
-      }
-      LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}", aggregateColStats.getColName(),
-          aggregateData.getNumDVs(), newData.getNumDVs(), ndv);
-      aggregateData.setNumDVs(ndv);
+    Decimal lowValue = mergeLowValue(getLowValue(aggregateData), getLowValue(newData));
+    if (lowValue != null) {
+      aggregateData.setLowValue(lowValue);
+    }
+    Decimal highValue = mergeHighValue(getHighValue(aggregateData), getHighValue(newData));
+    if (highValue != null) {
+      aggregateData.setHighValue(highValue);
     }
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+
+    NumDistinctValueEstimator oldNDVEst = aggregateData.getNdvEstimator();
+    NumDistinctValueEstimator newNDVEst = newData.getNdvEstimator();
+    List<NumDistinctValueEstimator> ndvEstimatorsList = Arrays.asList(oldNDVEst, newNDVEst);
+    aggregateData.setNumDVs(mergeNumDistinctValueEstimator(aggregateColStats.getColName(),
+        ndvEstimatorsList, aggregateData.getNumDVs(), newData.getNumDVs()));
+    aggregateData.setNdvEstimator(ndvEstimatorsList.get(0));
 
-    KllHistogramEstimator oldEst = aggregateData.getHistogramEstimator();
-    KllHistogramEstimator newEst = newData.getHistogramEstimator();
-    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldEst, newEst));
+    KllHistogramEstimator oldKllEst = aggregateData.getHistogramEstimator();
+    KllHistogramEstimator newKllEst = newData.getHistogramEstimator();
+    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldKllEst, newKllEst));
 
     aggregateColStats.getStatsData().setDecimalStats(aggregateData);
   }
 
-  public void setLowValue(DecimalColumnStatsDataInspector aggregateData, DecimalColumnStatsDataInspector newData) {
-    final Decimal aggregateLowValue = aggregateData.getLowValue();
-    final Decimal newLowValue = newData.getLowValue();
-
-    final Decimal mergedLowValue;
-    if (!aggregateData.isSetLowValue() && !newData.isSetLowValue()) {
-      return;
-    } else if (aggregateData.isSetLowValue() && newData.isSetLowValue()) {
-      mergedLowValue = ObjectUtils.min(newLowValue, aggregateLowValue);
-    } else {
-      mergedLowValue = MoreObjects.firstNonNull(aggregateLowValue, newLowValue);
-    }
+  public Decimal getLowValue(DecimalColumnStatsDataInspector data) {
+    return data.isSetLowValue() ? data.getLowValue() : null;
+  }
 
-    aggregateData.setLowValue(mergedLowValue);
+  public Decimal getHighValue(DecimalColumnStatsDataInspector data) {
+    return data.isSetHighValue() ? data.getHighValue() : null;
   }
 
-  public void setHighValue(DecimalColumnStatsDataInspector aggregateData, DecimalColumnStatsDataInspector newData) {
-    final Decimal aggregateHighValue = aggregateData.getHighValue();
-    final Decimal newHighValue = newData.getHighValue();
-
-    final Decimal mergedHighValue;
-    if (!aggregateData.isSetHighValue() && !newData.isSetHighValue()) {
-      return;
-    } else if (aggregateData.isSetHighValue() && newData.isSetHighValue()) {
-      mergedHighValue = ObjectUtils.max(aggregateHighValue, newHighValue);
-    } else {
-      mergedHighValue = MoreObjects.firstNonNull(aggregateHighValue, newHighValue);
+  @Override
+  public Decimal mergeLowValue(Decimal oldValue, Decimal newValue) {
+    if (oldValue != null && newValue != null) {
+      return ObjectUtils.min(oldValue, newValue);
     }
+    if (oldValue != null || newValue != null) {
+      return MoreObjects.firstNonNull(oldValue, newValue);
+    }
+    return null;
+  }
 
-    aggregateData.setHighValue(mergedHighValue);
+  @Override
+  public Decimal mergeHighValue(Decimal oldValue, Decimal newValue) {
+    if (oldValue != null && newValue != null) {
+      return ObjectUtils.max(oldValue, newValue);
+    }
+    if (oldValue != null || newValue != null) {
+      return MoreObjects.firstNonNull(oldValue, newValue);
+    }
+    return null;
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java
index ff552b14329..da3e6ead04e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java
@@ -26,9 +26,12 @@ import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataI
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.List;
+
 import static org.apache.hadoop.hive.metastore.columnstats.ColumnsStatsUtils.doubleInspectorFromStats;
 
-public class DoubleColumnStatsMerger extends ColumnStatsMerger {
+public class DoubleColumnStatsMerger extends ColumnStatsMerger<Double> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DoubleColumnStatsMerger.class);
 
@@ -38,63 +41,58 @@ public class DoubleColumnStatsMerger extends ColumnStatsMerger {
 
     DoubleColumnStatsDataInspector aggregateData = doubleInspectorFromStats(aggregateColStats);
     DoubleColumnStatsDataInspector newData = doubleInspectorFromStats(newColStats);
-    setLowValue(aggregateData, newData);
-    setHighValue(aggregateData, newData);
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (aggregateData.getNdvEstimator() == null || newData.getNdvEstimator() == null) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-    } else {
-      NumDistinctValueEstimator oldEst = aggregateData.getNdvEstimator();
-      NumDistinctValueEstimator newEst = newData.getNdvEstimator();
-      long ndv;
-      if (oldEst.canMerge(newEst)) {
-        oldEst.mergeEstimators(newEst);
-        ndv = oldEst.estimateNumDistinctValues();
-        aggregateData.setNdvEstimator(oldEst);
-      } else {
-        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
-      }
-      LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}",
-          aggregateColStats.getColName(), aggregateData.getNumDVs(), newData.getNumDVs(),  ndv);
-      aggregateData.setNumDVs(ndv);
+
+    Double lowValue = mergeLowValue(getLowValue(aggregateData), getLowValue(newData));
+    if (lowValue != null) {
+      aggregateData.setLowValue(lowValue);
     }
+    Double highValue = mergeHighValue(getHighValue(aggregateData), getHighValue(newData));
+    if (highValue != null) {
+      aggregateData.setHighValue(highValue);
+    }
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+
+    NumDistinctValueEstimator oldNDVEst = aggregateData.getNdvEstimator();
+    NumDistinctValueEstimator newNDVEst = newData.getNdvEstimator();
+    List<NumDistinctValueEstimator> ndvEstimatorsList = Arrays.asList(oldNDVEst, newNDVEst);
+    aggregateData.setNumDVs(mergeNumDistinctValueEstimator(aggregateColStats.getColName(),
+        ndvEstimatorsList, aggregateData.getNumDVs(), newData.getNumDVs()));
+    aggregateData.setNdvEstimator(ndvEstimatorsList.get(0));
 
-    KllHistogramEstimator oldEst = aggregateData.getHistogramEstimator();
-    KllHistogramEstimator newEst = newData.getHistogramEstimator();
-    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldEst, newEst));
+    KllHistogramEstimator oldKllEst = aggregateData.getHistogramEstimator();
+    KllHistogramEstimator newKllEst = newData.getHistogramEstimator();
+    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldKllEst, newKllEst));
 
     aggregateColStats.getStatsData().setDoubleStats(aggregateData);
   }
 
-  public void setLowValue(DoubleColumnStatsDataInspector aggregateData, DoubleColumnStatsDataInspector newData) {
-    final double lowValue;
-
-    if (aggregateData.isSetLowValue() && newData.isSetLowValue()) {
-      lowValue = Math.min(aggregateData.getLowValue(), newData.getLowValue());
-    } else if (aggregateData.isSetLowValue()) {
-      lowValue = aggregateData.getLowValue();
-    } else if (newData.isSetLowValue()) {
-      lowValue = newData.getLowValue();
-    } else {
-      return;
-    }
+  public Double getLowValue(DoubleColumnStatsDataInspector data) {
+    return data.isSetLowValue() ? data.getLowValue() : null; // null when the low value is not set
+  }
 
-    aggregateData.setLowValue(lowValue);
+  public Double getHighValue(DoubleColumnStatsDataInspector data) {
+    return data.isSetHighValue() ? data.getHighValue() : null; // null when the high value is not set
   }
 
-  public void setHighValue(DoubleColumnStatsDataInspector aggregateData, DoubleColumnStatsDataInspector newData) {
-    final double highValue;
-
-    if (aggregateData.isSetHighValue() && newData.isSetHighValue()) {
-      highValue = Math.max(aggregateData.getHighValue(), newData.getHighValue());
-    } else if (aggregateData.isSetHighValue()) {
-      highValue = aggregateData.getHighValue();
-    } else if (newData.isSetHighValue()) {
-      highValue = newData.getHighValue();
-    } else {
-      return;
+  @Override
+  public Double mergeLowValue(Double oldValue, Double newValue) { // null-tolerant merge of two low values
+    if (oldValue != null && newValue != null) {
+      return Math.min(oldValue, newValue); // both present: the smaller one wins
+    } else if (oldValue != null) {
+      return oldValue; // only the old value is present
     }
+    // oldValue is null here; newValue may be null as well, in which case null is returned
+    return newValue;
+  }
 
-    aggregateData.setHighValue(highValue);
+  @Override
+  public Double mergeHighValue(Double oldValue, Double newValue) { // null-tolerant merge of two high values
+      if (oldValue != null && newValue != null) {
+        return Math.max(oldValue, newValue); // both present: the larger one wins
+      } else if (oldValue != null) {
+        return oldValue; // only the old value is present
+      }
+      // oldValue is null here; newValue may be null as well, in which case null is returned
+      return newValue;
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java
index 9cd4ba72c5d..ee2753820d8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java
@@ -26,9 +26,12 @@ import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataIns
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.List;
+
 import static org.apache.hadoop.hive.metastore.columnstats.ColumnsStatsUtils.longInspectorFromStats;
 
-public class LongColumnStatsMerger extends ColumnStatsMerger {
+public class LongColumnStatsMerger extends ColumnStatsMerger<Long> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsMerger.class);
 
@@ -38,63 +41,58 @@ public class LongColumnStatsMerger extends ColumnStatsMerger {
 
     LongColumnStatsDataInspector aggregateData = longInspectorFromStats(aggregateColStats);
     LongColumnStatsDataInspector newData = longInspectorFromStats(newColStats);
-    setLowValue(aggregateData, newData);
-    setHighValue(aggregateData, newData);
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (aggregateData.getNdvEstimator() == null || newData.getNdvEstimator() == null) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-    } else {
-      NumDistinctValueEstimator oldEst = aggregateData.getNdvEstimator();
-      NumDistinctValueEstimator newEst = newData.getNdvEstimator();
-      final long ndv;
-      if (oldEst.canMerge(newEst)) {
-        oldEst.mergeEstimators(newEst);
-        ndv = oldEst.estimateNumDistinctValues();
-        aggregateData.setNdvEstimator(oldEst);
-      } else {
-        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
-      }
-      LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}", aggregateColStats.getColName(),
-          aggregateData.getNumDVs(), newData.getNumDVs(), ndv);
-      aggregateData.setNumDVs(ndv);
+
+    Long lowValue = mergeLowValue(getLowValue(aggregateData), getLowValue(newData));
+    if (lowValue != null) {
+      aggregateData.setLowValue(lowValue);
+    }
+    Long highValue = mergeHighValue(getHighValue(aggregateData), getHighValue(newData));
+    if (highValue != null) {
+      aggregateData.setHighValue(highValue);
     }
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
 
-    KllHistogramEstimator oldEst = aggregateData.getHistogramEstimator();
-    KllHistogramEstimator newEst = newData.getHistogramEstimator();
-    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldEst, newEst));
+    NumDistinctValueEstimator oldNDVEst = aggregateData.getNdvEstimator();
+    NumDistinctValueEstimator newNDVEst = newData.getNdvEstimator();
+    List<NumDistinctValueEstimator> ndvEstimatorsList = Arrays.asList(oldNDVEst, newNDVEst);
+    aggregateData.setNumDVs(mergeNumDistinctValueEstimator(aggregateColStats.getColName(),
+        ndvEstimatorsList, aggregateData.getNumDVs(), newData.getNumDVs()));
+    aggregateData.setNdvEstimator(ndvEstimatorsList.get(0));
+
+    KllHistogramEstimator oldKllEst = aggregateData.getHistogramEstimator();
+    KllHistogramEstimator newKllEst = newData.getHistogramEstimator();
+    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldKllEst, newKllEst));
 
     aggregateColStats.getStatsData().setLongStats(aggregateData);
   }
 
-  public void setLowValue(LongColumnStatsDataInspector aggregateData, LongColumnStatsDataInspector newData) {
-    final long lowValue;
-
-    if (aggregateData.isSetLowValue() && newData.isSetLowValue()) {
-      lowValue = Math.min(aggregateData.getLowValue(), newData.getLowValue());
-    } else if (aggregateData.isSetLowValue()) {
-      lowValue = aggregateData.getLowValue();
-    } else if (newData.isSetLowValue()) {
-      lowValue = newData.getLowValue();
-    } else {
-      return;
-    }
+  public Long getLowValue(LongColumnStatsDataInspector data) {
+    return data.isSetLowValue() ? data.getLowValue() : null; // null when the low value is not set
+  }
 
-    aggregateData.setLowValue(lowValue);
+  public Long getHighValue(LongColumnStatsDataInspector data) {
+    return data.isSetHighValue() ? data.getHighValue() : null; // null when the high value is not set
   }
 
-  public void setHighValue(LongColumnStatsDataInspector aggregateData, LongColumnStatsDataInspector newData) {
-    final long highValue;
-
-    if (aggregateData.isSetHighValue() && newData.isSetHighValue()) {
-      highValue = Math.max(aggregateData.getHighValue(), newData.getHighValue());
-    } else if (aggregateData.isSetHighValue()) {
-      highValue = aggregateData.getHighValue();
-    } else if (newData.isSetHighValue()) {
-      highValue = newData.getHighValue();
-    } else {
-      return;
+  @Override
+  public Long mergeLowValue(Long oldValue, Long newValue) { // null-tolerant merge of two low values
+    if (oldValue != null && newValue != null) {
+      return Math.min(oldValue, newValue); // both present: the smaller one wins
+    } else if (oldValue != null) {
+      return oldValue; // only the old value is present
     }
+    // oldValue is null here; newValue may be null as well, in which case null is returned
+    return newValue;
+  }
 
-    aggregateData.setHighValue(highValue);
+  @Override
+  public Long mergeHighValue(Long oldValue, Long newValue) { // null-tolerant merge of two high values
+    if (oldValue != null && newValue != null) {
+      return Math.max(oldValue, newValue); // both present: the larger one wins
+    } else if (oldValue != null) {
+      return oldValue; // only the old value is present
+    }
+    // oldValue is null here; newValue may be null as well, in which case null is returned
+    return newValue;
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java
index 7bd5b72802e..591c53437fa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java
@@ -25,9 +25,12 @@ import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataI
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.List;
+
 import static org.apache.hadoop.hive.metastore.columnstats.ColumnsStatsUtils.stringInspectorFromStats;
 
-public class StringColumnStatsMerger extends ColumnStatsMerger {
+public class StringColumnStatsMerger extends ColumnStatsMerger<String> {
 
   private static final Logger LOG = LoggerFactory.getLogger(StringColumnStatsMerger.class);
 
@@ -37,26 +40,17 @@ public class StringColumnStatsMerger extends ColumnStatsMerger {
 
     StringColumnStatsDataInspector aggregateData = stringInspectorFromStats(aggregateColStats);
     StringColumnStatsDataInspector newData = stringInspectorFromStats(newColStats);
-    aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
-    aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (aggregateData.getNdvEstimator() == null || newData.getNdvEstimator() == null) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-    } else {
-      NumDistinctValueEstimator oldEst = aggregateData.getNdvEstimator();
-      NumDistinctValueEstimator newEst = newData.getNdvEstimator();
-      final long ndv;
-      if (oldEst.canMerge(newEst)) {
-        oldEst.mergeEstimators(newEst);
-        ndv = oldEst.estimateNumDistinctValues();
-        aggregateData.setNdvEstimator(oldEst);
-      } else {
-        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
-      }
-      LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}", aggregateColStats.getColName(),
-          aggregateData.getNumDVs(), newData.getNumDVs(), ndv);
-      aggregateData.setNumDVs(ndv);
-    }
+
+    aggregateData.setMaxColLen(mergeMaxColLen(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+    aggregateData.setAvgColLen(mergeAvgColLen(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+
+    NumDistinctValueEstimator oldNDVEst = aggregateData.getNdvEstimator();
+    NumDistinctValueEstimator newNDVEst = newData.getNdvEstimator();
+    List<NumDistinctValueEstimator> ndvEstimatorsList = Arrays.asList(oldNDVEst, newNDVEst);
+    aggregateData.setNumDVs(mergeNumDistinctValueEstimator(aggregateColStats.getColName(),
+        ndvEstimatorsList, aggregateData.getNumDVs(), newData.getNumDVs()));
+    aggregateData.setNdvEstimator(ndvEstimatorsList.get(0));
 
     aggregateColStats.getStatsData().setStringStats(aggregateData);
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/TimestampColumnStatsMerger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/TimestampColumnStatsMerger.java
index 7ecdc2139ad..7198d909a9d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/TimestampColumnStatsMerger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/merge/TimestampColumnStatsMerger.java
@@ -31,7 +31,10 @@ import com.google.common.base.MoreObjects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TimestampColumnStatsMerger extends ColumnStatsMerger {
+import java.util.Arrays;
+import java.util.List;
+
+public class TimestampColumnStatsMerger extends ColumnStatsMerger<Timestamp> {
 
   private static final Logger LOG = LoggerFactory.getLogger(TimestampColumnStatsMerger.class);
 
@@ -42,64 +45,57 @@ public class TimestampColumnStatsMerger extends ColumnStatsMerger {
     TimestampColumnStatsDataInspector aggregateData = timestampInspectorFromStats(aggregateColStats);
     TimestampColumnStatsDataInspector newData = timestampInspectorFromStats(newColStats);
 
-    setLowValue(aggregateData, newData);
-    setHighValue(aggregateData, newData);
-
-    aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
-    if (aggregateData.getNdvEstimator() == null || newData.getNdvEstimator() == null) {
-      aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
-    } else {
-      NumDistinctValueEstimator oldEst = aggregateData.getNdvEstimator();
-      NumDistinctValueEstimator newEst = newData.getNdvEstimator();
-      final long ndv;
-      if (oldEst.canMerge(newEst)) {
-        oldEst.mergeEstimators(newEst);
-        ndv = oldEst.estimateNumDistinctValues();
-        aggregateData.setNdvEstimator(oldEst);
-      } else {
-        ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
-      }
-      LOG.debug("Use bitvector to merge column {}'s ndvs of {} and {} to be {}", aggregateColStats.getColName(),
-          aggregateData.getNumDVs(), newData.getNumDVs(), ndv);
-      aggregateData.setNumDVs(ndv);
+    Timestamp lowValue = mergeLowValue(getLowValue(aggregateData), getLowValue(newData));
+    if (lowValue != null) {
+      aggregateData.setLowValue(lowValue);
+    }
+    Timestamp highValue = mergeHighValue(getHighValue(aggregateData), getHighValue(newData));
+    if (highValue != null) {
+      aggregateData.setHighValue(highValue);
     }
+    aggregateData.setNumNulls(mergeNumNulls(aggregateData.getNumNulls(), newData.getNumNulls()));
+
+    NumDistinctValueEstimator oldNDVEst = aggregateData.getNdvEstimator();
+    NumDistinctValueEstimator newNDVEst = newData.getNdvEstimator();
+    List<NumDistinctValueEstimator> ndvEstimatorsList = Arrays.asList(oldNDVEst, newNDVEst);
+    aggregateData.setNumDVs(mergeNumDistinctValueEstimator(aggregateColStats.getColName(),
+        ndvEstimatorsList, aggregateData.getNumDVs(), newData.getNumDVs()));
+    aggregateData.setNdvEstimator(ndvEstimatorsList.get(0));
 
-    KllHistogramEstimator oldEst = aggregateData.getHistogramEstimator();
-    KllHistogramEstimator newEst = newData.getHistogramEstimator();
-    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldEst, newEst));
+    KllHistogramEstimator oldKllEst = aggregateData.getHistogramEstimator();
+    KllHistogramEstimator newKllEst = newData.getHistogramEstimator();
+    aggregateData.setHistogramEstimator(mergeHistogramEstimator(aggregateColStats.getColName(), oldKllEst, newKllEst));
 
     aggregateColStats.getStatsData().setTimestampStats(aggregateData);
   }
 
-  public void setLowValue(TimestampColumnStatsDataInspector aggregateData, TimestampColumnStatsDataInspector newData) {
-    final Timestamp aggregateLowValue = aggregateData.getLowValue();
-    final Timestamp newLowValue = newData.getLowValue();
-
-    final Timestamp mergedLowValue;
-    if (!aggregateData.isSetLowValue() && !newData.isSetLowValue()) {
-      return;
-    } else if (aggregateData.isSetLowValue() && newData.isSetLowValue()) {
-      mergedLowValue = ObjectUtils.min(newLowValue, aggregateLowValue);
-    } else {
-      mergedLowValue = MoreObjects.firstNonNull(aggregateLowValue, newLowValue);
-    }
+  public Timestamp getLowValue(TimestampColumnStatsDataInspector data) {
+    return data.isSetLowValue() ? data.getLowValue() : null; // null when the low value is not set
+  }
 
-    aggregateData.setLowValue(mergedLowValue);
+  public Timestamp getHighValue(TimestampColumnStatsDataInspector data) {
+    return data.isSetHighValue() ? data.getHighValue() : null; // null when the high value is not set
   }
 
-  public void setHighValue(TimestampColumnStatsDataInspector aggregateData, TimestampColumnStatsDataInspector newData) {
-    final Timestamp aggregateHighValue = aggregateData.getHighValue();
-    final Timestamp newHighValue = newData.getHighValue();
-
-    final Timestamp mergedHighValue;
-    if (!aggregateData.isSetHighValue() && !newData.isSetHighValue()) {
-      return;
-    } else if (aggregateData.isSetHighValue() && newData.isSetHighValue()) {
-      mergedHighValue = ObjectUtils.max(aggregateHighValue, newHighValue);
-    } else {
-      mergedHighValue = MoreObjects.firstNonNull(aggregateHighValue, newHighValue);
+  @Override
+  public Timestamp mergeLowValue(Timestamp oldValue, Timestamp newValue) { // null-tolerant merge of two low values
+    if (oldValue != null && newValue != null) {
+      return ObjectUtils.min(oldValue, newValue); // both present: delegate to ObjectUtils.min
     }
+    if (oldValue != null || newValue != null) {
+      return MoreObjects.firstNonNull(oldValue, newValue); // exactly one is non-null here: return it
+    }
+    return null; // both inputs are null
+  }
 
-    aggregateData.setHighValue(mergedHighValue);
+  @Override
+  public Timestamp mergeHighValue(Timestamp oldValue, Timestamp newValue) { // null-tolerant merge of two high values
+    if (oldValue != null && newValue != null) {
+      return ObjectUtils.max(oldValue, newValue); // both present: delegate to ObjectUtils.max
+    }
+    if (oldValue != null || newValue != null) {
+      return MoreObjects.firstNonNull(oldValue, newValue); // exactly one is non-null here: return it
+    }
+    return null; // both inputs are null
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
index 5e3a3311294..d487752f1a3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
@@ -763,7 +763,7 @@ public class MetaStoreServerUtils {
         assert (statsObjNew.getStatsData().getSetField() == statsObjOld.getStatsData()
             .getSetField());
         // If statsObjOld is found, we can merge.
-        ColumnStatsMerger merger = ColumnStatsMergerFactory.getColumnStatsMerger(statsObjNew,
+        ColumnStatsMerger<?> merger = ColumnStatsMergerFactory.getColumnStatsMerger(statsObjNew,
             statsObjOld);
         merger.merge(statsObjNew, statsObjOld);
       }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMergerTest.java
new file mode 100644
index 00000000000..0b49f5a172c
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/BinaryColumnStatsMergerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.hadoop.hive.metastore.columnstats.merge.ColumnStatsMergerTest.createColumnStatisticsObj;
+import static org.junit.Assert.assertEquals;
+
+@Category(MetastoreUnitTest.class)
+public class BinaryColumnStatsMergerTest { // checks BinaryColumnStatsMerger.merge over two successive merges
+  private static final BinaryColumnStatsMerger MERGER = new BinaryColumnStatsMerger();
+
+  @Test
+  public void testMergeNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(byte[].class)
+        .avgColLen(3)
+        .maxColLen(2)
+        .numNulls(2)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(byte[].class)
+        .avgColLen(2)
+        .maxColLen(3)
+        .numNulls(3)
+        .build());
+    MERGER.merge(aggrObj, newObj); // first merge; the result is later read back from aggrObj
+
+    newObj = createColumnStatisticsObj(new ColStatsBuilder<>(byte[].class)
+        .avgColLen(3)
+        .maxColLen(3)
+        .numNulls(1)
+        .build());
+    MERGER.merge(aggrObj, newObj); // second merge into the same aggregate object
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(byte[].class)
+        .avgColLen(3) // largest of the three inputs (3, 2, 3)
+        .maxColLen(3) // largest of the three inputs (2, 3, 3)
+        .numNulls(6) // 2 + 3 + 1
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMergerTest.java
new file mode 100644
index 00000000000..03eac017a65
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/BooleanColumnStatsMergerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.hadoop.hive.metastore.columnstats.merge.ColumnStatsMergerTest.createColumnStatisticsObj;
+import static org.junit.Assert.assertEquals;
+
+@Category(MetastoreUnitTest.class)
+public class BooleanColumnStatsMergerTest { // checks BooleanColumnStatsMerger.merge over two successive merges
+  private static final BooleanColumnStatsMerger MERGER = new BooleanColumnStatsMerger();
+
+  @Test
+  public void testMergeNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Boolean.class)
+        .numFalses(1)
+        .numTrues(2)
+        .numNulls(2)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Boolean.class)
+        .numFalses(1)
+        .numTrues(2)
+        .numNulls(3)
+        .build());
+    MERGER.merge(aggrObj, newObj); // first merge; the result is later read back from aggrObj
+
+    newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Boolean.class)
+        .numFalses(1)
+        .numTrues(1)
+        .numNulls(1)
+        .build());
+    MERGER.merge(aggrObj, newObj); // second merge into the same aggregate object
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Boolean.class)
+        .numFalses(3) // 1 + 1 + 1
+        .numTrues(5) // 2 + 2 + 1
+        .numNulls(6) // 2 + 3 + 1
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerTest.java
index 30798fc875c..7386782b981 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerTest.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/ColumnStatsMergerTest.java
@@ -23,24 +23,72 @@ import com.google.common.primitives.Longs;
 import org.apache.datasketches.kll.KllFloatsSketch;
 import org.apache.hadoop.hive.common.histogram.KllHistogramEstimator;
 import org.apache.hadoop.hive.common.histogram.KllHistogramEstimatorFactory;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
 import org.apache.hadoop.hive.metastore.StatisticsTestUtils;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
-import org.junit.Assert;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 @Category(MetastoreUnitTest.class)
 public class ColumnStatsMergerTest {
 
   // the implementation we use does not matter, as we only tests methods from the parent plan here
-  private final static ColumnStatsMerger MERGER = new DateColumnStatsMerger();
+  private final static ColumnStatsMerger<?> MERGER = new DateColumnStatsMerger();
+
+  private final static List<ColumnStatsMerger<?>> MERGERS = Arrays.asList( // iterated by the NDV estimator tests
+      new BinaryColumnStatsMerger(),
+      new BooleanColumnStatsMerger(),
+      new DateColumnStatsMerger(),
+      new DecimalColumnStatsMerger(),
+      new DoubleColumnStatsMerger(),
+      new LongColumnStatsMerger(),
+      new StringColumnStatsMerger(),
+      new TimestampColumnStatsMerger()
+  );
 
   private final static long[] VALUES_1 = { 1, 2 };
   private final static long[] VALUES_2 = { 1, 3 };
 
+  private final static HyperLogLog HLL_1 = StatisticsTestUtils.createHll(VALUES_1);
+  private final static HyperLogLog HLL_2 = StatisticsTestUtils.createHll(VALUES_2);
+
   private final static KllFloatsSketch KLL_1 = StatisticsTestUtils.createKll(VALUES_1);
   private final static KllFloatsSketch KLL_2 = StatisticsTestUtils.createKll(VALUES_2);
 
+  @Test
+  public void testMergeNumDVs() {
+    assertEquals(3, MERGER.mergeNumDVs(1, 3)); // expects the larger of the two inputs
+    assertEquals(3, MERGER.mergeNumDVs(3, 1)); // same expectation with the arguments swapped
+  }
+
+  @Test
+  public void testMergeNumNulls() {
+    assertEquals(4, MERGER.mergeNumNulls(1, 3)); // expects the sum: 1 + 3
+    assertEquals(4, MERGER.mergeNumNulls(3, 1)); // same expectation with the arguments swapped
+  }
+
+  @Test
+  public void testMergeMaxColLen() {
+    assertEquals(3, MERGER.mergeMaxColLen(1, 3)); // expects the larger of the two inputs
+    assertEquals(3, MERGER.mergeMaxColLen(3, 1)); // same expectation with the arguments swapped
+  }
+
+  @Test
+  public void testMergeAvgColLen() { // delta is Double.MIN_VALUE, the smallest positive double: a near-exact match
+    assertEquals(3, MERGER.mergeAvgColLen(1, 3), Double.MIN_VALUE); // expects the larger of the two inputs
+    assertEquals(3, MERGER.mergeAvgColLen(3, 1), Double.MIN_VALUE); // same expectation with the arguments swapped
+  }
+
   @Test
   public void testMergeNonNullHistogramEstimators() {
     KllHistogramEstimator estimator1 =
@@ -54,7 +102,7 @@ public class ColumnStatsMergerTest {
     KllHistogramEstimator expectedEstimator =
         KllHistogramEstimatorFactory.getKllHistogramEstimator(expectedKll.toByteArray());
 
-    Assert.assertEquals(expectedEstimator.getSketch().toString(), computedEstimator.getSketch().toString());
+    assertEquals(expectedEstimator.getSketch().toString(), computedEstimator.getSketch().toString());
   }
 
   @Test
@@ -64,7 +112,7 @@ public class ColumnStatsMergerTest {
 
     KllHistogramEstimator computedEstimator = MERGER.mergeHistogramEstimator("", null, estimator2);
 
-    Assert.assertEquals(estimator2.getSketch().toString(), computedEstimator.getSketch().toString());
+    assertEquals(estimator2.getSketch().toString(), computedEstimator.getSketch().toString());
   }
 
   @Test
@@ -74,11 +122,70 @@ public class ColumnStatsMergerTest {
 
     KllHistogramEstimator computedEstimator = MERGER.mergeHistogramEstimator("", estimator1, null);
 
-    Assert.assertEquals(estimator1.getSketch().toString(), computedEstimator.getSketch().toString());
+    assertEquals(estimator1.getSketch().toString(), computedEstimator.getSketch().toString());
   }
 
   @Test
   public void testMergeNullHistogramEstimators() {
-    Assert.assertNull(MERGER.mergeHistogramEstimator("", null, null));
+    assertNull(MERGER.mergeHistogramEstimator("", null, null));
+  }
+
+  @Test
+  public void testMergeNonNullNDVEstimators() {
+    NumDistinctValueEstimator estimator1 =
+        NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(HLL_1.serialize());
+    NumDistinctValueEstimator estimator2 =
+        NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(HLL_2.serialize());
+
+    for (ColumnStatsMerger<?> merger : MERGERS) { // lower-case local: does not shadow the MERGER constant
+      long computedNDV = merger.mergeNumDistinctValueEstimator(
+          "", Arrays.asList(estimator1, estimator2), 2, 2);
+      assertEquals(3, computedNDV); // HLL_1 holds {1, 2} and HLL_2 holds {1, 3}: three distinct values
+    }
+  }
+
+  @Test
+  public void testMergeNDVEstimatorsFirstNull() {
+    NumDistinctValueEstimator estimator2 =
+        NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(HLL_2.serialize());
+
+    for (ColumnStatsMerger<?> merger : MERGERS) { // lower-case local: does not shadow the MERGER constant
+      List<NumDistinctValueEstimator> estimatorList = Arrays.asList(null, estimator2);
+      long computedNDV = merger.mergeNumDistinctValueEstimator("", estimatorList, 1, 2);
+
+      assertEquals(estimator2, estimatorList.get(0)); // the merge is expected to store estimator2 in slot 0
+      assertEquals(2, computedNDV);
+    }
+  }
+
+  @Test
+  public void testMergeNDVEstimatorsSecondNull() {
+    NumDistinctValueEstimator estimator1 =
+        NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(HLL_1.serialize());
+
+    for (ColumnStatsMerger<?> merger : MERGERS) { // lower-case local: does not shadow the MERGER constant
+      List<NumDistinctValueEstimator> estimatorList = Arrays.asList(estimator1, null);
+      long computedNDV = merger.mergeNumDistinctValueEstimator("", estimatorList, 2, 1);
+
+      assertEquals(Arrays.asList(estimator1, null), estimatorList); // the list is expected to be left unchanged
+      assertEquals(2, computedNDV);
+    }
+  }
+
+  @Test
+  public void testMergeNullNDVEstimators() {
+    List<NumDistinctValueEstimator> estimatorList = Arrays.asList(null, null);
+
+    for (ColumnStatsMerger<?> merger : MERGERS) { // lower-case local: does not shadow the MERGER constant
+      long computedNDV = merger.mergeNumDistinctValueEstimator("", estimatorList, 1, 2);
+      assertEquals(2, computedNDV); // with no estimators, the larger of the two NDV inputs is expected
+      assertEquals(Arrays.asList(null, null), estimatorList);
+    }
+  }
+
+  protected static ColumnStatisticsObj createColumnStatisticsObj(ColumnStatisticsData columnStatisticsData) {
+    ColumnStatisticsObj columnStatisticsObj = new ColumnStatisticsObj(); // fresh wrapper for each call
+    columnStatisticsObj.setStatsData(columnStatisticsData); // only the stats data is populated here
+    return columnStatisticsObj;
   }
 }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMergerTest.java
index e41339d84b1..e4ff7698fba 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMergerTest.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMergerTest.java
@@ -23,11 +23,15 @@ import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
 import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.hadoop.hive.metastore.columnstats.merge.ColumnStatsMergerTest.createColumnStatisticsObj;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 @Category(MetastoreUnitTest.class)
 public class DateColumnStatsMergerTest {
 
@@ -35,74 +39,210 @@ public class DateColumnStatsMergerTest {
   private static final Date DATE_2 = new Date(2);
   private static final Date DATE_3 = new Date(3);
 
-  private ColumnStatsMerger merger = new DateColumnStatsMerger();
-
-  @Test
-  public void testMergeNullMinMaxValues() {
-    ColumnStatisticsObj old = new ColumnStatisticsObj();
-    createData(old, null, null);
+  private static final DateColumnStatsDataInspector DATA_1 = new DateColumnStatsDataInspector();
+  private static final DateColumnStatsDataInspector DATA_2 = new DateColumnStatsDataInspector();
+  private static final DateColumnStatsDataInspector DATA_3 = new DateColumnStatsDataInspector();
+
+  static { // each DATA_n fixture gets low == high == DATE_n
+    DATA_1.setLowValue(DATE_1);
+    DATA_1.setHighValue(DATE_1);
+    DATA_2.setLowValue(DATE_2);
+    DATA_2.setHighValue(DATE_2);
+    DATA_3.setLowValue(DATE_3);
+    DATA_3.setHighValue(DATE_3);
+  }
 
-    merger.merge(old, old);
+  private final DateColumnStatsMerger merger = new DateColumnStatsMerger();
 
-    Assert.assertNull(old.getStatsData().getDateStats().getLowValue());
-    Assert.assertNull(old.getStatsData().getDateStats().getHighValue());
+  @Test
+  public void testMergeNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(null)
+        .high(null)
+        .numNulls(1)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, aggrObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Date.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build();
+
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
   }
 
   @Test
-  public void testMergeNulls() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, null, null);
-
-    ColumnStatisticsObj newObj;
-
-    newObj = new ColumnStatisticsObj();
-    createData(newObj, null, null);
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(null, oldObj.getStatsData().getDateStats().getLowValue());
-    Assert.assertEquals(null, oldObj.getStatsData().getDateStats().getHighValue());
+  public void testMergeNullWithNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(null)
+        .high(null)
+        .numNulls(0)
+        .numDVs(0)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(DATE_1)
+        .high(DATE_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .kll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Date.class)
+        .low(DATE_1)
+        .high(DATE_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .kll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
 
-    newObj = new ColumnStatisticsObj();
-    createData(newObj, DATE_1, DATE_3);
-    merger.merge(oldObj, newObj);
+  @Test
+  public void testMergeNonNullWithNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(DATE_1)
+        .high(DATE_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .kll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .build());
+
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Date.class)
+        .low(DATE_1)
+        .high(DATE_3)
+        .numNulls(6)
+        .numDVs(2)
+        .hll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .kll(DATE_1.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch())
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+  
+  @Test
+  public void testMergeNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(DATE_2)
+        .high(DATE_2)
+        .numNulls(2)
+        .numDVs(1)
+        .hll(DATE_2.getDaysSinceEpoch())
+        .kll(DATE_2.getDaysSinceEpoch())
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(DATE_3)
+        .high(DATE_3)
+        .numNulls(3)
+        .numDVs(1)
+        .hll(DATE_3.getDaysSinceEpoch())
+        .kll(DATE_3.getDaysSinceEpoch())
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Date.class)
+        .low(DATE_1)
+        .high(DATE_1)
+        .numNulls(1)
+        .numDVs(1)
+        .hll(DATE_1.getDaysSinceEpoch(), DATE_1.getDaysSinceEpoch())
+        .kll(DATE_1.getDaysSinceEpoch(), DATE_1.getDaysSinceEpoch())
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Date.class)
+        .low(DATE_1)
+        .high(DATE_3)
+        .numNulls(6)
+        .numDVs(3)
+        .hll(DATE_2.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(),
+            DATE_1.getDaysSinceEpoch(), DATE_1.getDaysSinceEpoch())
+        .kll(DATE_2.getDaysSinceEpoch(), DATE_3.getDaysSinceEpoch(),
+            DATE_1.getDaysSinceEpoch(), DATE_1.getDaysSinceEpoch())
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
 
-    newObj = new ColumnStatisticsObj();
-    createData(newObj, null, null);
-    merger.merge(oldObj, newObj);
+  @Test
+  public void testCompareSimple() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector(DATA_1);
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector(DATA_2);
+    assertEquals(DATE_2, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
 
-    Assert.assertEquals(DATE_1, oldObj.getStatsData().getDateStats().getLowValue());
-    Assert.assertEquals(DATE_3, oldObj.getStatsData().getDateStats().getHighValue());
+  @Test
+  public void testCompareSimpleFlipped() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector(DATA_2);
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector(DATA_1);
+    assertEquals(DATE_2, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 
   @Test
-  public void testMergeNonNullAndNullLowerValuesNewIsNull() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, DATE_2, DATE_2);
+  public void testCompareSimpleReversed() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector(DATA_1);
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector(DATA_2);
+    assertEquals(DATE_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
 
-    ColumnStatisticsObj newObj;
+  @Test
+  public void testCompareSimpleFlippedReversed() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector(DATA_2);
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector(DATA_1);
+    assertEquals(DATE_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
 
-    newObj = new ColumnStatisticsObj();
-    createData(newObj, DATE_3, DATE_3);
-    merger.merge(oldObj, newObj);
+  @Test
+  public void testCompareNullsMin() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector();
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector();
+    assertNull(merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
 
-    newObj = new ColumnStatisticsObj();
-    createData(newObj, DATE_1, DATE_1);
-    merger.merge(oldObj, newObj);
+  @Test
+  public void testCompareNullsMax() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector();
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector();
+    assertNull(merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
 
-    Assert.assertEquals(DATE_1, oldObj.getStatsData().getDateStats().getLowValue());
-    Assert.assertEquals(DATE_3, oldObj.getStatsData().getDateStats().getHighValue());
+  @Test
+  public void testCompareFirstNullMin() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector();
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector(DATA_1);
+    assertEquals(DATE_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
   }
 
-  private DateColumnStatsDataInspector createData(ColumnStatisticsObj objNulls, Date lowValue,
-      Date highValue) {
-    ColumnStatisticsData statisticsData = new ColumnStatisticsData();
-    DateColumnStatsDataInspector data = new DateColumnStatsDataInspector();
+  @Test
+  public void testCompareSecondNullMin() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector(DATA_1);
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector();
+    assertEquals(DATE_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
 
-    statisticsData.setDateStats(data);
-    objNulls.setStatsData(statisticsData);
+  @Test
+  public void testCompareFirstNullMax() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector(); // first operand: no high value set
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector(DATA_1);
+    assertEquals(DATE_1, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
 
-    data.setLowValue(lowValue);
-    data.setHighValue(highValue);
-    return data;
+  @Test
+  public void testCompareSecondNullMax() {
+    DateColumnStatsDataInspector data1 = new DateColumnStatsDataInspector(DATA_1);
+    DateColumnStatsDataInspector data2 = new DateColumnStatsDataInspector(); // second operand: no high value set
+    assertEquals(DATE_1, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMergerTest.java
index a9d55eadf04..7e19cbfcad3 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMergerTest.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMergerTest.java
@@ -24,14 +24,21 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.metastore.api.utils.DecimalUtils;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
 import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.hadoop.hive.metastore.columnstats.merge.ColumnStatsMergerTest.createColumnStatisticsObj;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 @Category(MetastoreUnitTest.class)
 public class DecimalColumnStatsMergerTest {
 
+  private static final Decimal DECIMAL_1 = DecimalUtils.getDecimal(1, 0);
   private static final Decimal DECIMAL_3 = DecimalUtils.getDecimal(3, 0);
   private static final Decimal DECIMAL_5 = DecimalUtils.getDecimal(5, 0);
   private static final Decimal DECIMAL_20 = DecimalUtils.getDecimal(2, 1);
@@ -49,231 +56,212 @@ public class DecimalColumnStatsMergerTest {
     DATA_20.setHighValue(DECIMAL_20);
   }
 
-  private DecimalColumnStatsMerger merger = new DecimalColumnStatsMerger();
-
-  @Test
-  public void testMergeNullMinMaxValues() {
-    ColumnStatisticsObj objNulls = new ColumnStatisticsObj();
-    createData(objNulls, null, null);
-
-    merger.merge(objNulls, objNulls);
-
-    Assert.assertNull(objNulls.getStatsData().getDecimalStats().getLowValue());
-    Assert.assertNull(objNulls.getStatsData().getDecimalStats().getHighValue());
-  }
-
-  @Test
-  public void testMergeNonNullAndNullLowerValuesOldIsNull() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, null, null);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, DECIMAL_3, null);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_3, oldObj.getStatsData().getDecimalStats().getLowValue());
-  }
-
-  @Test
-  public void testMergeNonNullAndNullLowerValuesNewIsNull() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, DECIMAL_3, null);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, null, null);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_3, oldObj.getStatsData().getDecimalStats().getLowValue());
-  }
-
-  @Test
-  public void testMergeNonNullAndNullHigherValuesOldIsNull() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, null, null);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, null, DECIMAL_3);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_3, oldObj.getStatsData().getDecimalStats().getHighValue());
-  }
-
-  @Test
-  public void testMergeNonNullAndNullHigherValuesNewIsNull() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, null, DECIMAL_3);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, null, null);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_3, oldObj.getStatsData().getDecimalStats().getHighValue());
-  }
+  private final DecimalColumnStatsMerger merger = new DecimalColumnStatsMerger();
 
   @Test
-  public void testMergeLowValuesFirstWins() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, DECIMAL_3, null);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, DECIMAL_5, null);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_3, oldObj.getStatsData().getDecimalStats().getLowValue());
+  public void testMergeNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(null)
+        .high(null)
+        .numNulls(1)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, aggrObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Decimal.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build();
+
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
   }
 
   @Test
-  public void testMergeLowValuesSecondWins() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, DECIMAL_5, null);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, DECIMAL_3, null);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_3, oldObj.getStatsData().getDecimalStats().getLowValue());
+  public void testMergeNullWithNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(null)
+        .high(null)
+        .numNulls(0)
+        .numDVs(0)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(DECIMAL_1)
+        .high(DECIMAL_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Decimal.class)
+        .low(DECIMAL_1)
+        .high(DECIMAL_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
   }
 
   @Test
-  public void testMergeHighValuesFirstWins() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, null, DECIMAL_5);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, null, DECIMAL_3);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_5, oldObj.getStatsData().getDecimalStats().getHighValue());
+  public void testMergeNonNullWithNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(DECIMAL_1)
+        .high(DECIMAL_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build());
+
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Decimal.class)
+        .low(DECIMAL_1)
+        .high(DECIMAL_3)
+        .numNulls(6)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
   }
 
   @Test
-  public void testMergeHighValuesSecondWins() {
-    ColumnStatisticsObj oldObj = new ColumnStatisticsObj();
-    createData(oldObj, null, DECIMAL_3);
-
-    ColumnStatisticsObj newObj = new ColumnStatisticsObj();
-    createData(newObj, null, DECIMAL_5);
-
-    merger.merge(oldObj, newObj);
-
-    Assert.assertEquals(DECIMAL_5, oldObj.getStatsData().getDecimalStats().getHighValue());
+  public void testMergeNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(DecimalUtils.getDecimal(2, 0)) // low/high match the value 2 held by the sketches below
+        .high(DecimalUtils.getDecimal(2, 0))
+        .numNulls(2)
+        .numDVs(1)
+        .hll(2)
+        .kll(2)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(DECIMAL_3)
+        .high(DECIMAL_3)
+        .numNulls(3)
+        .numDVs(1)
+        .hll(3)
+        .kll(3)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Decimal.class)
+        .low(DECIMAL_1)
+        .high(DECIMAL_1)
+        .numNulls(1)
+        .numDVs(1)
+        .hll(1, 1)
+        .kll(1, 1)
+        .build());
+    merger.merge(aggrObj, newObj); // second merge brings in the overall minimum
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Decimal.class)
+        .low(DECIMAL_1)
+        .high(DECIMAL_3)
+        .numNulls(6)
+        .numDVs(3)
+        .hll(2, 3, 1, 1)
+        .kll(2, 3, 1, 1)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
   }
 
   @Test
   public void testDecimalCompareEqual() {
-    Assert.assertTrue(DECIMAL_3.equals(DECIMAL_3));
+    assertTrue(DECIMAL_3.equals(DECIMAL_3));
   }
 
   @Test
   public void testDecimalCompareDoesntEqual() {
-    Assert.assertTrue(!DECIMAL_3.equals(DECIMAL_5));
+    assertFalse(DECIMAL_3.equals(DECIMAL_5));
   }
 
   @Test
   public void testCompareSimple() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector(DATA_3);
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector(DATA_5);
-    merger.setHighValue(data1, data2);
-    Assert.assertEquals(DECIMAL_5, data1.getHighValue());
+    assertEquals(DECIMAL_5, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 
   @Test
   public void testCompareSimpleFlipped() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector(DATA_5);
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector(DATA_3);
-    merger.setHighValue(data1, data2);
-    Assert.assertEquals(DECIMAL_5, data1.getHighValue());
+    assertEquals(DECIMAL_5, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 
   @Test
   public void testCompareSimpleReversed() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector(DATA_3);
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector(DATA_5);
-    merger.setLowValue(data1, data2);
-    Assert.assertEquals(DECIMAL_3, data1.getLowValue());
+    assertEquals(DECIMAL_3, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
   }
 
   @Test
   public void testCompareSimpleFlippedReversed() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector(DATA_5);
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector(DATA_3);
-    merger.setLowValue(data1, data2);
-    Assert.assertEquals(DECIMAL_3, data1.getLowValue());
+    assertEquals(DECIMAL_3, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
   }
 
   @Test
   public void testCompareUnscaledValue() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector(DATA_3);
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector(DATA_20);
-    merger.setHighValue(data1, data2);
-    Assert.assertEquals(DECIMAL_20, data1.getHighValue());
+    assertEquals(DECIMAL_20, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 
   @Test
   public void testCompareNullsMin() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector();
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector();
-    merger.setLowValue(data1, data2);
-    Assert.assertNull(data1.getLowValue());
+    assertNull(merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
   }
 
   @Test
   public void testCompareNullsMax() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector();
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector();
-    merger.setHighValue(data1, data2);
-    Assert.assertNull(data1.getHighValue());
+    assertNull(merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 
   @Test
   public void testCompareFirstNullMin() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector();
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector(DATA_3);
-    merger.setLowValue(data1, data2);
-    Assert.assertEquals(DECIMAL_3, data1.getLowValue());
+    assertEquals(DECIMAL_3, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
   }
 
   @Test
   public void testCompareSecondNullMin() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector(DATA_3);
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector();
-    merger.setLowValue(data1, data2);
-    Assert.assertEquals(DECIMAL_3, data1.getLowValue());
+    assertEquals(DECIMAL_3, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
   }
 
   @Test
   public void testCompareFirstNullMax() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector(DATA_3);
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector();
-    merger.setHighValue(data1, data2);
-    Assert.assertEquals(DECIMAL_3, data1.getHighValue());
+    assertEquals(DECIMAL_3, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 
   @Test
   public void testCompareSecondNullMax() {
     DecimalColumnStatsDataInspector data1 = new DecimalColumnStatsDataInspector();
     DecimalColumnStatsDataInspector data2 = new DecimalColumnStatsDataInspector(DATA_3);
-    merger.setHighValue(data1, data2);
-    Assert.assertEquals(DECIMAL_3, data1.getHighValue());
-  }
-
-  private DecimalColumnStatsDataInspector createData(ColumnStatisticsObj objNulls, Decimal lowValue,
-      Decimal highValue) {
-    ColumnStatisticsData statisticsData = new ColumnStatisticsData();
-    DecimalColumnStatsDataInspector data = new DecimalColumnStatsDataInspector();
-
-    statisticsData.setDecimalStats(data);
-    objNulls.setStatsData(statisticsData);
-
-    data.setLowValue(lowValue);
-    data.setHighValue(highValue);
-    return data;
+    assertEquals(DECIMAL_3, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
   }
 }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMergerTest.java
new file mode 100644
index 00000000000..9ecc960771b
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMergerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.hadoop.hive.metastore.columnstats.merge.ColumnStatsMergerTest.createColumnStatisticsObj;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@Category(MetastoreUnitTest.class)
+public class DoubleColumnStatsMergerTest {
+  private final DoubleColumnStatsMerger merger = new DoubleColumnStatsMerger();
+
+  private static final DoubleColumnStatsDataInspector DATA_1 = new DoubleColumnStatsDataInspector();
+  private static final DoubleColumnStatsDataInspector DATA_2 = new DoubleColumnStatsDataInspector();
+  private static final DoubleColumnStatsDataInspector DATA_3 = new DoubleColumnStatsDataInspector();
+
+  static {
+    DATA_1.setLowValue(1d);
+    DATA_1.setHighValue(1d);
+    DATA_2.setLowValue(2d);
+    DATA_2.setHighValue(2d);
+    DATA_3.setLowValue(3d);
+    DATA_3.setHighValue(3d);
+  }
+
+  @Test
+  public void testMergeNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(null)
+        .high(null)
+        .numNulls(1)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, aggrObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(double.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build();
+
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNullWithNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(null)
+        .high(null)
+        .numNulls(0)
+        .numDVs(0)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(1d)
+        .high(3d)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1d, 3d, 3d)
+        .kll(1d, 3d, 3d)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(double.class)
+        .low(1d)
+        .high(3d)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1d, 3d, 3d)
+        .kll(1d, 3d, 3d)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNonNullWithNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(1d)
+        .high(3d)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1d, 3d, 3d)
+        .kll(1d, 3d, 3d)
+        .build());
+
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(double.class)
+        .low(1d)
+        .high(3d)
+        .numNulls(6)
+        .numDVs(2)
+        .hll(1d, 3d, 3d)
+        .kll(1d, 3d, 3d)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(2d)
+        .high(2d)
+        .numNulls(2)
+        .numDVs(1)
+        .hll(2d)
+        .kll(2d)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(3d)
+        .high(3d)
+        .numNulls(3)
+        .numDVs(1)
+        .hll(3d)
+        .kll(3d)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    newObj = createColumnStatisticsObj(new ColStatsBuilder<>(double.class)
+        .low(1d)
+        .high(1d)
+        .numNulls(1)
+        .numDVs(1)
+        .hll(1d, 1d)
+        .kll(1d, 1d)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(double.class)
+        .low(1d)
+        .high(3d)
+        .numNulls(6)
+        .numDVs(3)
+        .hll(2d, 3d, 1d, 1d)
+        .kll(2d, 3d, 1d, 1d)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testCompareSimple() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector(DATA_1);
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector(DATA_2);
+    assertEquals(2, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)), Double.MIN_VALUE);
+  }
+
+  @Test
+  public void testCompareSimpleFlipped() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector(DATA_2);
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector(DATA_1);
+    assertEquals(2, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)), Double.MIN_VALUE);
+  }
+
+  @Test
+  public void testCompareSimpleReversed() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector(DATA_1);
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector(DATA_2);
+    assertEquals(1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)), Double.MIN_VALUE);
+  }
+
+  @Test
+  public void testCompareSimpleFlippedReversed() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector(DATA_2);
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector(DATA_1);
+    assertEquals(1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)), Double.MIN_VALUE);
+  }
+
+  @Test
+  public void testCompareNullsMin() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector();
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector();
+    assertNull(merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareNullsMax() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector();
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector();
+    assertNull(merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareFirstNullMin() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector();
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector(DATA_1);
+    assertEquals(1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)), Double.MIN_VALUE);
+  }
+
+  @Test
+  public void testCompareSecondNullMin() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector(DATA_1);
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector();
+    assertEquals(1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)), Double.MIN_VALUE);
+  }
+
+  @Test
+  public void testCompareFirstNullMax() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector(); // first operand: high value unset
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector(DATA_1);
+    assertEquals(1, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)), Double.MIN_VALUE);
+  }
+
+  @Test
+  public void testCompareSecondNullMax() {
+    DoubleColumnStatsDataInspector data1 = new DoubleColumnStatsDataInspector(DATA_1);
+    DoubleColumnStatsDataInspector data2 = new DoubleColumnStatsDataInspector(); // second operand: high value unset
+    assertEquals(1, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)), Double.MIN_VALUE);
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMergerTest.java
new file mode 100644
index 00000000000..54a9574c918
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMergerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.hadoop.hive.metastore.columnstats.merge.ColumnStatsMergerTest.createColumnStatisticsObj;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@Category(MetastoreUnitTest.class)
+public class LongColumnStatsMergerTest {
+  private final LongColumnStatsMerger merger = new LongColumnStatsMerger();
+  
+  private static final LongColumnStatsDataInspector DATA_1 = new LongColumnStatsDataInspector();
+  private static final LongColumnStatsDataInspector DATA_2 = new LongColumnStatsDataInspector();
+  private static final LongColumnStatsDataInspector DATA_3 = new LongColumnStatsDataInspector();
+
+  static {
+    DATA_1.setLowValue(1);
+    DATA_1.setHighValue(1);
+    DATA_2.setLowValue(2);
+    DATA_2.setHighValue(2);
+    DATA_3.setLowValue(3);
+    DATA_3.setHighValue(3);
+  }
+
+  @Test
+  public void testMergeNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(null)
+        .high(null)
+        .numNulls(1)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, aggrObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(long.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build();
+
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNullWithNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(null)
+        .high(null)
+        .numNulls(0)
+        .numDVs(0)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(1L)
+        .high(3L)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(long.class)
+        .low(1L)
+        .high(3L)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNonNullWithNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(1L)
+        .high(3L)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build());
+
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(long.class)
+        .low(1L)
+        .high(3L)
+        .numNulls(6)
+        .numDVs(2)
+        .hll(1, 3, 3)
+        .kll(1, 3, 3)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(2L)
+        .high(2L)
+        .numNulls(2)
+        .numDVs(1)
+        .hll(2L)
+        .kll(2L)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(3L)
+        .high(3L)
+        .numNulls(3)
+        .numDVs(1)
+        .hll(3L)
+        .kll(3L)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    newObj = createColumnStatisticsObj(new ColStatsBuilder<>(long.class)
+        .low(1L)
+        .high(1L)
+        .numNulls(1)
+        .numDVs(1)
+        .hll(1L, 1L)
+        .kll(1L, 1L)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(long.class)
+        .low(1L)
+        .high(3L)
+        .numNulls(6)
+        .numDVs(3)
+        .hll(2L, 3L, 1L, 1L)
+        .kll(2L, 3L, 1L, 1L)
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testCompareSimple() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector(DATA_1);
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector(DATA_2);
+    assertEquals(2, (long)  merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareSimpleFlipped() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector(DATA_2);
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector(DATA_1);
+    assertEquals(2, (long) merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareSimpleReversed() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector(DATA_1);
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector(DATA_2);
+    assertEquals(1, (long) merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareSimpleFlippedReversed() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector(DATA_2);
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector(DATA_1);
+    assertEquals(1, (long)  merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareNullsMin() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector();
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector();
+    assertNull(merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareNullsMax() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector();
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector();
+    assertNull(merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareFirstNullMin() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector();
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector(DATA_1);
+    assertEquals(1, (long)  merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareSecondNullMin() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector(DATA_1);
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector();
+    assertEquals(1, (long)  merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareFirstNullMax() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector(); // first operand: high value unset
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector(DATA_1);
+    assertEquals(1, (long) merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareSecondNullMax() {
+    LongColumnStatsDataInspector data1 = new LongColumnStatsDataInspector(DATA_1);
+    LongColumnStatsDataInspector data2 = new LongColumnStatsDataInspector(); // second operand: high value unset
+    assertEquals(1, (long) merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/TimestampColumnStatsMergerTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/TimestampColumnStatsMergerTest.java
new file mode 100644
index 00000000000..875a8c1d456
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/columnstats/merge/TimestampColumnStatsMergerTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Timestamp;
+import org.apache.hadoop.hive.metastore.columnstats.ColStatsBuilder;
+import org.apache.hadoop.hive.metastore.columnstats.cache.TimestampColumnStatsDataInspector;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.hadoop.hive.metastore.columnstats.merge.ColumnStatsMergerTest.createColumnStatisticsObj;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@Category(MetastoreUnitTest.class)
+public class TimestampColumnStatsMergerTest {
+
+  private static final Timestamp TS_1 = new Timestamp(1);
+  private static final Timestamp TS_2 = new Timestamp(2);
+  private static final Timestamp TS_3 = new Timestamp(3);
+
+  private static final TimestampColumnStatsDataInspector DATA_1 = new TimestampColumnStatsDataInspector();
+  private static final TimestampColumnStatsDataInspector DATA_2 = new TimestampColumnStatsDataInspector();
+  private static final TimestampColumnStatsDataInspector DATA_3 = new TimestampColumnStatsDataInspector();
+
+  static {
+    DATA_1.setLowValue(TS_1);
+    DATA_1.setHighValue(TS_1);
+    DATA_2.setLowValue(TS_2);
+    DATA_2.setHighValue(TS_2);
+    DATA_3.setLowValue(TS_3);
+    DATA_3.setHighValue(TS_3);
+  }
+
+  private final TimestampColumnStatsMerger merger = new TimestampColumnStatsMerger();
+
+  @Test
+  public void testMergeNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(null)
+        .high(null)
+        .numNulls(1)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, aggrObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Timestamp.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build();
+
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNullWithNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(null)
+        .high(null)
+        .numNulls(0)
+        .numDVs(0)
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(TS_1)
+        .high(TS_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(TS_1.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch())
+        .kll(TS_1.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch())
+        .build());
+
+    merger.merge(aggrObj, newObj);
+
+    assertEquals(newObj.getStatsData(), aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNonNullWithNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(TS_1)
+        .high(TS_3)
+        .numNulls(4)
+        .numDVs(2)
+        .hll(TS_1.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch())
+        .kll(TS_1.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch())
+        .build());
+
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(null)
+        .high(null)
+        .numNulls(2)
+        .numDVs(0)
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Timestamp.class)
+        .low(TS_1)
+        .high(TS_3)
+        .numNulls(6)
+        .numDVs(2)
+        .hll(TS_1.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch())
+        .kll(TS_1.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch())
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testMergeNonNullValues() {
+    ColumnStatisticsObj aggrObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(TS_2)
+        .high(TS_2)
+        .numNulls(2)
+        .numDVs(1)
+        .hll(TS_2.getSecondsSinceEpoch())
+        .kll(TS_2.getSecondsSinceEpoch())
+        .build());
+    ColumnStatisticsObj newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(TS_3)
+        .high(TS_3)
+        .numNulls(3)
+        .numDVs(1)
+        .hll(TS_3.getSecondsSinceEpoch())
+        .kll(TS_3.getSecondsSinceEpoch())
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    newObj = createColumnStatisticsObj(new ColStatsBuilder<>(Timestamp.class)
+        .low(TS_1)
+        .high(TS_1)
+        .numNulls(1)
+        .numDVs(1)
+        .hll(TS_1.getSecondsSinceEpoch(), TS_1.getSecondsSinceEpoch())
+        .kll(TS_1.getSecondsSinceEpoch(), TS_1.getSecondsSinceEpoch())
+        .build());
+    merger.merge(aggrObj, newObj);
+
+    ColumnStatisticsData expectedColumnStatisticsData = new ColStatsBuilder<>(Timestamp.class)
+        .low(TS_1)
+        .high(TS_3)
+        .numNulls(6)
+        .numDVs(3)
+        .hll(TS_2.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(),
+            TS_1.getSecondsSinceEpoch(), TS_1.getSecondsSinceEpoch())
+        .kll(TS_2.getSecondsSinceEpoch(), TS_3.getSecondsSinceEpoch(),
+            TS_1.getSecondsSinceEpoch(), TS_1.getSecondsSinceEpoch())
+        .build();
+    assertEquals(expectedColumnStatisticsData, aggrObj.getStatsData());
+  }
+
+  @Test
+  public void testCompareSimple() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector(DATA_1);
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector(DATA_2);
+    assertEquals(TS_2, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareSimpleFlipped() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector(DATA_2);
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector(DATA_1);
+    assertEquals(TS_2, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareSimpleReversed() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector(DATA_1);
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector(DATA_2);
+    assertEquals(TS_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareSimpleFlippedReversed() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector(DATA_2);
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector(DATA_1);
+    assertEquals(TS_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareNullsMin() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector();
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector();
+    assertNull(merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareNullsMax() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector();
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector();
+    assertNull(merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareFirstNullMin() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector();
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector(DATA_1);
+    assertEquals(TS_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareSecondNullMin() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector(DATA_1);
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector();
+    assertEquals(TS_1, merger.mergeLowValue(merger.getLowValue(data1), merger.getLowValue(data2)));
+  }
+
+  @Test
+  public void testCompareFirstNullMax() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector(); // first operand: high value unset
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector(DATA_1);
+    assertEquals(TS_1, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+
+  @Test
+  public void testCompareSecondNullMax() {
+    TimestampColumnStatsDataInspector data1 = new TimestampColumnStatsDataInspector(DATA_1);
+    TimestampColumnStatsDataInspector data2 = new TimestampColumnStatsDataInspector(); // second operand: high value unset
+    assertEquals(TS_1, merger.mergeHighValue(merger.getHighValue(data1), merger.getHighValue(data2)));
+  }
+}