You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by dh...@apache.org on 2018/05/21 20:27:09 UTC

[2/2] impala git commit: IMPALA-6131: Track time of last statistics update in metadata

IMPALA-6131: Track time of last statistics update in metadata

The timestamp of the last COMPUTE STATS operation is saved to
table property "impala.lastComputeStatsTime". The format is
the same as in "transient_lastDdlTime", so the two can be
compared to check if the schema has changed since computing
statistics.

Other changes:
- Handling of "transient_lastDdlTime" is simplified - the old
  logic set it to current time + 1, if the old version was
  >= current time, to ensure that it is always increased by
  DDL operations. This was useful in the past, as IMPALA-387
  used lastDdlTime to check if partition data needs to be
  reloaded, but since IMPALA-1480, Impala does not rely on
  lastDdlTime at all.

- Computing / setting stats on HDFS tables no longer increases
  "transient_lastDdlTime".

- When Kudu tables are (re)loaded, it is checked if their
  HMS representation is up to date, and if it is, then
  IMetaStoreClient.alter_table() is not called. The old
  logic always called alter_table() after loading metadata
  from Kudu. This change was needed to ensure that
  "transient_lastDdlTime" works similarly in HDFS and Kudu
  tables, and should also make (re)loading Kudu tables faster.

Notes:
- Kudu will be able to sync its tables to HMS in the near
  future (see KUDU-2191), so the Kudu metadata handling in
  Impala may need to be redesigned.

Testing:
tests/metadata/test_last_ddl_time_update.py is extended by
- also checking "impala.lastComputeStatsTime"
- testing more SQL statements
- tests for Kudu tables

Note that test_last_ddl_time_update.py is ran only in
exhaustive testing.

Change-Id: I59a671ac29d352bd92ce40d5cb6662bb23f146b5
Reviewed-on: http://gerrit.cloudera.org:8080/10116
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5c7d3b12
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5c7d3b12
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5c7d3b12

