You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/04 22:36:53 UTC

[18/50] [abbrv] hive git commit: HIVE-12960: Migrate Column Stats Extrapolation and UniformDistribution to HBaseStore (Pengcheng Xiong, reviewed by Ashutosh Chauhan)

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java
new file mode 100644
index 0000000..f4e55ed
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java
@@ -0,0 +1,717 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Exercises {@code HBaseStore.get_aggr_stats_for} over a ten-partition table for which
+ * column statistics were written for all, none, or only a subset of the partitions.
+ *
+ * <p>Each test creates the table, adds partitions {@code ds=0 .. ds=9}, writes per-partition
+ * statistics for the chosen partitions, then asks for the aggregate over all ten partition
+ * names and asserts the returned values. Where only some partitions have statistics the
+ * expected values differ from a plain max/min/sum over the written ones (for example a high
+ * value of 1010 although the largest written high value is 1009).
+ */
+public class TestHBaseAggregateStatsExtrapolation {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestHBaseAggregateStatsExtrapolation.class.getName());
+
+  private static final String DB_NAME = "default";
+  private static final String TABLE_NAME = "snp";
+  /** Number of partitions created, and queried, by every test. */
+  private static final int NUM_PARTITIONS = 10;
+
+  @Mock
+  HTableInterface htable;
+  private HBaseStore store;
+  SortedMap<String, Cell> rows = new TreeMap<>();
+
+  // NDV will be 3 for the bitVectors
+  String bitVectors = "{0, 4, 5, 7}{0, 1}{0, 1, 2}{0, 1, 4}{0}{0, 2}{0, 3}{0, 2, 3, 4}{0, 1, 4}{0, 1}{0}{0, 1, 3, 8}{0, 2}{0, 2}{0, 9}{0, 1, 4}";
+
+  /** Builds a store on top of the mocked HTable with the cache disabled and counters reset. */
+  @Before
+  public void before() throws IOException {
+    MockitoAnnotations.initMocks(this);
+    HiveConf conf = new HiveConf();
+    conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true);
+    store = MockUtils.init(conf, htable, rows);
+    store.backdoor().getStatsCache().resetCounters();
+  }
+
+  /**
+   * All partitions have long stats with bit vectors: expects 10 parts found, high 1009,
+   * low -1009, 45 nulls and NDV 3 (not the per-partition numDVs that were written).
+   */
+  @Test
+  public void allPartitionsHaveBitVectorStatusLong() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col1", "long", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      writeStats(i, "col1", "long", longStats(i, 10 * i + 1, true), now);
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col1", "long", 10);
+    checkLongStats(cso, 1009, -1009, 45, 3);
+  }
+
+  /**
+   * All partitions have decimal stats with bit vectors: expects 10 parts found, high 1009,
+   * low -1009, 45 nulls and NDV 3.
+   */
+  @Test
+  public void allPartitionsHaveBitVectorStatusDecimal() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col1_decimal", "decimal", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      DecimalColumnStatsData stats = new DecimalColumnStatsData();
+      stats.setHighValue(StatObjectConverter.createThriftDecimal("" + (1000 + i)));
+      stats.setLowValue(StatObjectConverter.createThriftDecimal("" + (-1000 - i)));
+      stats.setNumNulls(i);
+      stats.setNumDVs(10 * i + 1);
+      stats.setBitVectors(bitVectors);
+      ColumnStatisticsData data = new ColumnStatisticsData();
+      data.setDecimalStats(stats);
+      writeStats(i, "col1_decimal", "decimal", data, now);
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col1_decimal", "decimal", 10);
+    DecimalColumnStatsData aggregated = cso.getStatsData().getDecimalStats();
+    Assert.assertEquals(1009, HBaseUtils.getDoubleValue(aggregated.getHighValue()), 0.01);
+    Assert.assertEquals(-1009, HBaseUtils.getDoubleValue(aggregated.getLowValue()), 0.01);
+    Assert.assertEquals(45, aggregated.getNumNulls());
+    Assert.assertEquals(3, aggregated.getNumDVs());
+  }
+
+  /**
+   * All partitions have double stats with bit vectors: expects 10 parts found, high 1009,
+   * low -1009, 45 nulls and NDV 3.
+   */
+  @Test
+  public void allPartitionsHaveBitVectorStatusDouble() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col1_double", "double", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      writeStats(i, "col1_double", "double", doubleStats(i, 10 * i + 1), now);
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col1_double", "double", 10);
+    checkDoubleStats(cso, 1009, -1009, 45, 3);
+  }
+
+  /**
+   * All partitions have string stats with bit vectors: expects 10 parts found, average
+   * column length 10, maximum column length 19, 45 nulls and NDV 3.
+   */
+  @Test
+  public void allPartitionsHaveBitVectorStatusString() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col1_string", "string", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      StringColumnStatsData stats = new StringColumnStatsData();
+      stats.setAvgColLen(i + 1);
+      stats.setMaxColLen(i + 10);
+      stats.setNumNulls(i);
+      stats.setNumDVs(10 * i + 1);
+      stats.setBitVectors(bitVectors);
+      ColumnStatisticsData data = new ColumnStatisticsData();
+      data.setStringStats(stats);
+      writeStats(i, "col1_string", "string", data, now);
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col1_string", "string", 10);
+    StringColumnStatsData aggregated = cso.getStatsData().getStringStats();
+    Assert.assertEquals(10, aggregated.getAvgColLen(), 0.01);
+    Assert.assertEquals(19, aggregated.getMaxColLen(), 0.01);
+    Assert.assertEquals(45, aggregated.getNumNulls());
+    Assert.assertEquals(3, aggregated.getNumDVs());
+  }
+
+  /**
+   * All partitions have long stats but none carries bit vectors: expects 10 parts found,
+   * high 1009, low -1009, 45 nulls and NDV 90 (the largest numDVs written).
+   */
+  @Test
+  public void noPartitionsHaveBitVectorStatus() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col2", "long", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      writeStats(i, "col2", "long", longStats(i, 10 * i, false), now);
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col2", "long", 10);
+    checkLongStats(cso, 1009, -1009, 45, 90);
+  }
+
+  /**
+   * Only partitions 0, 1, 8 and 9 have stats: expects 4 parts found, high 1010, low -1010,
+   * 45 nulls and NDV 3.
+   */
+  @Test
+  public void TwoEndsOfPartitionsHaveBitVectorStatus() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col3", "long", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      if (i < 2 || i > 7) {
+        writeStats(i, "col3", "long", longStats(i, 10 * i, true), now);
+      }
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col3", "long", 4);
+    checkLongStats(cso, 1010, -1010, 45, 3);
+  }
+
+  /**
+   * Only partitions 3 through 6 have stats: expects 4 parts found, high 1006, low -1006,
+   * 45 nulls and NDV 3.
+   */
+  @Test
+  public void MiddleOfPartitionsHaveBitVectorStatus() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col4", "long", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      if (i > 2 && i < 7) {
+        writeStats(i, "col4", "long", longStats(i, 10 * i, true), now);
+      }
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col4", "long", 4);
+    checkLongStats(cso, 1006, -1006, 45, 3);
+  }
+
+  /**
+   * Partitions 0, 2, 3, 5, 6 and 8 have long stats: expects 6 parts found, high 1010,
+   * low -1010, 40 nulls and NDV 3.
+   */
+  @Test
+  public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusLong() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col5", "long", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+        writeStats(i, "col5", "long", longStats(i, 10 * i, true), now);
+      }
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col5", "long", 6);
+    checkLongStats(cso, 1010, -1010, 40, 3);
+  }
+
+  /**
+   * Partitions 0, 2, 3, 5, 6 and 8 have double stats: expects 6 parts found, high 1010,
+   * low -1010, 40 nulls and NDV 3.
+   */
+  @Test
+  public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDouble() throws Exception {
+    long now = System.currentTimeMillis();
+    StorageDescriptor sd = createTable("col5_double", "double", now);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      addPartition(sd, i, now);
+      if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+        writeStats(i, "col5_double", "double", doubleStats(i, 10 * i), now);
+      }
+    }
+
+    ColumnStatisticsObj cso = aggregateAndCheckColumn("col5_double", "double", 6);
+    checkDoubleStats(cso, 1010, -1010, 40, 3);
+  }
+
+  /** Returns the single-element partition value list for partition {@code i}. */
+  private static List<String> partVals(int i) {
+    return Arrays.asList(String.valueOf(i));
+  }
+
+  /**
+   * Returns the partition name for partition {@code i}, e.g. {@code ds=3}. Built from the
+   * scalar value: concatenating the value List instead would produce {@code ds=[3]}, which
+   * does not match the names used when the aggregate is requested.
+   */
+  private static String partName(int i) {
+    return "ds=" + i;
+  }
+
+  /**
+   * Creates the test table, partitioned by string column {@code ds}, with one data column.
+   *
+   * @param colName name of the data column
+   * @param colType type name of the data column
+   * @param now timestamp used for the create and last-access times
+   * @return the table's storage descriptor, to be copied for each partition
+   */
+  private StorageDescriptor createTable(String colName, String colType, long now)
+      throws Exception {
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema(colName, colType, "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(TABLE_NAME, DB_NAME, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+    return sd;
+  }
+
+  /**
+   * Adds partition {@code i} of the test table to the store.
+   *
+   * @param sd table storage descriptor; a copy with its own location is used
+   * @param i partition index, also used as the partition value
+   * @param now timestamp used for the create and last-access times
+   */
+  private void addPartition(StorageDescriptor sd, int i, long now) throws Exception {
+    StorageDescriptor psd = new StorageDescriptor(sd);
+    psd.setLocation("file:/tmp/default/hit/" + partName(i));
+    Partition part = new Partition(partVals(i), DB_NAME, TABLE_NAME, (int) now, (int) now, psd,
+        Collections.<String, String> emptyMap());
+    store.addPartition(part);
+  }
+
+  /**
+   * Writes partition-level statistics for one column of partition {@code i}.
+   *
+   * @param i partition index
+   * @param colName column the statistics describe
+   * @param colType type name of that column
+   * @param data the statistics payload
+   * @param now value recorded as the last-analyzed time
+   */
+  private void writeStats(int i, String colName, String colType, ColumnStatisticsData data,
+      long now) throws Exception {
+    ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, DB_NAME, TABLE_NAME);
+    desc.setLastAnalyzed(now);
+    desc.setPartName(partName(i));
+    ColumnStatisticsObj obj = new ColumnStatisticsObj();
+    obj.setColName(colName);
+    obj.setColType(colType);
+    obj.setStatsData(data);
+    ColumnStatistics cs = new ColumnStatistics();
+    cs.setStatsDesc(desc);
+    cs.addToStatsObj(obj);
+    store.updatePartitionColumnStatistics(cs, partVals(i));
+  }
+
+  /**
+   * Builds long statistics for partition {@code i}: high {@code 1000 + i}, low
+   * {@code -1000 - i}, {@code i} nulls and the given NDV.
+   *
+   * @param withBitVectors whether to attach the shared {@link #bitVectors} string
+   */
+  private ColumnStatisticsData longStats(int i, long numDVs, boolean withBitVectors) {
+    LongColumnStatsData stats = new LongColumnStatsData();
+    stats.setHighValue(1000 + i);
+    stats.setLowValue(-1000 - i);
+    stats.setNumNulls(i);
+    stats.setNumDVs(numDVs);
+    if (withBitVectors) {
+      stats.setBitVectors(bitVectors);
+    }
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    data.setLongStats(stats);
+    return data;
+  }
+
+  /**
+   * Builds double statistics for partition {@code i}: high {@code 1000 + i}, low
+   * {@code -1000 - i}, {@code i} nulls, the given NDV and the shared bit vectors.
+   */
+  private ColumnStatisticsData doubleStats(int i, long numDVs) {
+    DoubleColumnStatsData stats = new DoubleColumnStatsData();
+    stats.setHighValue(1000 + i);
+    stats.setLowValue(-1000 - i);
+    stats.setNumNulls(i);
+    stats.setNumDVs(numDVs);
+    stats.setBitVectors(bitVectors);
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    data.setDoubleStats(stats);
+    return data;
+  }
+
+  /**
+   * Requests aggregate statistics for {@code colName} over all ten partition names and
+   * verifies the parts: the number of partitions found, that exactly one column is returned,
+   * and that column's name and type.
+   *
+   * @return the single aggregated column statistics object, for type-specific checks
+   */
+  private ColumnStatisticsObj aggregateAndCheckColumn(String colName, String colType,
+      long expectedPartsFound) throws Exception {
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      partNames.add(partName(i));
+    }
+    AggrStats aggrStats = store.get_aggr_stats_for(DB_NAME, TABLE_NAME, partNames,
+        Arrays.asList(colName));
+    Assert.assertEquals(expectedPartsFound, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals(colName, cso.getColName());
+    Assert.assertEquals(colType, cso.getColType());
+    return cso;
+  }
+
+  /** Asserts the long statistics held by {@code cso} equal the expected values exactly. */
+  private static void checkLongStats(ColumnStatisticsObj cso, long high, long low,
+      long numNulls, long numDVs) {
+    LongColumnStatsData aggregated = cso.getStatsData().getLongStats();
+    Assert.assertEquals(high, aggregated.getHighValue());
+    Assert.assertEquals(low, aggregated.getLowValue());
+    Assert.assertEquals(numNulls, aggregated.getNumNulls());
+    Assert.assertEquals(numDVs, aggregated.getNumDVs());
+  }
+
+  /** Asserts the double statistics held by {@code cso}; high and low use a 0.01 tolerance. */
+  private static void checkDoubleStats(ColumnStatisticsObj cso, double high, double low,
+      long numNulls, long numDVs) {
+    DoubleColumnStatsData aggregated = cso.getStatsData().getDoubleStats();
+    Assert.assertEquals(high, aggregated.getHighValue(), 0.01);
+    Assert.assertEquals(low, aggregated.getLowValue(), 0.01);
+    Assert.assertEquals(numNulls, aggregated.getNumNulls());
+    Assert.assertEquals(numDVs, aggregated.getNumDVs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
new file mode 100644
index 0000000..62918be
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class TestHBaseAggregateStatsNDVUniformDist {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestHBaseAggregateStatsNDVUniformDist.class.getName());
+
+  @Mock
+  HTableInterface htable; // mock instantiated by MockitoAnnotations.initMocks in before()
+  private HBaseStore store; // store under test, reassigned in before()
+  SortedMap<String, Cell> rows = new TreeMap<>(); // row map handed to MockUtils.init
+
+  // Bit vectors attached to the stats below: NDV will be 3 for bitVectors[0] and 12 for [1]
+  String bitVectors[] = {
+      "{0, 4, 5, 7}{0, 1}{0, 1, 2}{0, 1, 4}{0}{0, 2}{0, 3}{0, 2, 3, 4}{0, 1, 4}{0, 1}{0}{0, 1, 3, 8}{0, 2}{0, 2}{0, 9}{0, 1, 4}",
+      "{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}" };
+
+  @Before
+  public void before() throws IOException { // builds a fresh mock-backed store for each test
+    MockitoAnnotations.initMocks(this); // instantiates the @Mock htable field
+    HiveConf conf = new HiveConf();
+    conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true);
+    conf.setBoolean(HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION.varname, true);
+    store = MockUtils.init(conf, htable, rows); // store wired to the mock table and `rows`
+    store.backdoor().getStatsCache().resetCounters();
+  }
+
+  private static interface Checker { // callback the tests use to assert on an AggrStats
+    void checkStats(AggrStats aggrStats) throws Exception;
+  }
+
+  /**
+   * All ten partitions (ds=0 .. ds=9) get long stats for col1, each carrying bitVectors[0].
+   * The aggregate over the ten partitions is expected to report 10 partitions found, a value
+   * range of -1009..1009 (the widest stored bounds), 45 nulls (0 + 1 + ... + 9) and an NDV
+   * of 3.
+   */
+  @Test
+  public void allPartitionsHaveBitVectorStatus() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    long now = System.currentTimeMillis();
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col1", "long", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+
+    // Create the partitions and remember their names for the aggregate request below.
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      List<String> partVal = Arrays.asList("" + i);
+      // Name the partition from its value; "ds=" + partVal would render the List as "ds=[0]".
+      String partName = "ds=" + partVal.get(0);
+      partNames.add(partName);
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/" + partName);
+      Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+          Collections.<String, String> emptyMap());
+      store.addPartition(part);
+      ColumnStatistics cs = new ColumnStatistics();
+      ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+      desc.setLastAnalyzed(now);
+      desc.setPartName(partName);
+      cs.setStatsDesc(desc);
+      ColumnStatisticsObj obj = new ColumnStatisticsObj();
+      obj.setColName("col1");
+      obj.setColType("long");
+      ColumnStatisticsData data = new ColumnStatisticsData();
+      LongColumnStatsData dcsd = new LongColumnStatsData();
+      dcsd.setHighValue(1000 + i);
+      dcsd.setLowValue(-1000 - i);
+      dcsd.setNumNulls(i);
+      dcsd.setNumDVs(10 * i + 1);
+      dcsd.setBitVectors(bitVectors[0]);
+      data.setLongStats(dcsd);
+      obj.setStatsData(data);
+      cs.addToStatsObj(obj);
+      store.updatePartitionColumnStatistics(cs, partVal);
+    }
+
+    // Aggregate over every partition name and check the merged column statistics.
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+        Arrays.asList("col1"));
+    Assert.assertEquals(10, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals("col1", cso.getColName());
+    Assert.assertEquals("long", cso.getColType());
+    LongColumnStatsData mergedStats = cso.getStatsData().getLongStats();
+    Assert.assertEquals(1009, mergedStats.getHighValue(), 0.01);
+    Assert.assertEquals(-1009, mergedStats.getLowValue(), 0.01);
+    Assert.assertEquals(45, mergedStats.getNumNulls());
+    Assert.assertEquals(3, mergedStats.getNumDVs());
+  }
+
+  /**
+   * All ten partitions (ds=0 .. ds=9) get long stats for col2, none carrying a bit vector.
+   * The aggregate over the ten partitions is expected to report 10 partitions found, a value
+   * range of -1009..1009 (the widest stored bounds), 45 nulls (0 + 1 + ... + 9) and an NDV
+   * of 91, the largest per-partition NDV stored (10 * 9 + 1).
+   */
+  @Test
+  public void noPartitionsHaveBitVectorStatus() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    long now = System.currentTimeMillis();
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col2", "long", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+
+    // Create the partitions and remember their names for the aggregate request below.
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      List<String> partVal = Arrays.asList("" + i);
+      // Name the partition from its value; "ds=" + partVal would render the List as "ds=[0]".
+      String partName = "ds=" + partVal.get(0);
+      partNames.add(partName);
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/" + partName);
+      Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+          Collections.<String, String> emptyMap());
+      store.addPartition(part);
+      ColumnStatistics cs = new ColumnStatistics();
+      ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+      desc.setLastAnalyzed(now);
+      desc.setPartName(partName);
+      cs.setStatsDesc(desc);
+      ColumnStatisticsObj obj = new ColumnStatisticsObj();
+      obj.setColName("col2");
+      obj.setColType("long");
+      ColumnStatisticsData data = new ColumnStatisticsData();
+      LongColumnStatsData dcsd = new LongColumnStatsData();
+      dcsd.setHighValue(1000 + i);
+      dcsd.setLowValue(-1000 - i);
+      dcsd.setNumNulls(i);
+      dcsd.setNumDVs(10 * i + 1);
+      data.setLongStats(dcsd);
+      obj.setStatsData(data);
+      cs.addToStatsObj(obj);
+      store.updatePartitionColumnStatistics(cs, partVal);
+    }
+
+    // Aggregate over every partition name and check the merged column statistics.
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+        Arrays.asList("col2"));
+    Assert.assertEquals(10, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals("col2", cso.getColName());
+    Assert.assertEquals("long", cso.getColType());
+    LongColumnStatsData mergedStats = cso.getStatsData().getLongStats();
+    Assert.assertEquals(1009, mergedStats.getHighValue(), 0.01);
+    Assert.assertEquals(-1009, mergedStats.getLowValue(), 0.01);
+    Assert.assertEquals(45, mergedStats.getNumNulls());
+    Assert.assertEquals(91, mergedStats.getNumDVs());
+  }
+
+  /**
+   * Only the partitions at both ends (ds=0, 1, 8 and 9) get long stats for col3; 0 and 1 carry
+   * bitVectors[0], 8 and 9 carry bitVectors[1]. The aggregate over all ten partitions is
+   * expected to report 4 partitions found, bounds of -1010..1010 (wider than any stored
+   * value, so presumably extrapolated), 45 nulls and an NDV of 12.
+   */
+  @Test
+  public void TwoEndsOfPartitionsHaveBitVectorStatus() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    long now = System.currentTimeMillis();
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col3", "long", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+
+    // Create all ten partitions, but store column stats only for the selected ones.
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      List<String> partVal = Arrays.asList("" + i);
+      // Name the partition from its value; "ds=" + partVal would render the List as "ds=[0]".
+      String partName = "ds=" + partVal.get(0);
+      partNames.add(partName);
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/" + partName);
+      Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+          Collections.<String, String> emptyMap());
+      store.addPartition(part);
+      if (i < 2 || i > 7) {
+        ColumnStatistics cs = new ColumnStatistics();
+        ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+        desc.setLastAnalyzed(now);
+        desc.setPartName(partName);
+        cs.setStatsDesc(desc);
+        ColumnStatisticsObj obj = new ColumnStatisticsObj();
+        obj.setColName("col3");
+        obj.setColType("long");
+        ColumnStatisticsData data = new ColumnStatisticsData();
+        LongColumnStatsData dcsd = new LongColumnStatsData();
+        dcsd.setHighValue(1000 + i);
+        dcsd.setLowValue(-1000 - i);
+        dcsd.setNumNulls(i);
+        dcsd.setNumDVs(10 * i + 1);
+        dcsd.setBitVectors(bitVectors[i / 5]);
+        data.setLongStats(dcsd);
+        obj.setStatsData(data);
+        cs.addToStatsObj(obj);
+        store.updatePartitionColumnStatistics(cs, partVal);
+      }
+    }
+
+    // Aggregate over every partition name and check the merged column statistics.
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+        Arrays.asList("col3"));
+    Assert.assertEquals(4, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals("col3", cso.getColName());
+    Assert.assertEquals("long", cso.getColType());
+    LongColumnStatsData mergedStats = cso.getStatsData().getLongStats();
+    Assert.assertEquals(1010, mergedStats.getHighValue(), 0.01);
+    Assert.assertEquals(-1010, mergedStats.getLowValue(), 0.01);
+    Assert.assertEquals(45, mergedStats.getNumNulls());
+    Assert.assertEquals(12, mergedStats.getNumDVs());
+  }
+
+  /**
+   * Only the middle partitions (ds=3 .. ds=6) get long stats for col4, each carrying
+   * bitVectors[0]. The aggregate over all ten partitions is expected to report 4 partitions
+   * found, bounds of -1006..1006 (the widest stored bounds), 45 nulls (more than the 18
+   * stored, so presumably extrapolated) and an NDV of 3.
+   */
+  @Test
+  public void MiddleOfPartitionsHaveBitVectorStatus() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    long now = System.currentTimeMillis();
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col4", "long", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+
+    // Create all ten partitions, but store column stats only for the selected ones.
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      List<String> partVal = Arrays.asList("" + i);
+      // Name the partition from its value; "ds=" + partVal would render the List as "ds=[0]".
+      String partName = "ds=" + partVal.get(0);
+      partNames.add(partName);
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/" + partName);
+      Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+          Collections.<String, String> emptyMap());
+      store.addPartition(part);
+      if (i > 2 && i < 7) {
+        ColumnStatistics cs = new ColumnStatistics();
+        ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+        desc.setLastAnalyzed(now);
+        desc.setPartName(partName);
+        cs.setStatsDesc(desc);
+        ColumnStatisticsObj obj = new ColumnStatisticsObj();
+        obj.setColName("col4");
+        obj.setColType("long");
+        ColumnStatisticsData data = new ColumnStatisticsData();
+        LongColumnStatsData dcsd = new LongColumnStatsData();
+        dcsd.setHighValue(1000 + i);
+        dcsd.setLowValue(-1000 - i);
+        dcsd.setNumNulls(i);
+        dcsd.setNumDVs(10 * i + 1);
+        dcsd.setBitVectors(bitVectors[0]);
+        data.setLongStats(dcsd);
+        obj.setStatsData(data);
+        cs.addToStatsObj(obj);
+        store.updatePartitionColumnStatistics(cs, partVal);
+      }
+    }
+
+    // Aggregate over every partition name and check the merged column statistics.
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+        Arrays.asList("col4"));
+    Assert.assertEquals(4, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals("col4", cso.getColName());
+    Assert.assertEquals("long", cso.getColType());
+    LongColumnStatsData mergedStats = cso.getStatsData().getLongStats();
+    Assert.assertEquals(1006, mergedStats.getHighValue(), 0.01);
+    Assert.assertEquals(-1006, mergedStats.getLowValue(), 0.01);
+    Assert.assertEquals(45, mergedStats.getNumNulls());
+    Assert.assertEquals(3, mergedStats.getNumDVs());
+  }
+
+  /**
+   * Partitions ds=0, 2, 3, 5, 6 and 8 get long stats for col5_long; 0, 2 and 3 carry
+   * bitVectors[0], the others bitVectors[1]. The aggregate over all ten partitions is
+   * expected to report 6 partitions found, bounds of -1010..1010 (wider than any stored
+   * value, so presumably extrapolated), 40 nulls and an NDV of 12.
+   */
+  @Test
+  public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusLong() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    long now = System.currentTimeMillis();
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col5_long", "long", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+
+    // Create all ten partitions, but store column stats only for the selected ones.
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      List<String> partVal = Arrays.asList("" + i);
+      // Name the partition from its value; "ds=" + partVal would render the List as "ds=[0]".
+      String partName = "ds=" + partVal.get(0);
+      partNames.add(partName);
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/" + partName);
+      Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+          Collections.<String, String> emptyMap());
+      store.addPartition(part);
+      if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+        ColumnStatistics cs = new ColumnStatistics();
+        ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+        desc.setLastAnalyzed(now);
+        desc.setPartName(partName);
+        cs.setStatsDesc(desc);
+        ColumnStatisticsObj obj = new ColumnStatisticsObj();
+        obj.setColName("col5_long");
+        obj.setColType("long");
+        ColumnStatisticsData data = new ColumnStatisticsData();
+        LongColumnStatsData dcsd = new LongColumnStatsData();
+        dcsd.setHighValue(1000 + i);
+        dcsd.setLowValue(-1000 - i);
+        dcsd.setNumNulls(i);
+        dcsd.setNumDVs(10 * i + 1);
+        dcsd.setBitVectors(bitVectors[i / 5]);
+        data.setLongStats(dcsd);
+        obj.setStatsData(data);
+        cs.addToStatsObj(obj);
+        store.updatePartitionColumnStatistics(cs, partVal);
+      }
+    }
+
+    // Aggregate over every partition name and check the merged column statistics.
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+        Arrays.asList("col5_long"));
+    Assert.assertEquals(6, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals("col5_long", cso.getColName());
+    Assert.assertEquals("long", cso.getColType());
+    LongColumnStatsData mergedStats = cso.getStatsData().getLongStats();
+    Assert.assertEquals(1010, mergedStats.getHighValue(), 0.01);
+    Assert.assertEquals(-1010, mergedStats.getLowValue(), 0.01);
+    Assert.assertEquals(40, mergedStats.getNumNulls());
+    Assert.assertEquals(12, mergedStats.getNumDVs());
+  }
+  
+  /**
+   * Partitions ds=0, 2, 3, 5, 6 and 8 get decimal stats for col5_decimal; 0, 2 and 3 carry
+   * bitVectors[0], the others bitVectors[1]. The aggregate over all ten partitions is
+   * expected to report 6 partitions found, bounds of -1010..1010 (wider than any stored
+   * value, so presumably extrapolated), 40 nulls and an NDV of 12.
+   */
+  @Test
+  public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDecimal() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    long now = System.currentTimeMillis();
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col5_decimal", "decimal", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+
+    // Create all ten partitions, but store column stats only for the selected ones.
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      List<String> partVal = Arrays.asList("" + i);
+      // Name the partition from its value; "ds=" + partVal would render the List as "ds=[0]".
+      String partName = "ds=" + partVal.get(0);
+      partNames.add(partName);
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/" + partName);
+      Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+          Collections.<String, String> emptyMap());
+      store.addPartition(part);
+      if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+        ColumnStatistics cs = new ColumnStatistics();
+        ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+        desc.setLastAnalyzed(now);
+        desc.setPartName(partName);
+        cs.setStatsDesc(desc);
+        ColumnStatisticsObj obj = new ColumnStatisticsObj();
+        obj.setColName("col5_decimal");
+        obj.setColType("decimal");
+        ColumnStatisticsData data = new ColumnStatisticsData();
+        DecimalColumnStatsData dcsd = new DecimalColumnStatsData();
+        dcsd.setHighValue(StatObjectConverter.createThriftDecimal("" + (1000 + i)));
+        dcsd.setLowValue(StatObjectConverter.createThriftDecimal("" + (-1000 - i)));
+        dcsd.setNumNulls(i);
+        dcsd.setNumDVs(10 * i + 1);
+        dcsd.setBitVectors(bitVectors[i / 5]);
+        data.setDecimalStats(dcsd);
+        obj.setStatsData(data);
+        cs.addToStatsObj(obj);
+        store.updatePartitionColumnStatistics(cs, partVal);
+      }
+    }
+
+    // Aggregate over every partition name and check the merged column statistics.
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+        Arrays.asList("col5_decimal"));
+    Assert.assertEquals(6, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals("col5_decimal", cso.getColName());
+    Assert.assertEquals("decimal", cso.getColType());
+    DecimalColumnStatsData mergedStats = cso.getStatsData().getDecimalStats();
+    Assert.assertEquals(1010, HBaseUtils.getDoubleValue(mergedStats.getHighValue()), 0.01);
+    Assert.assertEquals(-1010, HBaseUtils.getDoubleValue(mergedStats.getLowValue()), 0.01);
+    Assert.assertEquals(40, mergedStats.getNumNulls());
+    Assert.assertEquals(12, mergedStats.getNumDVs());
+  }
+
+  /**
+   * Partitions ds=0, 2, 3, 5, 6 and 8 get double stats for col5_double; 0, 2 and 3 carry
+   * bitVectors[0], the others bitVectors[1]. The aggregate over all ten partitions is
+   * expected to report 6 partitions found, bounds of -1010..1010 (wider than any stored
+   * value, so presumably extrapolated), 40 nulls and an NDV of 12.
+   */
+  @Test
+  public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDouble() throws Exception {
+    String dbName = "default";
+    String tableName = "snp";
+    long now = System.currentTimeMillis();
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema("col5_double", "double", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, Collections.<String, String> emptyMap());
+    List<FieldSchema> partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+        Collections.<String, String> emptyMap(), null, null, null);
+    store.createTable(table);
+
+    // Create all ten partitions, but store column stats only for the selected ones.
+    List<String> partNames = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      List<String> partVal = Arrays.asList("" + i);
+      // Name the partition from its value; "ds=" + partVal would render the List as "ds=[0]".
+      String partName = "ds=" + partVal.get(0);
+      partNames.add(partName);
+      StorageDescriptor psd = new StorageDescriptor(sd);
+      psd.setLocation("file:/tmp/default/hit/" + partName);
+      Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+          Collections.<String, String> emptyMap());
+      store.addPartition(part);
+      if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+        ColumnStatistics cs = new ColumnStatistics();
+        ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+        desc.setLastAnalyzed(now);
+        desc.setPartName(partName);
+        cs.setStatsDesc(desc);
+        ColumnStatisticsObj obj = new ColumnStatisticsObj();
+        obj.setColName("col5_double");
+        obj.setColType("double");
+        ColumnStatisticsData data = new ColumnStatisticsData();
+        DoubleColumnStatsData dcsd = new DoubleColumnStatsData();
+        dcsd.setHighValue(1000 + i);
+        dcsd.setLowValue(-1000 - i);
+        dcsd.setNumNulls(i);
+        dcsd.setNumDVs(10 * i + 1);
+        dcsd.setBitVectors(bitVectors[i / 5]);
+        data.setDoubleStats(dcsd);
+        obj.setStatsData(data);
+        cs.addToStatsObj(obj);
+        store.updatePartitionColumnStatistics(cs, partVal);
+      }
+    }
+
+    // Aggregate over every partition name and check the merged column statistics.
+    AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+        Arrays.asList("col5_double"));
+    Assert.assertEquals(6, aggrStats.getPartsFound());
+    Assert.assertEquals(1, aggrStats.getColStatsSize());
+    ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+    Assert.assertEquals("col5_double", cso.getColName());
+    Assert.assertEquals("double", cso.getColType());
+    DoubleColumnStatsData mergedStats = cso.getStatsData().getDoubleStats();
+    Assert.assertEquals(1010, mergedStats.getHighValue(), 0.01);
+    Assert.assertEquals(-1010, mergedStats.getLowValue(), 0.01);
+    Assert.assertEquals(40, mergedStats.getNumNulls());
+    Assert.assertEquals(12, mergedStats.getNumDVs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/explainuser_1.q.out b/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
index b501f97..0eb9132 100644
--- a/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
+++ b/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
@@ -426,9 +426,9 @@ Stage-0
                                           <-Map 8 [SIMPLE_EDGE]
                                             SHUFFLE [RS_15]
                                               PartitionCols:_col0, _col1, _col2
-                                              Group By Operator [GBY_14] (rows=1 width=101)
+                                              Group By Operator [GBY_14] (rows=2 width=101)
                                                 Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
-                                                Filter Operator [FIL_49] (rows=3 width=93)
+                                                Filter Operator [FIL_49] (rows=5 width=74)
                                                   predicate:((((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                                                   TableScan [TS_11] (rows=20 width=83)
                                                     default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -446,9 +446,9 @@ Stage-0
                                           <-Map 1 [SIMPLE_EDGE]
                                             SHUFFLE [RS_4]
                                               PartitionCols:_col0, _col1, _col2
-                                              Group By Operator [GBY_3] (rows=1 width=101)
+                                              Group By Operator [GBY_3] (rows=2 width=101)
                                                 Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
-                                                Filter Operator [FIL_48] (rows=3 width=93)
+                                                Filter Operator [FIL_48] (rows=5 width=74)
                                                   predicate:((((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                                                   TableScan [TS_0] (rows=20 width=83)
                                                     default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1201,11 +1201,11 @@ Stage-0
     Stage-1
       Reducer 3
       File Output Operator [FS_19]
-        Select Operator [SEL_18] (rows=21 width=101)
+        Select Operator [SEL_18] (rows=36 width=101)
           Output:["_col0","_col1","_col2","_col3","_col4"]
-          Filter Operator [FIL_17] (rows=21 width=101)
+          Filter Operator [FIL_17] (rows=36 width=101)
             predicate:((_col1 > 0) or (_col6 >= 0))
-            Merge Join Operator [MERGEJOIN_28] (rows=21 width=101)
+            Merge Join Operator [MERGEJOIN_28] (rows=36 width=101)
               Conds:RS_14._col0=RS_15._col0(Inner),Output:["_col1","_col2","_col3","_col4","_col6"]
             <-Map 5 [SIMPLE_EDGE]
               SHUFFLE [RS_15]
@@ -1219,25 +1219,25 @@ Stage-0
             <-Reducer 2 [SIMPLE_EDGE]
               SHUFFLE [RS_14]
                 PartitionCols:_col0
-                Filter Operator [FIL_9] (rows=6 width=182)
+                Filter Operator [FIL_9] (rows=10 width=182)
                   predicate:(((_col1 + _col4) = 2) and ((_col4 + 1) = 2))
-                  Merge Join Operator [MERGEJOIN_27] (rows=25 width=182)
+                  Merge Join Operator [MERGEJOIN_27] (rows=40 width=182)
                     Conds:RS_6._col0=RS_7._col0(Left Outer),Output:["_col0","_col1","_col2","_col3","_col4"]
                   <-Map 1 [SIMPLE_EDGE]
                     SHUFFLE [RS_6]
                       PartitionCols:_col0
-                      Select Operator [SEL_2] (rows=5 width=74)
+                      Select Operator [SEL_2] (rows=9 width=82)
                         Output:["_col0","_col1","_col2"]
-                        Filter Operator [FIL_24] (rows=5 width=74)
+                        Filter Operator [FIL_24] (rows=9 width=82)
                           predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                           TableScan [TS_0] (rows=20 width=83)
                             default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
                   <-Map 4 [SIMPLE_EDGE]
                     SHUFFLE [RS_7]
                       PartitionCols:_col0
-                      Select Operator [SEL_5] (rows=5 width=71)
+                      Select Operator [SEL_5] (rows=9 width=79)
                         Output:["_col0","_col1"]
-                        Filter Operator [FIL_25] (rows=5 width=74)
+                        Filter Operator [FIL_25] (rows=9 width=82)
                           predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                           TableScan [TS_3] (rows=20 width=83)
                             default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1257,27 +1257,27 @@ Stage-0
     Stage-1
       Reducer 2
       File Output Operator [FS_14]
-        Select Operator [SEL_13] (rows=24 width=101)
+        Select Operator [SEL_13] (rows=50 width=101)
           Output:["_col0","_col1","_col2","_col3","_col4"]
-          Filter Operator [FIL_12] (rows=24 width=101)
+          Filter Operator [FIL_12] (rows=50 width=101)
             predicate:(((_col1 + _col4) = 2) and ((_col1 > 0) or (_col6 >= 0)) and ((_col4 + 1) = 2))
-            Merge Join Operator [MERGEJOIN_19] (rows=72 width=101)
+            Merge Join Operator [MERGEJOIN_19] (rows=200 width=101)
               Conds:RS_8._col0=RS_9._col0(Right Outer),RS_8._col0=RS_10._col0(Right Outer),Output:["_col1","_col2","_col3","_col4","_col6"]
             <-Map 1 [SIMPLE_EDGE]
               SHUFFLE [RS_8]
                 PartitionCols:_col0
-                Select Operator [SEL_2] (rows=6 width=77)
+                Select Operator [SEL_2] (rows=10 width=83)
                   Output:["_col0","_col1","_col2"]
-                  Filter Operator [FIL_17] (rows=6 width=77)
+                  Filter Operator [FIL_17] (rows=10 width=83)
                     predicate:(((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0)))
                     TableScan [TS_0] (rows=20 width=83)
                       default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
             <-Map 3 [SIMPLE_EDGE]
               SHUFFLE [RS_9]
                 PartitionCols:_col0
-                Select Operator [SEL_5] (rows=6 width=74)
+                Select Operator [SEL_5] (rows=10 width=80)
                   Output:["_col0","_col1"]
-                  Filter Operator [FIL_18] (rows=6 width=77)
+                  Filter Operator [FIL_18] (rows=10 width=83)
                     predicate:(((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0)))
                     TableScan [TS_3] (rows=20 width=83)
                       default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1509,53 +1509,53 @@ Stage-0
                                 Output:["_col0","_col1","_col2"]
                                 Filter Operator [FIL_31] (rows=1 width=101)
                                   predicate:((_col1 + _col4) >= 0)
-                                  Merge Join Operator [MERGEJOIN_60] (rows=1 width=101)
+                                  Merge Join Operator [MERGEJOIN_60] (rows=2 width=101)
                                     Conds:RS_28._col0=RS_29._col0(Inner),Output:["_col0","_col1","_col2","_col4"]
                                   <-Reducer 10 [SIMPLE_EDGE]
                                     SHUFFLE [RS_29]
                                       PartitionCols:_col0
-                                      Filter Operator [FIL_26] (rows=1 width=105)
+                                      Filter Operator [FIL_26] (rows=2 width=62)
                                         predicate:_col0 is not null
-                                        Limit [LIM_24] (rows=1 width=105)
+                                        Limit [LIM_24] (rows=3 width=76)
                                           Number of rows:5
-                                          Select Operator [SEL_23] (rows=1 width=105)
+                                          Select Operator [SEL_23] (rows=3 width=76)
                                             Output:["_col0","_col1"]
                                           <-Reducer 9 [SIMPLE_EDGE]
                                             SHUFFLE [RS_22]
-                                              Select Operator [SEL_20] (rows=1 width=105)
+                                              Select Operator [SEL_20] (rows=3 width=76)
                                                 Output:["_col0","_col1","_col2","_col3"]
-                                                Group By Operator [GBY_19] (rows=1 width=101)
+                                                Group By Operator [GBY_19] (rows=3 width=70)
                                                   Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
                                                 <-Map 8 [SIMPLE_EDGE]
                                                   SHUFFLE [RS_18]
                                                     PartitionCols:_col0, _col1, _col2
-                                                    Group By Operator [GBY_17] (rows=1 width=101)
+                                                    Group By Operator [GBY_17] (rows=3 width=70)
                                                       Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
-                                                      Filter Operator [FIL_58] (rows=4 width=93)
+                                                      Filter Operator [FIL_58] (rows=6 width=77)
                                                         predicate:(((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0)))
                                                         TableScan [TS_14] (rows=20 width=83)
                                                           default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
                                   <-Reducer 3 [SIMPLE_EDGE]
                                     SHUFFLE [RS_28]
                                       PartitionCols:_col0
-                                      Filter Operator [FIL_12] (rows=1 width=97)
+                                      Filter Operator [FIL_12] (rows=2 width=54)
                                         predicate:_col0 is not null
-                                        Limit [LIM_10] (rows=1 width=97)
+                                        Limit [LIM_10] (rows=3 width=68)
                                           Number of rows:5
-                                          Select Operator [SEL_9] (rows=1 width=97)
+                                          Select Operator [SEL_9] (rows=3 width=68)
                                             Output:["_col0","_col1","_col2"]
                                           <-Reducer 2 [SIMPLE_EDGE]
                                             SHUFFLE [RS_8]
-                                              Select Operator [SEL_6] (rows=1 width=97)
+                                              Select Operator [SEL_6] (rows=3 width=68)
                                                 Output:["_col0","_col1","_col2"]
-                                                Group By Operator [GBY_5] (rows=1 width=101)
+                                                Group By Operator [GBY_5] (rows=3 width=70)
                                                   Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
                                                 <-Map 1 [SIMPLE_EDGE]
                                                   SHUFFLE [RS_4]
                                                     PartitionCols:_col0, _col1, _col2
-                                                    Group By Operator [GBY_3] (rows=1 width=101)
+                                                    Group By Operator [GBY_3] (rows=3 width=70)
                                                       Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
-                                                      Filter Operator [FIL_56] (rows=4 width=93)
+                                                      Filter Operator [FIL_56] (rows=6 width=77)
                                                         predicate:(((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0)))
                                                         TableScan [TS_0] (rows=20 width=83)
                                                           default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1575,16 +1575,16 @@ Stage-0
     Stage-1
       Reducer 2
       File Output Operator [FS_12]
-        Select Operator [SEL_11] (rows=6 width=4)
+        Select Operator [SEL_11] (rows=11 width=4)
           Output:["_col0"]
-          Merge Join Operator [MERGEJOIN_17] (rows=6 width=4)
+          Merge Join Operator [MERGEJOIN_17] (rows=11 width=4)
             Conds:RS_8._col0=RS_9._col0(Left Semi),Output:["_col1"]
           <-Map 1 [SIMPLE_EDGE]
             SHUFFLE [RS_8]
               PartitionCols:_col0
-              Select Operator [SEL_2] (rows=5 width=74)
+              Select Operator [SEL_2] (rows=9 width=82)
                 Output:["_col0","_col1"]
-                Filter Operator [FIL_15] (rows=5 width=74)
+                Filter Operator [FIL_15] (rows=9 width=82)
                   predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                   TableScan [TS_0] (rows=20 width=83)
                     default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1615,27 +1615,27 @@ Stage-0
     Stage-1
       Reducer 2
       File Output Operator [FS_18]
-        Select Operator [SEL_17] (rows=12 width=93)
+        Select Operator [SEL_17] (rows=16 width=93)
           Output:["_col0","_col1","_col2"]
-          Merge Join Operator [MERGEJOIN_28] (rows=12 width=93)
+          Merge Join Operator [MERGEJOIN_28] (rows=16 width=93)
             Conds:RS_13._col0=RS_14._col0(Left Semi),RS_13._col0=RS_15._col0(Left Semi),Output:["_col0","_col1","_col2"]
           <-Map 1 [SIMPLE_EDGE]
             SHUFFLE [RS_13]
               PartitionCols:_col0
-              Select Operator [SEL_2] (rows=5 width=74)
+              Select Operator [SEL_2] (rows=9 width=82)
                 Output:["_col0","_col1","_col2"]
-                Filter Operator [FIL_25] (rows=5 width=74)
+                Filter Operator [FIL_25] (rows=9 width=82)
                   predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                   TableScan [TS_0] (rows=20 width=83)
                     default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
           <-Map 3 [SIMPLE_EDGE]
             SHUFFLE [RS_14]
               PartitionCols:_col0
-              Group By Operator [GBY_10] (rows=2 width=85)
+              Group By Operator [GBY_10] (rows=3 width=85)
                 Output:["_col0"],keys:_col0
-                Select Operator [SEL_5] (rows=5 width=68)
+                Select Operator [SEL_5] (rows=9 width=75)
                   Output:["_col0"]
-                  Filter Operator [FIL_26] (rows=5 width=74)
+                  Filter Operator [FIL_26] (rows=9 width=82)
                     predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                     TableScan [TS_3] (rows=20 width=83)
                       default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]