Posted to commits@hive.apache.org by sa...@apache.org on 2019/01/23 10:55:10 UTC

[2/2] hive git commit: HIVE-21078: Replicate column and table level statistics for unpartitioned Hive tables (Ashutosh Bapat, reviewed by Sankar Hariappan)

HIVE-21078: Replicate column and table level statistics for unpartitioned Hive tables (Ashutosh Bapat, reviewed by Sankar Hariappan)

Signed-off-by: Sankar Hariappan <sa...@apache.org>
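
With this change, REPL DUMP includes table-level statistics parameters and column statistics
for non-transactional, unpartitioned tables, and REPL LOAD applies them on the replica instead
of marking them inaccurate. A minimal sketch of the flow, modeled on the new
TestStatsReplicationScenarios test in this commit (the database and table names are
illustrative, not part of the commit):

    // Gather stats on the source, dump, load, then compare both sides.
    primary.run("use srcdb")
           .run("create table t1 (id int)")
           .run("insert into t1 values (1), (2)")
           .run("analyze table t1 compute statistics for columns");
    WarehouseInstance.Tuple dump =
            primary.dump("srcdb", null, Collections.emptyList());
    replica.load("repl_srcdb", dump.dumpLocation);
    // Column stats on the replica should now match the source.
    Assert.assertEquals(primary.getTableColumnStatistics("srcdb", "t1"),
                        replica.getTableColumnStatistics("repl_srcdb", "t1"));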


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2ffca04a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2ffca04a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2ffca04a

Branch: refs/heads/master
Commit: 2ffca04a8b58979b4995a5a6eb264f8f59d9b425
Parents: eba9646
Author: Ashutosh Bapat <ab...@cloudera.com>
Authored: Wed Jan 23 16:21:04 2019 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Wed Jan 23 16:24:28 2019 +0530

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |   1 +
 .../hive/ql/parse/TestReplicationScenarios.java |   1 +
 .../TestReplicationScenariosAcidTables.java     |   1 +
 ...TestReplicationScenariosAcrossInstances.java |   1 +
 .../ql/parse/TestStatsReplicationScenarios.java | 306 +++++++++++++++++++
 ...stStatsReplicationScenariosNoAutogather.java |  55 ++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java |  41 +++
 .../hive/ql/exec/ColumnStatsUpdateTask.java     |  32 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  13 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |  18 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   6 +-
 .../events/filesystem/FSTableEvent.java         |   5 +
 .../repl/bootstrap/load/table/LoadTable.java    |   1 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  41 ++-
 .../ql/metadata/SessionHiveMetaStoreClient.java |  22 +-
 .../apache/hadoop/hive/ql/metadata/Table.java   |  27 ++
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  11 +-
 .../hive/ql/parse/repl/dump/HiveWrapper.java    |   8 +-
 .../hive/ql/parse/repl/dump/TableExport.java    |   6 +-
 .../repl/dump/events/AlterTableHandler.java     |   9 +
 .../repl/dump/events/CreateTableHandler.java    |   8 +
 .../dump/events/UpdateTableColStatHandler.java  |  21 +-
 .../load/message/UpdateTableColStatHandler.java |  37 ++-
 .../hive/ql/plan/ColumnStatsUpdateWork.java     |  15 +
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |  40 ++-
 .../hadoop/hive/ql/plan/ImportTableDesc.java    |   3 +-
 .../apache/hadoop/hive/ql/plan/MoveWork.java    |   9 +
 .../hadoop/hive/ql/exec/TestExecDriver.java     |   2 +-
 .../hive/metastore/api/GetTableRequest.java     | 111 ++++++-
 .../apache/hadoop/hive/metastore/api/Table.java | 116 ++++++-
 .../src/gen/thrift/gen-php/metastore/Types.php  |  51 ++++
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  31 +-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   8 +-
 .../hadoop/hive/common/StatsSetupConst.java     |  29 ++
 .../hive/metastore/HiveMetaStoreClient.java     |  17 ++
 .../hadoop/hive/metastore/IMetaStoreClient.java |  44 +++
 .../src/main/thrift/hive_metastore.thrift       |   6 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |  61 +++-
 .../hadoop/hive/metastore/ObjectStore.java      |   4 +-
 .../apache/hadoop/hive/metastore/RawStore.java  |   1 -
 .../hive/metastore/cache/CachedStore.java       |   1 +
 .../events/UpdateTableColumnStatEvent.java      |  11 +-
 .../metastore/messaging/MessageBuilder.java     |   5 +-
 .../messaging/UpdateTableColumnStatMessage.java |   4 +
 .../json/JSONUpdateTableColumnStatMessage.java  |  13 +-
 .../HiveMetaStoreClientPreCatalog.java          |  12 +
 .../apache/hadoop/hive/metastore/TestStats.java |  89 +++---
 47 files changed, 1236 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index fa7ab25..81b35a4 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -757,6 +757,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumnStatEvent) throws MetaException {
     UpdateTableColumnStatMessage msg = MessageBuilder.getInstance()
             .buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(),
+                    updateTableColumnStatEvent.getTableObj(),
                     updateTableColumnStatEvent.getTableParameters(),
                     updateTableColumnStatEvent.getValidWriteIds(), updateTableColumnStatEvent.getWriteId());
     NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(),
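
The table object now travels with the column-stat event, so the replay side can rebuild the
stats update (and check things such as the table's transactional property) without an extra
metastore lookup. A hedged consumer-side sketch; the 'deserializer' variable and the getter
names are assumptions based on the UpdateTableColumnStatMessage interface this commit extends:

    // Hypothetical consumer; 'deserializer' is an assumed MessageDeserializer.
    UpdateTableColumnStatMessage msg =
        deserializer.getUpdateTableColumnStatMessage(event.getMessage());
    ColumnStatistics colStats = msg.getColumnStatistics();
    Table tableObj = msg.getTableObject();  // carried alongside the stats after this change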

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c85a2a4..6e9c443 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -570,6 +570,7 @@ public class TestReplicationScenarios {
       @Nullable
       @Override
       public Table apply(@Nullable Table table) {
+        LOG.info("Performing injection on table " + table.getTableName());
         if (table.getTableName().equalsIgnoreCase("ptned")){
           injectionPathCalled = true;
           return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 4472a61..342985e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -66,6 +66,7 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLI
  * TestReplicationScenariosAcidTables - test replication for ACID tables
  */
 public class TestReplicationScenariosAcidTables {
+
   @Rule
   public final TestName testName = new TestName();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 0df99b3..1adec4e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -305,6 +305,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
         .run("create table t1 (id int)")
+        .run("insert into t1 values (1), (2)")
         .run("create table t2 (place string) partitioned by (country string)")
         .run("insert into table t2 partition(country='india') values ('bangalore')")
         .run("insert into table t2 partition(country='us') values ('austin')")

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
new file mode 100644
index 0000000..8815a13
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
+/**
+ * Tests for statistics replication.
+ */
+public class TestStatsReplicationScenarios {
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestStatsReplicationScenarios.class);
+  static WarehouseInstance primary;
+  private static WarehouseInstance replica;
+  private String primaryDbName, replicatedDbName;
+  private static HiveConf conf;
+  private static boolean hasAutogather;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+
+    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, true);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz,
+                                       boolean autogather)
+      throws Exception {
+    conf = new HiveConf(clazz);
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    Map<String, String> localOverrides = new HashMap<String, String>() {{
+        put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+        put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+      }};
+    localOverrides.putAll(overrides);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+
+    // Run with autogather false on primary if requested
+    hasAutogather = autogather;
+    localOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
+                        autogather ? "true" : "false");
+    primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+  }
+
+
+  private Map<String, String> collectStatsParams(Map<String, String> allParams) {
+    Map<String, String> statsParams = new HashMap<String, String>();
+    List<String> params = new ArrayList<>(StatsSetupConst.SUPPORTED_STATS);
+    params.add(StatsSetupConst.COLUMN_STATS_ACCURATE);
+    for (String param : params) {
+      String value = allParams.get(param);
+      if (value != null) {
+        statsParams.put(param, value);
+      }
+    }
+
+    return statsParams;
+  }
+
+  private void verifyReplicatedStatsForTable(String tableName) throws Exception {
+    // Test column stats
+    Assert.assertEquals(primary.getTableColumnStatistics(primaryDbName, tableName),
+                        replica.getTableColumnStatistics(replicatedDbName, tableName));
+
+    // Test table level stats
+    Map<String, String> rParams =
+            collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
+    Map<String, String> pParams =
+            collectStatsParams(primary.getTable(primaryDbName, tableName).getParameters());
+    Assert.assertEquals(pParams, rParams);
+  }
+
+  private void verifyNoStatsReplicationForMetadataOnly(String tableName) throws Throwable {
+    // Test column stats
+    Assert.assertTrue(replica.getTableColumnStatistics(replicatedDbName, tableName).isEmpty());
+
+    // When no data is replicated, the basic stats parameters for the table should look as if
+    // it's a new table created on the replica. Based on the create-table rules the basic stats
+    // may be true or false. Either is fine with us, so don't bother checking exact values.
+    Map<String, String> rParams =
+            collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
+    List<String> params = new ArrayList<>(StatsSetupConst.SUPPORTED_STATS);
+    Map<String, String> expectedFalseParams = new HashMap<>();
+    Map<String, String> expectedTrueParams = new HashMap<>();
+    StatsSetupConst.setStatsStateForCreateTable(expectedTrueParams,
+            replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.TRUE);
+    StatsSetupConst.setStatsStateForCreateTable(expectedFalseParams,
+            replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.FALSE);
+    Assert.assertTrue(rParams.equals(expectedFalseParams) || rParams.equals(expectedTrueParams));
+  }
+
+  private List<String> createBootStrapData() throws Throwable {
+    String simpleTableName = "sTable";
+    String partTableName = "pTable";
+    String ndTableName = "ndTable";
+
+    primary.run("use " + primaryDbName)
+            .run("create table " + simpleTableName + " (id int)")
+            .run("insert into " + simpleTableName + " values (1), (2)")
+            .run("create table " + partTableName + " (place string) partitioned by (country string)")
+            .run("insert into table " + partTableName + " partition(country='india') values ('bangalore')")
+            .run("insert into table " + partTableName + " partition(country='us') values ('austin')")
+            .run("insert into table " + partTableName + " partition(country='france') values ('paris')")
+            .run("create table " + ndTableName + " (str string)");
+
+    List<String> tableNames = new ArrayList<String>(Arrays.asList(simpleTableName, partTableName,
+            ndTableName));
+
+    // Run analyze on each of the tables, if stats are not gathered automatically.
+    if (!hasAutogather) {
+      for (String name : tableNames) {
+        Assert.assertTrue(primary.getTableColumnStatistics(primaryDbName, name).isEmpty());
+        primary.run("use " + primaryDbName)
+                .run("analyze table " + name + " compute statistics for columns");
+      }
+    }
+
+    return tableNames;
+  }
+
+  /**
+   * Dumps the primary db on the primary, loads it on the replica as the replicated db, and
+   * verifies that the statistics loaded are the same as the ones on the primary.
+   * @param tableNames names of tables on the primary expected to be loaded
+   * @param lastReplicationId replication id of the last dump, for incremental dump/load
+   * @param parallelLoad if true, parallel bootstrap load is used
+   * @param metadataOnly if true, only metadata is dumped and loaded
+   * @return lastReplicationId of the dump performed.
+   */
+  private String dumpLoadVerify(List<String> tableNames, String lastReplicationId,
+                                boolean parallelLoad, boolean metadataOnly)
+          throws Throwable {
+    List<String> withClauseList;
+    // Parallel load works only for bootstrap.
+    parallelLoad = parallelLoad && (lastReplicationId == null);
+
+    // With clause construction for REPL DUMP command.
+    if (metadataOnly) {
+      withClauseList = Collections.singletonList("'hive.repl.dump.metadata.only'='true'");
+    } else {
+      withClauseList = Collections.emptyList();
+    }
+
+    // Take dump
+    WarehouseInstance.Tuple dumpTuple = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, lastReplicationId, withClauseList);
+
+    // Load, changing configuration first if necessary.
+    if (parallelLoad && lastReplicationId == null) {
+      replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
+    }
+
+    replica.load(replicatedDbName, dumpTuple.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(tableNames.toArray(new String[1]));
+
+    // Metadata load may not load all the events.
+    if (!metadataOnly) {
+      replica.run("repl status " + replicatedDbName)
+              .verifyResult(dumpTuple.lastReplicationId);
+    }
+
+    if (parallelLoad) {
+      replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, false);
+    }
+
+    // Test statistics
+    for (String name : tableNames) {
+      if (metadataOnly) {
+        verifyNoStatsReplicationForMetadataOnly(name);
+      } else {
+        verifyReplicatedStatsForTable(name);
+      }
+    }
+
+    return dumpTuple.lastReplicationId;
+  }
+
+  private void createIncrementalData(List<String> tableNames) throws Throwable {
+    String simpleTableName = "sTable";
+    String partTableName = "pTable";
+    String ndTableName = "ndTable";
+
+    Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
+                                                         ndTableName)));
+    String incTableName = "iTable"; // New table
+
+    primary.run("use " + primaryDbName)
+            .run("insert into " + simpleTableName + " values (3), (4)")
+            // new data inserted into table
+            .run("insert into " + ndTableName + " values ('string1'), ('string2')")
+            // two partitions changed and one unchanged
+            .run("insert into table " + partTableName + " values ('pune', 'india')")
+            .run("insert into table " + partTableName + " values ('chicago', 'us')")
+            // new partition
+            .run("insert into table " + partTableName + " values ('perth', 'australia')")
+            .run("create table " + incTableName + " (config string, enabled boolean)")
+            .run("insert into " + incTableName + " values ('conf1', true)")
+            .run("insert into " + incTableName + " values ('conf2', false)");
+    tableNames.add(incTableName);
+
+    // Run analyze on each of the tables, if stats are not gathered automatically.
+    if (!hasAutogather) {
+      for (String name : tableNames) {
+        primary.run("use " + primaryDbName)
+                .run("analyze table " + name + " compute statistics for columns");
+      }
+    }
+
+  }
+
+  public void testStatsReplicationCommon(boolean parallelBootstrap, boolean metadataOnly) throws Throwable {
+    List<String> tableNames = createBootStrapData();
+    String lastReplicationId = dumpLoadVerify(tableNames, null, parallelBootstrap,
+            metadataOnly);
+
+    // Incremental dump
+    createIncrementalData(tableNames);
+    lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+            metadataOnly);
+  }
+
+  @Test
+  public void testForNonAcidTables() throws Throwable {
+    testStatsReplicationCommon(false, false);
+  }
+
+  @Test
+  public void testForNonAcidTablesParallelBootstrapLoad() throws Throwable {
+    testStatsReplicationCommon(true, false);
+  }
+
+  @Test
+  public void testNonAcidMetadataOnlyDump() throws Throwable {
+    testStatsReplicationCommon(false, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
new file mode 100644
index 0000000..f58ddb8
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests statistics replication when statistics are collected using ANALYZE command.
+ */
+public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestStatsReplicationScenariosNoAutogather.class);
+  static WarehouseInstance primary;
+  private static WarehouseInstance replica;
+  private String primaryDbName, replicatedDbName;
+  private static HiveConf conf;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+
+    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index bf4154c..b272f06 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -380,6 +381,46 @@ public class WarehouseInstance implements Closeable {
     }
   }
 
+  /**
+   * Get statistics for all columns of a given table in the given database.
+   * @param dbName - the database where the table resides
+   * @param tableName - name of the table whose statistics are to be retrieved
+   * @return - list of ColumnStatisticsObj objects, in the table's column order
+   */
+  public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tableName) throws Exception {
+    return client.getTableColumnStatistics(dbName, tableName, getTableColNames(dbName, tableName));
+  }
+
+  /**
+   * @param dbName - database name
+   * @param tableName - table name
+   * @return - list of column names of the given table in the given database.
+   * @throws Exception
+   */
+  public List<String> getTableColNames(String dbName, String tableName) throws Exception {
+    List<String> colNames = new ArrayList<>();
+    client.getSchema(dbName, tableName).forEach(fs -> colNames.add(fs.getName()));
+    return colNames;
+  }
+
+  /**
+   * Get statistics for a given set of columns for all the partitions of a given table in the
+   * given database.
+   * @param dbName - the database where the table resides
+   * @param tableName - name of the partitioned table in the database
+   * @param colNames - columns whose statistics are to be retrieved
+   * @return Map of partition name and list of ColumnStatisticsObj. The objects in the list are
+   * ordered according to the given list of columns.
+   * @throws Exception
+   */
+  Map<String, List<ColumnStatisticsObj>> getAllPartitionColumnStatistics(String dbName,
+                                                                         String tableName,
+                                                                         List<String> colNames)
+          throws Exception {
+    return client.getPartitionColumnStatistics(dbName, tableName,
+            client.listPartitionNames(dbName, tableName, (short) -1), colNames);
+  }
+
   public List<Partition> getAllPartitions(String dbName, String tableName) throws Exception {
     try {
       return client.listPartitions(dbName, tableName, Short.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index 61fb3d3..cf00d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -77,6 +78,14 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
   private ColumnStatistics constructColumnStatsFromInput()
       throws SemanticException, MetaException {
 
+    // If we are replicating the stats, we don't need to construct those again.
+    if (work.getColStats() != null) {
+      ColumnStatistics colStats = work.getColStats();
+      LOG.debug("Got stats through replication for " +
+              colStats.getStatsDesc().getDbName() + "." +
+              colStats.getStatsDesc().getTableName());
+      return colStats;
+    }
     String dbName = work.dbName();
     String tableName = work.getTableName();
     String partName = work.getPartName();
@@ -287,9 +296,22 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
   }
 
   private int persistColumnStats(Hive db) throws HiveException, MetaException, IOException {
-    List<ColumnStatistics> colStats = new ArrayList<>();
-    colStats.add(constructColumnStatsFromInput());
-    SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
+    ColumnStatistics colStats = constructColumnStatsFromInput();
+    ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc();
+    // We do not support stats replication for a transactional table yet. If we are converting
+    // a non-transactional table to a transactional table during replication, we might get
+    // column statistics but we shouldn't update those.
+    if (work.getColStats() != null &&
+        AcidUtils.isTransactionalTable(getHive().getTable(colStatsDesc.getDbName(),
+                                                          colStatsDesc.getTableName()))) {
+      LOG.debug("Skipped updating column stats for table " +
+                TableName.getDbTable(colStatsDesc.getDbName(), colStatsDesc.getTableName()) +
+                " because it is converted to a transactional table during replication.");
+      return 0;
+    }
+
+    SetPartitionsStatsRequest request =
+            new SetPartitionsStatsRequest(Collections.singletonList(colStats));
     db.setPartitionColumnStatistics(request);
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index dfa7e5e..cb7fdf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4724,6 +4724,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     if (crtTbl.getReplaceMode()) {
       ReplicationSpec replicationSpec = crtTbl.getReplicationSpec();
       long writeId = 0;
+      EnvironmentContext environmentContext = null;
       if (replicationSpec != null && replicationSpec.isInReplicationScope()) {
         if (replicationSpec.isMigratingToTxnTable()) {
           // for migration we start the transaction and allocate write id in repl txn task for migration.
@@ -4735,11 +4736,19 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         } else {
           writeId = crtTbl.getReplWriteId();
         }
+
+        // In case of replication, statistics are obtained from the source, so do not update
+        // them on the replica. Since we are not replicating statistics for transactional
+        // tables yet, this applies only to non-transactional tables right now.
+        if (!AcidUtils.isTransactionalTable(crtTbl)) {
+          environmentContext = new EnvironmentContext();
+          environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+        }
       }
 
       // replace-mode creates are really alters using CreateTableDesc.
-      db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false, null,
-              true, writeId);
+      db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false,
+              environmentContext, true, writeId);
     } else {
       if ((foreignKeys != null && foreignKeys.size() > 0) ||
           (primaryKeys != null && primaryKeys.size() > 0) ||
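
DO_NOT_UPDATE_STATS in the EnvironmentContext is how a caller tells the metastore's alter-table
path to leave the statistics parameters alone; the same flag is reused in Hive.loadTable later
in this diff. The pattern in isolation — a sketch using only the constants and the alterTable
overload visible in this hunk, with db, tbl and writeId as in the surrounding method:

    // Preserve replicated stats across the replace-mode alter on the replica.
    EnvironmentContext ec = new EnvironmentContext();
    ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
    db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false, ec,
            true, writeId);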

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index ca4391f..fb35c79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -398,8 +398,24 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             Utilities.FILE_OP_LOGGER.trace("loadTable called from " + tbd.getSourcePath()
               + " into " + tbd.getTable().getTableName());
           }
+
+          boolean resetStatistics;
+          if (hasFollowingStatsTask()) {
+            // If there's a follow-on stats task then the stats will be correct after load, so don't
+            // need to reset the statistics.
+            resetStatistics = false;
+          } else if (!work.getIsInReplicationScope()) {
+            // If the load is not happening during replication and there is no follow-on stats
+            // task, stats will be inaccurate after the load and so need to be reset.
+            resetStatistics = true;
+          } else {
+            // If we are loading a table during replication, the stats will also be replicated
+            // and hence accurate, if it's a non-transactional table. For transactional tables
+            // we do not replicate stats yet, so they must be reset.
+            resetStatistics = AcidUtils.isTransactionalTable(table.getParameters());
+          }
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
-              work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
+              work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, resetStatistics,
               tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
           if (work.getOutputs() != null) {
             DDLTask.addIfAbsentByName(new WriteEntity(table,
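
The three branches above reduce to one rule: statistics survive a load only if something
trustworthy will provide them — a follow-on stats task, or replication of a non-transactional
table. A hypothetical helper (not part of the commit) capturing the same decision:

    private static boolean shouldResetStatistics(boolean hasFollowingStatsTask,
                                                 boolean inReplicationScope,
                                                 boolean isTransactionalTable) {
      if (hasFollowingStatsTask) {
        return false;                 // stats task will recompute correct stats
      }
      if (!inReplicationScope) {
        return true;                  // plain load with no stats task: stats go stale
      }
      return isTransactionalTable;    // repl supplies stats, except for ACID tables
    }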

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index a5b944b..947bfcf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -257,7 +257,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           LOG.debug(
               "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
           try {
-            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName);
+            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName,
+                                                                                        conf);
             boolean shouldWriteExternalTableLocationInfo =
                 conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
                 && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())
@@ -335,6 +336,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       // added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
       // as bootstrap dump's last repl Id.
       tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
+
+      // For now we do not replicate stats for ACID tables. So, wipe out column stats, if any.
+      tableSpec.tableHandle.getTTable().unsetColStats();
     }
     MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
     new TableExport(

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 599eb04..d57cbd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -97,6 +97,11 @@ public class FSTableEvent implements TableEvent {
         // If the conversion is from non transactional to transactional table
         if (AcidUtils.isTransactionalTable(table)) {
           replicationSpec().setMigratingToTxnTable();
+          // There won't be any writeId associated with statistics on the source
+          // non-transactional table. We would need to associate a cooked-up writeId on the
+          // target for those, but that's not done yet. Till then we don't replicate statistics
+          // for ACID tables even if they are available on the source.
+          table.getTTable().unsetColStats();
         }
         if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
           // since we have converted to an external table now after applying the migration rules the

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index e0f0979..0d1a88c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -319,6 +319,7 @@ public class LoadTable {
       );
       moveWork.setLoadTableWork(loadTableWork);
     }
+    moveWork.setIsInReplicationScope(replicationSpec.isInReplicationScope());
     Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
     copyTask.addDependentTask(loadTableTask);
     return copyTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c017790..cd59efb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1241,8 +1241,30 @@ public class Hive {
    * @return the table or if throwException is false a null value.
    * @throws HiveException
    */
-  public Table getTable(final String dbName, final String tableName,
-      boolean throwException, boolean checkTransactional) throws HiveException {
+  public Table getTable(final String dbName, final String tableName, boolean throwException,
+                        boolean checkTransactional) throws HiveException {
+    return getTable(dbName, tableName, throwException, checkTransactional, false);
+  }
+
+  /**
+   * Returns metadata of the table.
+   *
+   * @param dbName
+   *          the name of the database
+   * @param tableName
+   *          the name of the table
+   * @param throwException
+   *          controls whether an exception is thrown or a null value is returned
+   * @param checkTransactional
+   *          checks whether the table metadata and stats are valid for (i.e. compliant
+   *          with the snapshot isolation of) the current transaction.
+   * @param getColumnStats
+   *          get column statistics if available
+   * @return the table or if throwException is false a null value.
+   * @throws HiveException
+   */
+  public Table getTable(final String dbName, final String tableName, boolean throwException,
+                        boolean checkTransactional, boolean getColumnStats) throws HiveException {
 
     if (tableName == null || tableName.equals("")) {
       throw new HiveException("empty table creation??");
@@ -1261,9 +1283,9 @@ public class Hive {
               dbName, tableName);
         }
         tTable = getMSC().getTable(getDefaultCatalog(conf), dbName, tableName,
-            validWriteIdList != null ? validWriteIdList.toString() : null);
+            validWriteIdList != null ? validWriteIdList.toString() : null, getColumnStats);
       } else {
-        tTable = getMSC().getTable(dbName, tableName);
+        tTable = getMSC().getTable(dbName, tableName, getColumnStats);
       }
     } catch (NoSuchObjectException e) {
       if (throwException) {
@@ -2755,14 +2777,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          If the source directory is LOCAL
    * @param isSkewedStoreAsSubdir
    *          if list bucketing enabled
-   * @param hasFollowingStatsTask
-   *          if there is any following stats task
    * @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete
+   * @param resetStatistics whether to reset statistics as part of the move
    * @param writeId write ID allocated for the current load operation
    * @param stmtId statement ID of the current load statement
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
-      boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+      boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean resetStatistics,
       Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
 
     PerfLogger perfLogger = SessionState.getPerfLogger();
@@ -2835,11 +2856,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
       perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
     }
     if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+      LOG.debug("setting table statistics false for " + tbl.getDbName() + "." + tbl.getTableName());
       StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
     }
 
     //column stats will be inaccurate
-    if (!hasFollowingStatsTask) {
+    if (resetStatistics) {
+      LOG.debug("Clearing table statistics for " + tbl.getDbName() + "." + tbl.getTableName());
       StatsSetupConst.clearColumnStatsState(tbl.getParameters());
     }
 
@@ -2858,7 +2881,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
 
     EnvironmentContext environmentContext = null;
-    if (hasFollowingStatsTask) {
+    if (!resetStatistics) {
       environmentContext = new EnvironmentContext();
       environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
     }
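
The new getColumnStats flag lets a caller fetch a table together with its column statistics in
a single metastore call; the replication dump path (HiveWrapper.table(), later in this diff) is
its first user. A hedged usage sketch with illustrative database and table names:

    // The trailing booleans are throwException, checkTransactional, getColumnStats.
    Hive db = Hive.get(conf);
    Table tbl = db.getTable("srcdb", "t1", true, false, true);
    ColumnStatistics colStats = tbl.getColStats();  // null when stats were not fetched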

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 322b580..83cb3ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -174,13 +174,20 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
   @Override
   public org.apache.hadoop.hive.metastore.api.Table getTable(String dbname, String name) throws MetaException,
   TException, NoSuchObjectException {
+    return getTable(dbname, name, false);
+  }
+
+  @Override
+  public org.apache.hadoop.hive.metastore.api.Table getTable(String dbname, String name,
+                                                             boolean getColStats) throws MetaException,
+  TException, NoSuchObjectException {
     // First check temp tables
     org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbname, name);
     if (table != null) {
       return deepCopy(table);  // Original method used deepCopy(), do the same here.
     }
     // Try underlying client
-    return super.getTable(MetaStoreUtils.getDefaultCatalog(conf), dbname, name);
+    return super.getTable(MetaStoreUtils.getDefaultCatalog(conf), dbname, name, getColStats);
   }
 
   // Need to override this one too or dropTable breaks because it doesn't find the table when checks
@@ -188,10 +195,19 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
   @Override
   public org.apache.hadoop.hive.metastore.api.Table getTable(String catName, String dbName,
                                                              String tableName) throws TException {
+    return getTable(catName, dbName, tableName, false);
+  }
+
+  // Need to override this one too or dropTable breaks because it doesn't find the table when checks
+  // before the drop.
+  @Override
+  public org.apache.hadoop.hive.metastore.api.Table getTable(String catName, String dbName,
+                                                             String tableName, boolean getColStats)
+          throws TException {
     if (!DEFAULT_CATALOG_NAME.equals(catName)) {
-      return super.getTable(catName, dbName, tableName);
+      return super.getTable(catName, dbName, tableName, getColStats);
     } else {
-      return getTable(dbName, tableName);
+      return getTable(dbName, tableName, getColStats);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 89b2db3..cd483eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -1106,4 +1109,28 @@ public class Table implements Serializable {
   public Boolean isOutdatedForRewriting() {
     return outdatedForRewritingMaterializedView;
   }
+
+  public ColumnStatistics getColStats() {
+    return tTable.isSetColStats() ? tTable.getColStats() : null;
+  }
+
+  /**
+   * Set up the table-level stats as if the table is new. Used when setting up a Table for a
+   * new table or during replication.
+   */
+  public void setStatsStateLikeNewTable() {
+    // We do not replicate statistics for an ACID table right now,
+    // so don't touch them here.
+    if (AcidUtils.isTransactionalTable(this)) {
+      return;
+    }
+
+    if (isPartitioned()) {
+      StatsSetupConst.setStatsStateForCreateTable(getParameters(), null,
+              StatsSetupConst.FALSE);
+    } else {
+      StatsSetupConst.setStatsStateForCreateTable(getParameters(),
+              MetaStoreUtils.getColumnNames(getCols()), StatsSetupConst.TRUE);
+    }
+  }
 };
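
setStatsStateLikeNewTable distinguishes the two cases because an empty, unpartitioned table
trivially has accurate stats, while a partitioned table tracks stats per partition and so gets
FALSE at the table level. A rough illustration of the effect on an unpartitioned table,
assuming StatsSetupConst's usual JSON encoding of COLUMN_STATS_ACCURATE (names illustrative):

    // Illustration only; the exact parameter layout is owned by StatsSetupConst.
    Hive db = Hive.get(conf);
    Table t = db.getTable("srcdb", "t1");   // unpartitioned, single column 'id'
    t.setStatsStateLikeNewTable();
    // t.getParameters().get("COLUMN_STATS_ACCURATE") would now look like:
    //   {"BASIC_STATS":"true","COLUMN_STATS":{"id":"true"}}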

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index a843987..6102339 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -284,6 +284,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         //if the conversion is from non transactional to transactional table
         if (TxnUtils.isTransactionalTable(tblObj)) {
           replicationSpec.setMigratingToTxnTable();
+          // There won't be any writeId associated with statistics on the source
+          // non-transactional table. We would need to associate a cooked-up writeId on the
+          // target for those, but that's not done yet. Till then we don't replicate statistics
+          // for ACID tables even if they are available on the source.
+          tblObj.unsetColStats();
         }
         tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
         if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) {
@@ -302,7 +307,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     boolean inReplicationScope = false;
     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
       tblDesc.setReplicationSpec(replicationSpec);
-      StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
+      // Statistics for a non-transactional table will be replicated separately. Don't bother
+      // with them here.
+      if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
+        StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
+      }
       inReplicationScope = true;
       tblDesc.setReplWriteId(writeId);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
index fb8c4ca..2fa3676 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -54,8 +55,11 @@ public class HiveWrapper {
     return new Tuple<>(functionForSpec, () -> db.getDatabase(dbName));
   }
 
-  public Tuple<Table> table(final String tableName) throws HiveException {
-    return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName));
+  public Tuple<Table> table(final String tableName, HiveConf conf) throws HiveException {
+    // Column statistics won't be accurate if we are dumping only metadata
+    boolean getColStats = !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
+    return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName, true, false,
+            getColStats));
   }
 
   public static class Tuple<T> {

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index b60be88..adc9446 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -72,9 +72,11 @@ public class TableExport {
         ? null
         : tableSpec;
     this.replicationSpec = replicationSpec;
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || (this.tableSpec != null
-        && this.tableSpec.tableHandle.isView())) {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) ||
+            (this.tableSpec != null && this.tableSpec.tableHandle.isView())) {
       this.replicationSpec.setIsMetadataOnly(true);
+
+      if (this.tableSpec != null) {
+        this.tableSpec.tableHandle.setStatsStateLikeNewTable();
+      }
     }
     this.db = db;
     this.distCpDoAsUser = distCpDoAsUser;

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 00fa370..ff43399 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -94,6 +95,14 @@ class AlterTableHandler extends AbstractEventHandler<AlterTableMessage> {
       withinContext.replicationSpec.setIsMetadataOnly(true);
       Table qlMdTableAfter = new Table(after);
       Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+
+      // If we are not dumping data for a table, we shouldn't dump basic statistics either,
+      // since they won't be accurate. So reset them to what they would look like for an
+      // empty table.
+      if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+        qlMdTableAfter.setStatsStateLikeNewTable();
+      }
+
       EximUtil.createExportDump(
           metaDataPath.getFileSystem(withinContext.hiveConf),
           metaDataPath,

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 5870876..a8bf671 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -61,6 +62,13 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
       withinContext.replicationSpec.setIsMetadataOnly(true);
     }
 
+    // If we are not dumping data about a table, we shouldn't be dumping basic statistics
+    // either, since they won't be accurate. So reset them to what they would look like for
+    // an empty table.
+    if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+      qlMdTable.setStatsStateLikeNewTable();
+    }
+
     Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
     EximUtil.createExportDump(
         metaDataPath.getFileSystem(withinContext.hiveConf),

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index a3aecde..e50a2bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -19,14 +19,17 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnStatMessage> {
 
   UpdateTableColStatHandler(NotificationEvent event) {
     super(event);
   }
 
   @Override
   UpdateTableColumnStatMessage eventMessage(String stringRepresentation) {
@@ -35,6 +36,20 @@ class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnSt
 
   @Override
   public void handle(Context withinContext) throws Exception {
+    Table qlMdTable = new Table(eventMessage.getTableObject());
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
+      return;
+    }
+
+    // Statistics without data don't make sense.
+    if (withinContext.replicationSpec.isMetadataOnly()) {
+      return;
+    }
+    // For now we do not replicate the statistics for transactional tables.
+    if (AcidUtils.isTransactionalTable(qlMdTable)) {
+      return;
+    }
+
     LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON);
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(eventMessageAsJSON);

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
index eb3d18a..9a60de4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
 
 import java.io.Serializable;
 import java.util.Collections;
@@ -31,13 +34,29 @@ import java.util.List;
  * Target(Load) side handler for table stat update event
  */
 public class UpdateTableColStatHandler extends AbstractMessageHandler {
-  @Override
-  public List<Task<? extends Serializable>> handle(Context context)
-      throws SemanticException {
-    context.log.info("Replication of table stat update event is not supported yet");
-    if (!context.isDbNameEmpty()) {
-      updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    @Override
+    public List<Task<? extends Serializable>> handle(Context context)
+            throws SemanticException {
+        UpdateTableColumnStatMessage utcsm =
+                deserializer.getUpdateTableColumnStatMessage(context.dmd.getPayload());
+
+        // Update the table name and database name in the statistics object.
+        ColumnStatistics colStats = utcsm.getColumnStatistics();
+        ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc();
+        colStatsDesc.setDbName(context.dbName);
+        if (!context.isTableNameEmpty()) {
+            colStatsDesc.setTableName(context.tableName);
+        }
+        if (!context.isDbNameEmpty()) {
+            updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName,
+                    context.tableName, null);
+        }
+
+        // TODO: For a txn stats update, ColumnStatsUpdateTask.execute() ->
+        // Hive.setPartitionColumnStatistics expects a valid writeId allocated by the current
+        // txn and a table snapshot, but neither is available when the update comes from
+        // ReplLoadTask, which does not allocate a write id. This needs further investigation.
+        return Collections.singletonList(TaskFactory.get(new ColumnStatsUpdateWork(colStats),
+                context.hiveConf));
     }
-    return Collections.singletonList(TaskFactory.get(new DependencyCollectionWork(), context.hiveConf));
-  }
 }

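On the execute side, ColumnStatsUpdateTask (modified earlier in this patch, outside
this excerpt) can then distinguish the replication path, where the statistics object
arrives fully formed in the event payload, from the ALTER TABLE ... UPDATE STATISTICS
path, where it is built from the parsed command. A rough sketch, with
constructColumnStatsFromInput() standing in for the existing DDL-side construction:

    // Sketch; only work.getColStats() is taken from this patch.
    private ColumnStatistics getColumnStatistics() throws Exception {
      if (work.getColStats() != null) {
        // REPL LOAD: the dump carried the complete statistics object.
        return work.getColStats();
      }
      // Interactive DDL: build the statistics from the command.
      return constructColumnStatsFromInput();
    }
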
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
index 6de1a37..1219b62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
 import java.io.Serializable;
 import java.util.Map;
 
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -42,6 +43,7 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
   private final String tableName;
   private final String colName;
   private final String colType;
+  private final ColumnStatistics colStats;
   private long writeId;
 
   public ColumnStatsUpdateWork(String partName,
@@ -56,6 +58,17 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
     this.tableName = tableName;
     this.colName = colName;
     this.colType = colType;
+    this.colStats = null;
+  }
+
+  public ColumnStatsUpdateWork(ColumnStatistics colStats) {
+    this.colStats = colStats;
+    this.partName = null;
+    this.mapProp = null;
+    this.dbName = null;
+    this.tableName = null;
+    this.colName = null;
+    this.colType = null;
   }
 
   @Override
@@ -87,6 +100,8 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
     return colType;
   }
 
+  public ColumnStatistics getColStats() { return colStats; }
+
   @Override
   public void setWriteId(long writeId) {
     this.writeId = writeId;

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index f00148b..c71ff6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
@@ -106,6 +109,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
   List<SQLNotNullConstraint> notNullConstraints;
   List<SQLDefaultConstraint> defaultConstraints;
   List<SQLCheckConstraint> checkConstraints;
+  private ColumnStatistics colStats;
   private Long initialMmWriteId; // Initial MM write ID for CTAS and import.
   // The FSOP configuration for the FSOP that is going to write initial data during ctas.
   // This is not needed beyond compilation, so it is transient.
@@ -127,7 +131,8 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
       boolean ifNotExists, List<String> skewedColNames, List<List<String>> skewedColValues,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
       List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
-      List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints) {
+      List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints,
+      ColumnStatistics colStats) {
 
     this(tableName, isExternal, isTemporary, cols, partCols,
         bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
@@ -137,6 +142,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
         primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
 
     this.databaseName = databaseName;
+    this.colStats = colStats;
   }
 
   public CreateTableDesc(String databaseName, String tableName, boolean isExternal, boolean isTemporary,
@@ -157,7 +163,8 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
         collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
         outputFormat, location, serName, storageHandler, serdeProps,
         tblProps, ifNotExists, skewedColNames, skewedColValues,
-        primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+        primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints,
+        null);
     this.partColNames = partColNames;
     this.isCTAS = isCTAS;
   }
@@ -878,14 +885,29 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
       }
     }
 
-    if (!this.isCTAS && (tbl.getPath() == null || (tbl.isEmpty() && !isExternal()))) {
-      if (!tbl.isPartitioned() && conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
-        StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(),
-            MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
-      }
+    if (colStats != null) {
+      ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc());
+      colStatsDesc.setCatName(tbl.getCatName());
+      colStatsDesc.setTableName(getTableName());
+      colStatsDesc.setDbName(getDatabaseName());
+      tbl.getTTable().setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj()));
+    }
+
+    // The statistics for non-transactional tables will be obtained from the source. Do not
+    // reset them on the replica.
+    if (replicationSpec != null && replicationSpec.isInReplicationScope() &&
+        !TxnUtils.isTransactionalTable(tbl.getTTable())) {
+      // Do nothing to the table statistics.
     } else {
-      StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(), null,
-          StatsSetupConst.FALSE);
+      if (!this.isCTAS && (tbl.getPath() == null || (tbl.isEmpty() && !isExternal()))) {
+        if (!tbl.isPartitioned() && conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+          StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(),
+                  MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
+        }
+      } else {
+        StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(), null,
+                StatsSetupConst.FALSE);
+      }
     }
     return tbl;
   }

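Net effect at create time: for non-transactional tables replicated from a source,
the stats state dumped by the source is preserved verbatim, while everything else
keeps the pre-existing autogather/reset behaviour. The branch above reduces to a
single predicate:

    // Restatement of the condition guarding the stats reset above.
    boolean keepSourceStats = replicationSpec != null
        && replicationSpec.isInReplicationScope()
        && !TxnUtils.isTransactionalTable(tbl.getTTable());
    // keepSourceStats == true  -> leave the table parameters untouched
    // keepSourceStats == false -> apply the original autogather/reset logic
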
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 50b43ba..5c30fca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -84,7 +84,8 @@ public class ImportTableDesc {
                 null,
                 null,
             null,
-            null);
+            null,
+            table.getColStats());
         this.createTblDesc.setStoredAsSubDirectories(table.getSd().isStoredAsSubDirectories());
         break;
       case VIEW:

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 47a56d5..8ca8e46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -56,6 +56,7 @@ public class MoveWork implements Serializable {
    */
   protected List<Partition> movedParts;
   private boolean isNoop;
+  private boolean isInReplicationScope = false;
 
   public MoveWork() {
   }
@@ -164,4 +165,12 @@ public class MoveWork implements Serializable {
   public void setNeedCleanTarget(boolean needCleanTarget) {
     this.needCleanTarget = needCleanTarget;
   }
+
+  public void setIsInReplicationScope(boolean isInReplicationScope) {
+    this.isInReplicationScope = isInReplicationScope;
+  }
+
+  public boolean getIsInReplicationScope() {
+    return this.isInReplicationScope;
+  }
 }

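MoveTask (modified earlier in this patch, outside this excerpt) consults this flag
when loading replicated data: statistics that came from the source must survive the
load, so the usual reset is skipped. A rough sketch of the intent; the boolean's
position in Hive.loadTable is visible in the TestExecDriver hunk below, but the name
resetStatistics is an assumption:

    // Sketch; the actual call site in MoveTask is not part of this excerpt.
    boolean resetStatistics = !work.getIsInReplicationScope();
    // ... passed through to db.loadTable(...) so that a REPL LOAD does not
    // clobber the statistics replicated along with the table metadata.
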
http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index e108684..78f2585 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -142,7 +142,7 @@ public class TestExecDriver extends TestCase {
         db.createTable(src, cols, null, TextInputFormat.class,
             HiveIgnoreKeyTextOutputFormat.class);
         db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
-           true, false, false, false, null, 0, false);
+           true, false, false, true, null, 0, false);
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2ffca04a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
index 2804952..3b60695 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTableRequest.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
   private static final org.apache.thrift.protocol.TField CAPABILITIES_FIELD_DESC = new org.apache.thrift.protocol.TField("capabilities", org.apache.thrift.protocol.TType.STRUCT, (short)3);
   private static final org.apache.thrift.protocol.TField CAT_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("catName", org.apache.thrift.protocol.TType.STRING, (short)4);
   private static final org.apache.thrift.protocol.TField VALID_WRITE_ID_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("validWriteIdList", org.apache.thrift.protocol.TType.STRING, (short)6);
+  private static final org.apache.thrift.protocol.TField GET_COLUMN_STATS_FIELD_DESC = new org.apache.thrift.protocol.TField("getColumnStats", org.apache.thrift.protocol.TType.BOOL, (short)7);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
   private ClientCapabilities capabilities; // optional
   private String catName; // optional
   private String validWriteIdList; // optional
+  private boolean getColumnStats; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -62,7 +64,8 @@ import org.slf4j.LoggerFactory;
     TBL_NAME((short)2, "tblName"),
     CAPABILITIES((short)3, "capabilities"),
     CAT_NAME((short)4, "catName"),
-    VALID_WRITE_ID_LIST((short)6, "validWriteIdList");
+    VALID_WRITE_ID_LIST((short)6, "validWriteIdList"),
+    GET_COLUMN_STATS((short)7, "getColumnStats");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -87,6 +90,8 @@ import org.slf4j.LoggerFactory;
           return CAT_NAME;
         case 6: // VALID_WRITE_ID_LIST
           return VALID_WRITE_ID_LIST;
+        case 7: // GET_COLUMN_STATS
+          return GET_COLUMN_STATS;
         default:
           return null;
       }
@@ -127,7 +132,9 @@ import org.slf4j.LoggerFactory;
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.CAPABILITIES,_Fields.CAT_NAME,_Fields.VALID_WRITE_ID_LIST};
+  private static final int __GETCOLUMNSTATS_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.CAPABILITIES,_Fields.CAT_NAME,_Fields.VALID_WRITE_ID_LIST,_Fields.GET_COLUMN_STATS};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -141,6 +148,8 @@ import org.slf4j.LoggerFactory;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.VALID_WRITE_ID_LIST, new org.apache.thrift.meta_data.FieldMetaData("validWriteIdList", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.GET_COLUMN_STATS, new org.apache.thrift.meta_data.FieldMetaData("getColumnStats", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetTableRequest.class, metaDataMap);
   }
@@ -161,6 +170,7 @@ import org.slf4j.LoggerFactory;
    * Performs a deep copy on <i>other</i>.
    */
   public GetTableRequest(GetTableRequest other) {
+    __isset_bitfield = other.__isset_bitfield;
     if (other.isSetDbName()) {
       this.dbName = other.dbName;
     }
@@ -176,6 +186,7 @@ import org.slf4j.LoggerFactory;
     if (other.isSetValidWriteIdList()) {
       this.validWriteIdList = other.validWriteIdList;
     }
+    this.getColumnStats = other.getColumnStats;
   }
 
   public GetTableRequest deepCopy() {
@@ -189,6 +200,8 @@ import org.slf4j.LoggerFactory;
     this.capabilities = null;
     this.catName = null;
     this.validWriteIdList = null;
+    setGetColumnStatsIsSet(false);
+    this.getColumnStats = false;
   }
 
   public String getDbName() {
@@ -306,6 +319,28 @@ import org.slf4j.LoggerFactory;
     }
   }
 
+  public boolean isGetColumnStats() {
+    return this.getColumnStats;
+  }
+
+  public void setGetColumnStats(boolean getColumnStats) {
+    this.getColumnStats = getColumnStats;
+    setGetColumnStatsIsSet(true);
+  }
+
+  public void unsetGetColumnStats() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __GETCOLUMNSTATS_ISSET_ID);
+  }
+
+  /** Returns true if field getColumnStats is set (has been assigned a value) and false otherwise */
+  public boolean isSetGetColumnStats() {
+    return EncodingUtils.testBit(__isset_bitfield, __GETCOLUMNSTATS_ISSET_ID);
+  }
+
+  public void setGetColumnStatsIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __GETCOLUMNSTATS_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case DB_NAME:
@@ -348,6 +383,14 @@ import org.slf4j.LoggerFactory;
       }
       break;
 
+    case GET_COLUMN_STATS:
+      if (value == null) {
+        unsetGetColumnStats();
+      } else {
+        setGetColumnStats((Boolean)value);
+      }
+      break;
+
     }
   }
 
@@ -368,6 +411,9 @@ import org.slf4j.LoggerFactory;
     case VALID_WRITE_ID_LIST:
       return getValidWriteIdList();
 
+    case GET_COLUMN_STATS:
+      return isGetColumnStats();
+
     }
     throw new IllegalStateException();
   }
@@ -389,6 +435,8 @@ import org.slf4j.LoggerFactory;
       return isSetCatName();
     case VALID_WRITE_ID_LIST:
       return isSetValidWriteIdList();
+    case GET_COLUMN_STATS:
+      return isSetGetColumnStats();
     }
     throw new IllegalStateException();
   }
@@ -451,6 +499,15 @@ import org.slf4j.LoggerFactory;
         return false;
     }
 
+    boolean this_present_getColumnStats = true && this.isSetGetColumnStats();
+    boolean that_present_getColumnStats = true && that.isSetGetColumnStats();
+    if (this_present_getColumnStats || that_present_getColumnStats) {
+      if (!(this_present_getColumnStats && that_present_getColumnStats))
+        return false;
+      if (this.getColumnStats != that.getColumnStats)
+        return false;
+    }
+
     return true;
   }
 
@@ -483,6 +540,11 @@ import org.slf4j.LoggerFactory;
     if (present_validWriteIdList)
       list.add(validWriteIdList);
 
+    boolean present_getColumnStats = true && (isSetGetColumnStats());
+    list.add(present_getColumnStats);
+    if (present_getColumnStats)
+      list.add(getColumnStats);
+
     return list.hashCode();
   }
 
@@ -544,6 +606,16 @@ import org.slf4j.LoggerFactory;
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetGetColumnStats()).compareTo(other.isSetGetColumnStats());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetGetColumnStats()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.getColumnStats, other.getColumnStats);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -609,6 +681,12 @@ import org.slf4j.LoggerFactory;
       }
       first = false;
     }
+    if (isSetGetColumnStats()) {
+      if (!first) sb.append(", ");
+      sb.append("getColumnStats:");
+      sb.append(this.getColumnStats);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -639,6 +717,8 @@ import org.slf4j.LoggerFactory;
 
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);
@@ -704,6 +784,14 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 7: // GET_COLUMN_STATS
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.getColumnStats = iprot.readBool();
+              struct.setGetColumnStatsIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -748,6 +836,11 @@ import org.slf4j.LoggerFactory;
           oprot.writeFieldEnd();
         }
       }
+      if (struct.isSetGetColumnStats()) {
+        oprot.writeFieldBegin(GET_COLUMN_STATS_FIELD_DESC);
+        oprot.writeBool(struct.getColumnStats);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -777,7 +870,10 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetValidWriteIdList()) {
         optionals.set(2);
       }
-      oprot.writeBitSet(optionals, 3);
+      if (struct.isSetGetColumnStats()) {
+        optionals.set(3);
+      }
+      oprot.writeBitSet(optionals, 4);
       if (struct.isSetCapabilities()) {
         struct.capabilities.write(oprot);
       }
@@ -787,6 +883,9 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetValidWriteIdList()) {
         oprot.writeString(struct.validWriteIdList);
       }
+      if (struct.isSetGetColumnStats()) {
+        oprot.writeBool(struct.getColumnStats);
+      }
     }
 
     @Override
@@ -796,7 +895,7 @@ import org.slf4j.LoggerFactory;
       struct.setDbNameIsSet(true);
       struct.tblName = iprot.readString();
       struct.setTblNameIsSet(true);
-      BitSet incoming = iprot.readBitSet(3);
+      BitSet incoming = iprot.readBitSet(4);
       if (incoming.get(0)) {
         struct.capabilities = new ClientCapabilities();
         struct.capabilities.read(iprot);
@@ -810,6 +909,10 @@ import org.slf4j.LoggerFactory;
         struct.validWriteIdList = iprot.readString();
         struct.setValidWriteIdListIsSet(true);
       }
+      if (incoming.get(3)) {
+        struct.getColumnStats = iprot.readBool();
+        struct.setGetColumnStatsIsSet(true);
+      }
     }
   }
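
Tying the Thrift side together: a client asks the metastore to bundle column
statistics with the table by setting the new optional field on the request. A
minimal sketch using the accessors generated above; the surrounding client call is
assumed:

    // Sketch; the get_table_req-style client plumbing is outside this excerpt.
    GetTableRequest req = new GetTableRequest("repl_db", "sales");
    req.setGetColumnStats(true);  // ask for column stats along with the table
    // Table t = client.getTable(req).getTable();  // hypothetical client call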