Branch: refs/heads/master
Commit: 5c7d3b12e3aa750e7ab88e3ef1092d5218e53cc2
Parents: d880c72
Author: Csaba Ringhofer <cs...@cloudera.com>
Authored: Tue Mar 20 13:29:33 2018 +0100
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon May 21 18:25:26 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/ToSqlUtils.java  |   4 +-
 .../impala/catalog/CatalogServiceCatalog.java   |  29 ---
 .../org/apache/impala/catalog/HdfsTable.java    |  18 +-
 .../org/apache/impala/catalog/KuduTable.java    |  12 +-
 .../java/org/apache/impala/catalog/Table.java   |  30 +--
 .../impala/service/CatalogOpExecutor.java       |  92 +++-----
 .../impala/analysis/AuthorizationTest.java      |   2 +
 tests/metadata/test_last_ddl_time_update.py     | 228 ++++++++++++++-----
 8 files changed, 235 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index a3c084d..a4129c4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -224,8 +224,8 @@ public class ToSqlUtils {
     org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
     // Use a LinkedHashMap to preserve the ordering of the table properties.
     LinkedHashMap<String, String> properties = Maps.newLinkedHashMap(msTable.getParameters());
-    if (properties.containsKey("transient_lastDdlTime")) {
-      properties.remove("transient_lastDdlTime");
+    if (properties.containsKey(Table.TBL_PROP_LAST_DDL_TIME)) {
+      properties.remove(Table.TBL_PROP_LAST_DDL_TIME);
     }
     boolean isExternal = msTable.getTableType() != null &&
         msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString());

http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 20a3e2f..19006ae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -1401,35 +1401,6 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Returns the table parameter 'transient_lastDdlTime', or -1 if it's not set.
-   * TODO: move this to a metastore helper class.
-   */
-  public static long getLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    Preconditions.checkNotNull(msTbl);
-    Map<String, String> params = msTbl.getParameters();
-    String lastDdlTimeStr = params.get("transient_lastDdlTime");
-    if (lastDdlTimeStr != null) {
-      try {
-        return Long.parseLong(lastDdlTimeStr);
-      } catch (NumberFormatException e) {}
-    }
-    return -1;
-  }
-
-  /**
-   * Updates the cached lastDdlTime for the given table. The lastDdlTime is used during
-   * the metadata refresh() operations to determine if there have been any external
-   * (outside of Impala) modifications to the table.
-   */
-  public void updateLastDdlTime(TTableName tblName, long ddlTime) {
-    Db db = getDb(tblName.getDb_name());
-    if (db == null) return;
-    Table tbl = db.getTable(tblName.getTable_name());
-    if (tbl == null) return;
-    tbl.updateLastDdlTime(ddlTime);
-  }
-
-  /**
    * Renames a table. Equivalent to an atomic drop + add of the table. Returns
    * a pair of tables containing the removed table (or null if the table drop was not
    * successful) and the new table (or null if either the drop of the old one or the

http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b309156..9910eb8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1232,12 +1232,10 @@ public class HdfsTable extends Table {
    *
    * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore.
    *
-   * There are several cases where existing file descriptors might be reused incorrectly:
-   * 1. an ALTER TABLE ADD PARTITION or dynamic partition insert is executed through
-   *    Hive. This does not update the lastDdlTime.
-   * 2. Hdfs rebalancer is executed. This changes the block locations but doesn't update
-   *    the mtime (file modification time).
-   * If any of these occur, user has to execute "invalidate metadata" to invalidate the
+   * Existing file descriptors might be reused incorrectly if Hdfs rebalancer was
+   * executed, as it changes the block locations but doesn't update the mtime (file
+   * modification time).
+   * If this occurs, user has to execute "invalidate metadata" to invalidate the
    * metadata cache of the table and trigger a fresh load.
    */
   public void load(boolean reuseMetadata, IMetaStoreClient client,
@@ -1339,13 +1337,13 @@ public class HdfsTable extends Table {
    * any, or for all the table partitions if 'partitionsToUpdate' is null.
    */
   private void updatePartitionsFromHms(IMetaStoreClient client,
-      Set<String> partitionsToUpdate, boolean loadParitionFileMetadata)
+      Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata)
       throws Exception {
     if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
     Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
-    Preconditions.checkState(loadParitionFileMetadata || partitionsToUpdate == null);
+    Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null);
 
     // Retrieve all the partition names from the Hive Metastore. We need this to
     // identify the delta between partitions of the local HdfsTable and the table entry
@@ -1378,7 +1376,7 @@ public class HdfsTable extends Table {
         // list and loading them from the Hive Metastore.
         dirtyPartitions.add(partition);
       } else {
-        if (partitionsToUpdate == null && loadParitionFileMetadata) {
+        if (partitionsToUpdate == null && loadPartitionFileMetadata) {
           Path partitionPath = partition.getLocationPath();
           List<HdfsPartition> partitions =
             partitionsToUpdateFileMdByPath.get(partitionPath);
@@ -1412,7 +1410,7 @@ public class HdfsTable extends Table {
     // Load file metadata. Until we have a notification mechanism for when a
     // file changes in hdfs, it is sometimes required to reload all the file
     // descriptors and block metadata of a table (e.g. REFRESH statement).
-    if (loadParitionFileMetadata) {
+    if (loadPartitionFileMetadata) {
       if (partitionsToUpdate != null) {
         // Only reload file metadata of partitions specified in 'partitionsToUpdate'
         Preconditions.checkState(partitionsToUpdateFileMdByPath.isEmpty());

http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 8296ed0..5d6a10d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -219,7 +219,8 @@ public class KuduTable extends Table {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
-      msTable_ = msTbl;
+      // Copy the table to check later if anything has changed.
+      msTable_ = msTbl.deepCopy();
       kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
       Preconditions.checkNotNull(kuduTableName_);
       kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
@@ -234,10 +235,15 @@ public class KuduTable extends Table {
             kuduTableName_, e);
       }
 
+      // Avoid updating HMS if the schema didn't change.
+      if (msTable_.equals(msTbl)) return;
+
       // Update the table schema in HMS.
       try {
-        long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
-        msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
+        // HMS would fill this table property if it was not set, but as metadata written
+        // with alter_table is not read back in case of Kudu tables, it has to be set here
+        // to ensure that HMS and catalogd have the same timestamp.
+        updateTimestampProperty(msTable_, TBL_PROP_LAST_DDL_TIME);
         msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
             StatsSetupConst.TRUE);
         msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);

http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 9255f0a..57dab1a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -98,9 +98,6 @@ public abstract class Table extends CatalogObjectImpl {
   // Type of this table (array of struct) that mirrors the columns. Useful for analysis.
   protected final ArrayType type_ = new ArrayType(new StructType());
 
-  // The lastDdlTime for this table; -1 if not set
-  protected long lastDdlTime_;
-
   // True if this object is stored in an Impalad catalog cache.
   protected boolean storedInImpaladCatalogCache_ = false;
 
@@ -110,14 +107,19 @@ public abstract class Table extends CatalogObjectImpl {
   public static final String ALTER_DURATION_METRIC = "alter-duration";
   public static final String LOAD_DURATION_METRIC = "load-duration";
 
+  // Table property key for storing the time of the last DDL operation.
+  public static final String TBL_PROP_LAST_DDL_TIME = "transient_lastDdlTime";
+
+  // Table property key for storing the last time when Impala executed COMPUTE STATS.
+  public static final String TBL_PROP_LAST_COMPUTE_STATS_TIME =
+      "impala.lastComputeStatsTime";
+
   protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
     msTable_ = msTable;
     db_ = db;
     name_ = name.toLowerCase();
     owner_ = owner;
-    lastDdlTime_ = (msTable_ != null) ?
-        CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
     tableStats_ = new TTableStats(-1);
     tableStats_.setTotal_file_bytes(-1);
     initMetrics();
@@ -186,16 +188,6 @@ public abstract class Table extends CatalogObjectImpl {
     ((StructType) type_.getItemType()).clearFields();
   }
 
-  /**
-   * Updates the lastDdlTime for this Table, if the new value is greater
-   * than the existing value. Does nothing if the new value is less than
-   * or equal to the existing value.
-   */
-  public void updateLastDdlTime(long ddlTime) {
-    // Ensure the lastDdlTime never goes backwards.
-    if (ddlTime > lastDdlTime_) lastDdlTime_ = ddlTime;
-  }
-
   // Returns a list of all column names for this table which we expect to have column
   // stats in the HMS. This exists because, when we request the column stats from HMS,
   // including a column name that does not have stats causes the
@@ -578,4 +570,12 @@ public abstract class Table extends CatalogObjectImpl {
     if (!(obj instanceof Table)) return false;
     return getFullName().equals(((Table) obj).getFullName());
   }
+
+  /**
+   *  Updates a table property with the current system time in seconds precision.
+   */
+  public static void updateTimestampProperty(
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String propertyKey) {
+    msTbl.putToParameters(propertyKey, Long.toString(System.currentTimeMillis() / 1000));
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ea67389..82422ab 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -711,7 +711,7 @@ public class CatalogOpExecutor {
       if (LOG.isTraceEnabled()) {
         LOG.trace(String.format("Altering view %s", tableName));
       }
-      applyAlterTable(msTbl);
+      applyAlterTable(msTbl, true);
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
         tbl.load(true, msClient.getHiveClient(), msTbl);
       }
@@ -729,6 +729,8 @@ public class CatalogOpExecutor {
    * in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'.
    * This function is used by COMPUTE STATS, COMPUTE INCREMENTAL STATS and
    * ALTER TABLE SET COLUMN STATS.
+   * Updates table property 'impala.lastComputeStatsTime' for COMPUTE (INCREMENTAL) STATS,
+   * but not for ALTER TABLE SET COLUMN STATS.
    * Returns the number of updated partitions and columns in 'numUpdatedPartitions'
    * and 'numUpdatedColumns', respectively.
    */
@@ -780,10 +782,15 @@ public class CatalogOpExecutor {
       bulkAlterPartitions(table.getDb().getName(), table.getName(), modifiedParts);
     }
 
-    // Update table row count and total file bytes. Apply table alteration to HMS last to
-    // ensure the lastDdlTime is as accurate as possible.
-    if (params.isSetTable_stats()) updateTableStats(params, msTbl);
-    applyAlterTable(msTbl);
+    if (params.isSetTable_stats()) {
+      // Update table row count and total file bytes.
+      updateTableStats(params, msTbl);
+      // Set impala.lastComputeStatsTime just before alter_table to ensure that it is as
+      // accurate as possible.
+      Table.updateTimestampProperty(msTbl, HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME);
+    }
+
+    applyAlterTable(msTbl, false);
 
     numUpdatedPartitions.setRef(Long.valueOf(0));
     if (modifiedParts != null) {
@@ -1232,7 +1239,7 @@ public class CatalogOpExecutor {
     boolean droppedTotalSize =
         msTbl.getParameters().remove(StatsSetupConst.TOTAL_SIZE) != null;
     if (droppedRowCount || droppedTotalSize) {
-      applyAlterTable(msTbl);
+      applyAlterTable(msTbl, false);
       ++numTargetedPartitions;
     }
 
@@ -1778,7 +1785,7 @@ public class CatalogOpExecutor {
             cacheOp.getCache_pool_name(), replication);
         catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
             new TTableName(newTable.getDbName(), newTable.getTableName()));
-        applyAlterTable(newTable);
+        applyAlterTable(newTable, true);
       }
       Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
       addTableToCatalogUpdate(newTbl, response.result);
@@ -1968,7 +1975,7 @@ public class CatalogOpExecutor {
         msTbl.getSd().addToCols(fs);
       }
     }
-    applyAlterTable(msTbl);
+    applyAlterTable(msTbl, true);
   }
 
   /**
@@ -2005,7 +2012,7 @@ public class CatalogOpExecutor {
             "Column name %s not found in table %s.", colName, tbl.getFullName()));
       }
     }
-    applyAlterTable(msTbl);
+    applyAlterTable(msTbl, true);
   }
 
   /**
@@ -2276,7 +2283,7 @@ public class CatalogOpExecutor {
       String alteredColumns = MetaStoreUtil.removeValueFromCsvList(oldColumns, colName);
       msTbl.getParameters().put(sortByKey, alteredColumns);
     }
-    applyAlterTable(msTbl);
+    applyAlterTable(msTbl, true);
   }
 
   /**
@@ -2337,7 +2344,7 @@ public class CatalogOpExecutor {
       // The default partition must be updated if the file format is changed so that new
       // partitions are created with the new file format.
       if (tbl instanceof HdfsTable) ((HdfsTable) tbl).addDefaultPartition(msTbl.getSd());
-      applyAlterTable(msTbl);
+      applyAlterTable(msTbl, true);
       reloadFileMetadata = true;
     } else {
       Preconditions.checkArgument(tbl instanceof HdfsTable);
@@ -2376,7 +2383,7 @@ public class CatalogOpExecutor {
       // The default partition must be updated if the row format is changed so that new
       // partitions are created with the new file format.
       ((HdfsTable) tbl).addDefaultPartition(msTbl.getSd());
-      applyAlterTable(msTbl);
+      applyAlterTable(msTbl, true);
       reloadFileMetadata = true;
     } else {
       List<HdfsPartition> partitions =
@@ -2418,7 +2425,7 @@ public class CatalogOpExecutor {
           tbl.getMetaStoreTable().deepCopy();
       if (msTbl.getPartitionKeysSize() == 0) reloadFileMetadata = true;
       msTbl.getSd().setLocation(location);
-      applyAlterTable(msTbl);
+      applyAlterTable(msTbl, true);
     } else {
       TableName tableName = tbl.getTableName();
       HdfsPartition partition = catalog_.getHdfsPartition(
@@ -2503,7 +2510,7 @@ public class CatalogOpExecutor {
           throw new UnsupportedOperationException(
               "Unknown target TTablePropertyType: " + params.getTarget());
       }
-      applyAlterTable(msTbl);
+      applyAlterTable(msTbl, true);
     }
   }
 
@@ -2630,7 +2637,7 @@ public class CatalogOpExecutor {
     }
 
     // Update the table metadata.
-    applyAlterTable(msTbl);
+    applyAlterTable(msTbl, true);
     return loadFileMetadata;
   }
 
@@ -2869,13 +2876,19 @@ public class CatalogOpExecutor {
    * command if the metadata is not completely in-sync. This affects both Hive and
    * Impala, but is more important in Impala because the metadata is cached for a
    * longer period of time.
+   * If 'setLastDdlTime' is true, then table property 'transient_lastDdlTime' is updated
+   * to the current time.
    */
-  private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
+  private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
+      boolean setLastDdlTime)
       throws ImpalaRuntimeException {
-    long lastDdlTime = -1;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      lastDdlTime = calculateDdlTime(msTbl);
-      msTbl.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
+      if (setLastDdlTime) {
+        // It would be enough to remove this table property, as HMS would fill it, but
+        // this would make it necessary to reload the table after alter_table in order to
+        // remain consistent with HMS.
+        Table.updateTimestampProperty(msTbl, Table.TBL_PROP_LAST_DDL_TIME);
+      }
       // Avoid computing/setting stats on the HMS side because that may reset the
       // 'numRows' table property (see HIVE-15653). The DO_NOT_UPDATE_STATS flag
       // tells the HMS not to recompute/reset any statistics on its own. Any
@@ -2886,9 +2899,6 @@ public class CatalogOpExecutor {
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
-    } finally {
-      catalog_.updateLastDdlTime(
-          new TTableName(msTbl.getDbName(), msTbl.getTableName()), lastDdlTime);
     }
   }
 
@@ -2906,7 +2916,6 @@ public class CatalogOpExecutor {
     try {
       MetastoreShim.alterPartitions(
           msClient.getHiveClient(), tableName.getDb(), tableName.getTbl(), hmsPartitions);
-      updateLastDdlTime(msTbl, msClient);
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), e);
@@ -3111,42 +3120,6 @@ public class CatalogOpExecutor {
     return fsList;
   }
 
-   /**
-   * Sets the table parameter 'transient_lastDdlTime' to System.currentTimeMillis()/1000
-   * in the given msTbl. 'transient_lastDdlTime' is guaranteed to be changed.
-   * If msClient is not null then this method applies alter_table() to update the
-   * Metastore. Otherwise, the caller is responsible for the final update.
-   */
-  private long updateLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl,
-      MetaStoreClient msClient) throws MetaException, NoSuchObjectException, TException {
-    Preconditions.checkNotNull(msTbl);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Updating lastDdlTime for table: " + msTbl.getTableName());
-    }
-    Map<String, String> params = msTbl.getParameters();
-    long lastDdlTime = calculateDdlTime(msTbl);
-    params.put("transient_lastDdlTime", Long.toString(lastDdlTime));
-    msTbl.setParameters(params);
-    if (msClient != null) {
-      msClient.getHiveClient().alter_table(
-          msTbl.getDbName(), msTbl.getTableName(), msTbl);
-    }
-    catalog_.updateLastDdlTime(
-        new TTableName(msTbl.getDbName(), msTbl.getTableName()), lastDdlTime);
-    return lastDdlTime;
-  }
-
-  /**
-   * Calculates the next transient_lastDdlTime value.
-   */
-  public static long calculateDdlTime(
-      org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    long existingLastDdlTime = CatalogServiceCatalog.getLastDdlTime(msTbl);
-    long currentTime = System.currentTimeMillis() / 1000;
-    if (existingLastDdlTime == currentTime) ++currentTime;
-    return currentTime;
-  }
-
   /**
    * Executes a TResetMetadataRequest and returns the result as a
    * TResetMetadataResponse. Based on the request parameters, this operation
@@ -3269,7 +3242,6 @@ public class CatalogOpExecutor {
    * If the insert touched any pre-existing partitions that were cached, a request to
    * watch the associated cache directives will be submitted. This will result in an
    * async table refresh once the cache request completes.
-   * Updates the lastDdlTime of the table if new partitions were created.
    */
   public TUpdateCatalogResponse updateCatalog(TUpdateCatalogRequest update)
       throws ImpalaException {

http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
index 89051c1..8244e96 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
@@ -1880,6 +1880,7 @@ public class AuthorizationTest extends FrontendTestBase {
           ? ImmutableList.of("", "STATS_GENERATED_VIA_STATS_TASK", "true")
           : ImmutableList.of("", "STATS_GENERATED", "TASK"))
       .add(
+      "","impala.lastComputeStatsTime","*",
       "","numRows","11000",
       "","totalSize","834279",
       "","transient_lastDdlTime","*",
@@ -1930,6 +1931,7 @@ public class AuthorizationTest extends FrontendTestBase {
           ? ImmutableList.of("", "STATS_GENERATED_VIA_STATS_TASK", "true")
           : ImmutableList.of("", "STATS_GENERATED", "TASK"))
       .add(
+      "","impala.lastComputeStatsTime","*",
       "","numRows","100",
       "","totalSize","6472",
       "","transient_lastDdlTime","*",

http://git-wip-us.apache.org/repos/asf/impala/blob/5c7d3b12/tests/metadata/test_last_ddl_time_update.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_last_ddl_time_update.py b/tests/metadata/test_last_ddl_time_update.py
index b1bcfbb..e9b48fc 100644
--- a/tests/metadata/test_last_ddl_time_update.py
+++ b/tests/metadata/test_last_ddl_time_update.py
@@ -21,7 +21,7 @@ import time
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.util.filesystem_utils import WAREHOUSE, IS_S3
 
-# Checks that ALTER and INSERT statements update the last DDL time of the modified table.
+# Checks different statements' effect on last DDL time and last compute stats time.
 class TestLastDdlTimeUpdate(ImpalaTestSuite):
 
   @classmethod
@@ -41,82 +41,188 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
       # to regress here.
       cls.ImpalaTestMatrix.add_constraint(lambda v: False)
 
+  # Convenience class to make calls to TestLastDdlTimeUpdate.run_test() shorter by
+  # storing common arguments as members and substituting table name and HDFS warehouse
+  # path to the query string.
+  class TestHelper:
+    def __init__(self, test_suite, db_name, tbl_name):
+      self.test_suite = test_suite
+      self.db_name = db_name
+      self.tbl_name = tbl_name
+      self.fq_tbl_name = "%s.%s" % (self.db_name, self.tbl_name)
+
+    def expect_no_time_change(self, query):
+      """Neither transient_lastDdlTime or impala.lastComputeStatsTime should be
+      changed by running the query.
+      The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
+      """
+      self.run_test(query, False, False)
+
+    def expect_ddl_time_change(self, query):
+      """Running the query should increase transient_lastDdlTime but
+      not impala.lastComputeStatsTime.
+      The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
+      """
+      self.run_test(query, True, False)
+
+    def expect_stat_time_change(self, query):
+      """Running the query should increase impala.lastComputeStatsTime but
+      not transient_lastDdlTime.
+      The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
+      """
+      self.run_test(query, False, True)
+
+    def run_test(self, query, expect_changed_ddl_time, expect_changed_stats_time):
+      """
+      Runs the query and compares the last ddl/compute stats time before and after
+      executing the query. If expect_changed_ddl_time or expect_changed_stat_time
+      is true, then we expect the given table property to be increased, otherwise we
+      expect it to be unchanged.
+      The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
+      """
+
+      HIVE_LAST_DDL_TIME_PARAM_KEY = "transient_lastDdlTime"
+      LAST_COMPUTE_STATS_TIME_KEY = "impala.lastComputeStatsTime"
+
+      # Get timestamps before executing query.
+      table = self.test_suite.hive_client.get_table(self.db_name, self.tbl_name)
+      assert table is not None
+      beforeDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
+      beforeStatsTime = table.parameters[LAST_COMPUTE_STATS_TIME_KEY]
+      # HMS uses a seconds granularity on the last ddl time - sleeping 1100 ms should be
+      # enough to ensure that the new timestamps are strictly greater than the old ones.
+      time.sleep (1.1)
+
+      self.test_suite.execute_query(query %
+        {'TBL': self.fq_tbl_name, 'WAREHOUSE': WAREHOUSE})
+
+      # Get timestamps after executing query.
+      table = self.test_suite.hive_client.get_table(self.db_name, self.tbl_name)
+      afterDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
+      afterStatsTime = table.parameters[LAST_COMPUTE_STATS_TIME_KEY]
+
+      if expect_changed_ddl_time:
+        # check that the new ddlTime is strictly greater than the old one.
+        assert long(afterDdlTime) > long(beforeDdlTime)
+      else:
+        assert long(afterDdlTime) == long(beforeDdlTime)
+
+      if expect_changed_stats_time:
+        # check that the new statsTime is strictly greater than the old one.
+        assert long(afterStatsTime) > long(beforeStatsTime)
+      else:
+        assert long(afterStatsTime) == long(beforeStatsTime)
+
   def test_alter(self, vector, unique_database):
     TBL_NAME = "alter_test_tbl"
     FQ_TBL_NAME = unique_database + "." + TBL_NAME
+
     self.execute_query("create external table %s (i int) "
                        "partitioned by (j int, s string)" % FQ_TBL_NAME)
 
+    # compute statistics to fill table property impala.lastComputeStatsTime
+    self.execute_query("compute stats %s" % FQ_TBL_NAME)
+
+    h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME)
+
     # add/drop partitions
-    self.run_test("alter table %s add partition (j=1, s='2012')" % FQ_TBL_NAME,
-                  unique_database, TBL_NAME, False)
-    self.run_test("alter table %s add if not exists "
-                  "partition (j=1, s='2012')" % FQ_TBL_NAME,
-                  unique_database, TBL_NAME, False)
-    self.run_test("alter table %s drop partition (j=1, s='2012')" % FQ_TBL_NAME,
-                  unique_database, TBL_NAME, False)
-    self.run_test("alter table %s drop if exists "
-                  "partition (j=2, s='2012')" % FQ_TBL_NAME,
-                  unique_database, TBL_NAME, False)
-    # rename columns
-    self.run_test("alter table %s change column i k int" % FQ_TBL_NAME,
-                  unique_database, TBL_NAME, True)
-    self.run_test("alter table %s change column k i int" % FQ_TBL_NAME,
-                  unique_database, TBL_NAME, True)
+    h.expect_no_time_change("alter table %(TBL)s add partition (j=1, s='2012')")
+    h.expect_no_time_change("alter table %(TBL)s add if not exists "
+                            "partition (j=1, s='2012')")
+    h.expect_no_time_change("alter table %(TBL)s drop partition (j=1, s='2012')")
+    h.expect_no_time_change("alter table %(TBL)s drop if exists "
+                            "partition (j=2, s='2012')")
     # change location of table
-    self.run_test("alter table %s set location "
-                  "'%s'" % (FQ_TBL_NAME, WAREHOUSE),
-                  unique_database, TBL_NAME, True)
+    h.expect_ddl_time_change("alter table %(TBL)s set location '%(WAREHOUSE)s'")
     # change format of table
-    self.run_test("alter table %s set fileformat textfile" % FQ_TBL_NAME,
-                  unique_database, TBL_NAME, True)
+    h.expect_ddl_time_change("alter table %(TBL)s set fileformat textfile")
+
+    self.run_common_test_cases(h)
+
+    # prepare for incremental statistics tests
+    self.execute_query("drop stats %s" % FQ_TBL_NAME)
+    self.execute_query("alter table %s add partition (j=1, s='2012')" % FQ_TBL_NAME)
+
+    # compute incremental statistics
+    h.expect_stat_time_change("compute incremental stats %(TBL)s")
+    h.expect_stat_time_change("compute incremental stats %(TBL)s "
+                              "partition (j=1, s='2012')")
+
+    # drop incremental statistics
+    h.expect_no_time_change("drop incremental stats %(TBL)s partition (j=1, s='2012')")
+
+    # prepare for table sample statistics tests
+    self.execute_query(
+        "alter table %s set tblproperties ('impala.enable.stats.extrapolation'='true')"
+        % FQ_TBL_NAME)
+
+    # compute sampled statistics
+    h.expect_stat_time_change("compute stats %(TBL)s tablesample system(20)")
+
 
   def test_insert(self, vector, unique_database):
     TBL_NAME = "insert_test_tbl"
     FQ_TBL_NAME = unique_database + "." + TBL_NAME
     self.execute_query("create external table %s (i int) "
                        "partitioned by (j int, s string)" % FQ_TBL_NAME)
+
+    # initialize compute stats time
+    self.execute_query("compute stats %s" % FQ_TBL_NAME)
+
+    h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME)
+
     # static partition insert
-    self.run_test("insert into %s partition(j=1, s='2012') "
-                  "select 10" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
+    h.expect_no_time_change("insert into %(TBL)s partition(j=1, s='2012') select 10")
     # dynamic partition insert
-    self.run_test("insert into %s partition(j, s) "
-                  "select 10, 2, '2013'" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
+    h.expect_no_time_change("insert into %(TBL)s partition(j, s) select 10, 2, '2013'")
+
     # dynamic partition insert changing no partitions (empty input)
-    self.run_test("insert into %s partition(j, s) "
-                  "select * from (select 10 as i, 2 as j, '2013' as s) as t "
-                  "where t.i < 10" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
+    h.expect_no_time_change("insert into %(TBL)s partition(j, s) "
+                            "select * from (select 10 as i, 2 as j, '2013' as s) as t "
+                            "where t.i < 10")
     # dynamic partition insert modifying an existing partition
-    self.run_test("insert into %s partition(j, s) "
-                  "select 20, 1, '2012'" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
-
-  def run_test(self, query, db_name, table_name, expect_changed):
-    """
-    Runs the given query (expected to be an ALTER or INSERT statement)
-    and compares the last ddl time before and after executing the query.
-    If expect_change is true then we expect the last ddl time to increase,
-    otherwise we expect no change.
-    """
-
-    HIVE_LAST_DDL_TIME_PARAM_KEY = "transient_lastDdlTime"
-
-    # Get last DDL time before executing query.
-    table = self.hive_client.get_table(db_name, table_name)
-    assert table is not None
-    beforeDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
-
-    # Sleep for 2s to make sure the new ddlTime is strictly greater than the old one.
-    # Hive uses a seconds granularity on the last ddl time.
-    time.sleep (2)
-
-    self.execute_query(query)
-
-    # Get last ddl time after executing query .
-    table = self.hive_client.get_table(db_name, table_name)
-    afterDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
-
-    if expect_changed:
-      # check that the new ddlTime is strictly greater than the old one.
-      assert long(afterDdlTime) > long(beforeDdlTime)
-    else:
-      assert long(afterDdlTime) == long(beforeDdlTime)
+    h.expect_no_time_change("insert into %(TBL)s partition(j, s) select 20, 1, '2012'")
+
+  def test_kudu(self, vector, unique_database):
+    TBL_NAME = "kudu_test_tbl"
+    FQ_TBL_NAME = unique_database + "." + TBL_NAME
+    self.execute_query("create table %s (i int primary key) "
+                       "partition by hash(i) partitions 3 stored as kudu" % FQ_TBL_NAME)
+
+    # initialize last compute stats time
+    self.execute_query("compute stats %s" % FQ_TBL_NAME)
+
+    h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME)
+
+    # insert
+    h.expect_no_time_change("insert into %s values (1)" % FQ_TBL_NAME)
+
+    self.run_common_test_cases(h)
+
+  # Tests that should behave the same with HDFS and Kudu tables.
+  def run_common_test_cases(self, test_helper):
+    h = test_helper
+    # rename columns
+    h.expect_ddl_time_change("alter table %(TBL)s change column i k int")
+    h.expect_ddl_time_change("alter table %(TBL)s change column k i int")
+    # change table property
+    h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('a'='b')")
+
+    # changing table statistics manually
+    h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('numRows'='1')")
+
+    # changing column statistics manually
+    h.expect_no_time_change("alter table %(TBL)s set column stats i ('numdvs'='3')")
+
+    # compute statistics
+    h.expect_stat_time_change("compute stats %(TBL)s")
+    # compute statistics for a single column
+    h.expect_stat_time_change("compute stats %(TBL)s (i)")
+
+    # drop statistics
+    h.expect_no_time_change("drop stats %(TBL)s")
+
+    # invalidate metadata and reload table
+    self.execute_query("invalidate metadata %s" % h.fq_tbl_name)
+    # run any query to reload the table
+    h.expect_no_time_change("describe %(TBL)s")