Posted to commits@impala.apache.org by ta...@apache.org on 2019/12/15 23:50:05 UTC

[impala] branch master updated (313d758 -> f2f348c)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 313d758  IMPALA-9235: add more per-connection stats to /rpcz
     new 590da59  IMPALA-8706: ISO:SQL:2016 datetime patterns - Milestone 4
     new 6ebea33  IMPALA-9092: Add support for creating external Kudu table
     new ee9823d  [DOCS] Fixed typos and outdated terms in impala_proxy.xml
     new f2f348c  IMPALA-7550: Add documentation to profile counters

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/hbase-scan-node.cc                     |   5 +-
 be/src/exec/hbase-scan-node.h                      |   2 +-
 be/src/exec/hdfs-scan-node-base.cc                 | 167 +++++++---
 be/src/exec/hdfs-scan-node-base.h                  |  57 +---
 be/src/exec/kudu-scan-node-base.cc                 |   4 +-
 be/src/exec/scan-node.cc                           | 100 +++---
 be/src/exec/scan-node.h                            |  17 --
 be/src/exprs/date-functions-ir.cc                  |   2 +-
 be/src/runtime/coordinator-backend-state.cc        |  17 +-
 be/src/runtime/coordinator.cc                      |  90 +++---
 be/src/runtime/date-parse-util.cc                  |  21 +-
 be/src/runtime/date-test.cc                        | 150 +++++++--
 be/src/runtime/date-value.cc                       | 134 ++++++---
 be/src/runtime/date-value.h                        |  27 +-
 be/src/runtime/datetime-iso-sql-format-parser.cc   | 101 +++++--
 be/src/runtime/datetime-iso-sql-format-parser.h    |  35 ++-
 .../runtime/datetime-iso-sql-format-tokenizer.cc   |  49 ++-
 be/src/runtime/datetime-parser-common.cc           | 113 +++++--
 be/src/runtime/datetime-parser-common.h            |  41 ++-
 be/src/runtime/timestamp-parse-util.cc             |  38 ++-
 be/src/runtime/timestamp-parse-util.h              |   8 +
 be/src/util/debug-counters.h                       |  72 -----
 be/src/util/default-path-handlers.cc               |  46 +++
 be/src/util/runtime-profile-counters.h             | 227 ++++++++++++++
 be/src/util/runtime-profile.cc                     |  65 ++++
 docs/topics/impala_proxy.xml                       |  37 +--
 .../analysis/AlterTableSetTblProperties.java       |   7 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  59 ++--
 .../org/apache/impala/analysis/ToSqlUtils.java     |  13 +-
 .../java/org/apache/impala/catalog/KuduTable.java  |  15 +-
 .../main/java/org/apache/impala/catalog/Table.java |  23 ++
 .../apache/impala/service/CatalogOpExecutor.java   |  24 +-
 .../impala/service/KuduCatalogOpExecutor.java      |  16 +-
 .../apache/impala/analysis/AnalyzeKuduDDLTest.java | 283 ++++++++++-------
 .../org/apache/impala/catalog/CatalogTest.java     |  45 ++-
 .../impala/catalog/local/LocalCatalogTest.java     |  35 ++-
 .../org/apache/impala/common/FrontendFixture.java  |   2 +-
 .../queries/QueryTest/show-create-table.test       | 188 ++++++++++--
 tests/common/skip.py                               |   6 +-
 tests/metadata/test_show_create_table.py           |  18 +-
 tests/query_test/test_cast_with_format.py          | 335 ++++++++++++++++++++-
 tests/query_test/test_kudu.py                      | 211 ++++++++++++-
 www/profile_docs.tmpl                              |  91 ++++++
 www/query_profile.tmpl                             |   1 +
 44 files changed, 2320 insertions(+), 677 deletions(-)
 delete mode 100644 be/src/util/debug-counters.h
 create mode 100644 www/profile_docs.tmpl


[impala] 03/04: [DOCS] Fixed typos and outdated terms in impala_proxy.xml

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ee9823d0dd96fe3e4705a9c1499834a748d91cf9
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Fri Dec 13 13:46:36 2019 -0800

    [DOCS] Fixed typos and outdated terms in impala_proxy.xml
    
    Change-Id: I2f1ee44cddd71e2d929d1253a73b33e52c08ad42
    Reviewed-on: http://gerrit.cloudera.org:8080/14907
    Reviewed-by: Alex Rodoni <ar...@cloudera.com>
    Tested-by: Alex Rodoni <ar...@cloudera.com>
---
 docs/topics/impala_proxy.xml | 37 ++++++++++++++++---------------------
 1 file changed, 16 insertions(+), 21 deletions(-)

diff --git a/docs/topics/impala_proxy.xml b/docs/topics/impala_proxy.xml
index ca67885..581272b 100644
--- a/docs/topics/impala_proxy.xml
+++ b/docs/topics/impala_proxy.xml
@@ -44,7 +44,7 @@ under the License.
 
     <p>
       For most clusters that have multiple users and production availability requirements, you
-      might set up a proxy server to relay requests to and from Impala.
+      might want to set up a load-balancing proxy server to relay requests to and from Impala.
     </p>
 
     <p>
@@ -87,13 +87,12 @@ under the License.
           server rather than a specific host running the <cmdname>impalad</cmdname> daemon.
         </li>
 
-        <li>
-          The coordinator node for each Impala query potentially requires more memory and CPU
-          cycles than the other nodes that process the query. The proxy server can issue queries
-          so that each connection uses a different coordinator node. This load-balancing
-          technique lets the Impala nodes share this additional work, rather than concentrating
-          it on a single machine.
-        </li>
+        <li> The coordinator node for each Impala query potentially requires
+          more memory and CPU cycles than the other nodes that process the
+          query. The proxy server can issue queries so that each connection uses
+          a different coordinator node. This load-balancing technique lets the
+            <cmdname>impalad</cmdname> nodes share this additional work, rather
+          than concentrating it on a single machine. </li>
       </ul>
 
       <p>
@@ -398,8 +397,8 @@ klist -k <varname>keytabfile</varname>
         </p>
 
         <p>
-          In <filepath>hue.ini</filepath>, set the following for to configure Hue to
-          automatically connect to the proxy server:
+          In <filepath>hue.ini</filepath>, set the following to configure Hue to automatically
+          connect to the proxy server:
         </p>
 
 <codeblock>[impala]
@@ -457,17 +456,13 @@ jdbc:hive2://<varname>proxy_host</varname>:<varname>load_balancer_port</varname>
 
       <dl>
         <dlentry>
-
-          <dt>
-            Client/Server SSL
-          </dt>
-
-          <dd>
-            In this configuration, the proxy server presents an SSL certificate to the client,
-            decrypts the client request, then re-encrypts the request before sending it to the
-            backend <codeph>impalad</codeph>. The client and server certificates can be managed
-            separately. The request or resulting payload is encrypted in transit at all times.
-          </dd>
+          <dt> TLS/SSL Bridging</dt>
+          <dd> In this configuration, the proxy server presents a TLS/SSL
+            certificate to the client, decrypts the client request, then
+            re-encrypts the request before sending it to the backend
+              <codeph>impalad</codeph>. The client and server certificates can
+            be managed separately. The request or resulting payload is encrypted
+            in transit at all times. </dd>
 
         </dlentry>
 


[impala] 04/04: IMPALA-7550: Add documentation to profile counters

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f2f348c0f93208a0f34c33b6a4dc82f4d9d4b290
Author: Jiawei Wang <ji...@cloudera.com>
AuthorDate: Wed Dec 4 16:16:22 2019 -0600

    IMPALA-7550: Add documentation to profile counters
    
    This change alters how developers define profile counters: counters are
    now generated from a counter registry. Every profile counter is
    registered there first and then used in the same way as before.
    Managing counters through the registry lets us attach documentation
    to each profile counter. For example:
    
    Declaration:
    PROFILE_DEFINE_COUNTER(NumBackends, STABLE_HIGH, TUnit::UNIT,
        "Number of backends running this query.");
    
    Initialization:
    COUNTER_SET(PROFILE_NumBackends.Instantiate(query_profile_), num_backends);
    
    This is how a NumBackends counter is defined. The declaration gives the
    counter's name, followed by its significance, type, and description.
    
    Users can now view the profile counter documentation from the
    query_profile page, which has a Profile Documentation button that
    leads to /profile_docs.
    
    More details:
    This commit refactors profile counters as follows.
    1. Add a singleton registry for runtime profile counter prototypes,
    similar to what Kudu does for metrics. This allows us to generate
    profile documentation for all counters from the code. We add
    /profile_docs and a corresponding UI for the documentation of profile
    counters.
    
    2. Profile counters are also annotated with their significance to users.
    * STABLE_HIGH - High-level and stable counters, always useful for measuring
    query performance and status. These are counters that everyone is
    interested in. They should rarely change, and if they do we will make
    some effort to notify users.
    
    * STABLE_LOW - Low-level and stable counters. Interesting counters to monitor
    and analyze by machine. They will probably be interesting to users under
    some circumstances.
    
    * UNSTABLE - Unstable but useful. Useful to understand query performance,
    but subject to change, particularly if the implementation changes.
    E.g. MaterializeTupleTime
    
    * DEBUG - Debugging counters. Generally not useful to users of Impala;
    the main use case is low-level debugging. Can be hidden to reduce noise
    for most consumers of profiles.
    
    3. We have around 250 counters. This commit converts only those in
    scan-node, hdfs-scan-node-base and coordinator.
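
The registry idea described in the list above can be illustrated with a small standalone sketch. This is not the code from this commit: Significance, Counter, Profile, CounterRegistry, CounterPrototype, SKETCH_DEFINE_COUNTER and the PROTO_ prefix are all hypothetical names chosen for illustration, and the real macros in runtime-profile-counters.h may work differently. The sketch only shows the general pattern, assuming that each prototype registers itself in a singleton when it is constructed and that documentation is generated by walking the registry.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// All names below are hypothetical stand-ins, not Impala's real classes.
enum class Significance { STABLE_HIGH, STABLE_LOW, UNSTABLE, DEBUG };

inline const char* SignificanceName(Significance s) {
  switch (s) {
    case Significance::STABLE_HIGH: return "STABLE_HIGH";
    case Significance::STABLE_LOW: return "STABLE_LOW";
    case Significance::UNSTABLE: return "UNSTABLE";
    case Significance::DEBUG: return "DEBUG";
  }
  return "UNKNOWN";
}

struct Counter { long value = 0; };

// Toy profile: owns counters keyed by name.
class Profile {
 public:
  Counter* AddCounter(const std::string& name) {
    std::unique_ptr<Counter>& slot = counters_[name];
    if (!slot) slot.reset(new Counter());
    return slot.get();
  }
 private:
  std::map<std::string, std::unique_ptr<Counter>> counters_;
};

class CounterPrototype;

// Singleton registry holding every counter prototype in the program.
class CounterRegistry {
 public:
  static CounterRegistry* Get() {
    // Function-local static: safe to use from global constructors.
    static CounterRegistry instance;
    return &instance;
  }
  void Register(const CounterPrototype* proto);
  // One documentation line per registered prototype, sorted by name.
  std::vector<std::string> Docs() const;
 private:
  std::map<std::string, const CounterPrototype*> prototypes_;
};

// Name, significance and description of a counter; registers itself.
class CounterPrototype {
 public:
  CounterPrototype(std::string name, Significance sig, std::string desc)
      : name_(std::move(name)), sig_(sig), desc_(std::move(desc)) {
    CounterRegistry::Get()->Register(this);
  }
  // Creates (or returns) the counter with this name in the given profile.
  Counter* Instantiate(Profile* profile) const { return profile->AddCounter(name_); }
  const std::string& name() const { return name_; }
  std::string Doc() const {
    return name_ + " [" + SignificanceName(sig_) + "]: " + desc_;
  }
 private:
  std::string name_;
  Significance sig_;
  std::string desc_;
};

void CounterRegistry::Register(const CounterPrototype* proto) {
  bool inserted = prototypes_.emplace(proto->name(), proto).second;
  assert(inserted && "profile counter names must be unique");
  (void)inserted;
}

std::vector<std::string> CounterRegistry::Docs() const {
  std::vector<std::string> lines;
  for (const auto& entry : prototypes_) lines.push_back(entry.second->Doc());
  return lines;
}

// Hypothetical macro: defines a global prototype named PROTO_<name>.
#define SKETCH_DEFINE_COUNTER(name, sig, desc) \
  CounterPrototype PROTO_##name(#name, Significance::sig, desc)

SKETCH_DEFINE_COUNTER(NumBackends, STABLE_HIGH,
    "Number of backends running this query.");
SKETCH_DEFINE_COUNTER(MaterializeTupleTime, UNSTABLE,
    "Wall clock time spent materializing tuples and evaluating predicates.");
```

In this sketch a caller would write PROTO_NumBackends.Instantiate(&profile) to get a counter, and a documentation page could be rendered from CounterRegistry::Get()->Docs(). Keying the registry by name is what makes the uniqueness requirement mentioned in the commit message fall out naturally.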
    
    Concerns:
    The downside is that we will have duplicate comments for profile
    counters in both the header file and the .cc file.
    Additionally, an (arguably good) limitation is that profile counter
    names need to be unique.
    
    Change-Id: Idc03faddb27754001290bb6d899840e2cbe7ccb7
    Reviewed-on: http://gerrit.cloudera.org:8080/14776
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hbase-scan-node.cc              |   5 +-
 be/src/exec/hbase-scan-node.h               |   2 +-
 be/src/exec/hdfs-scan-node-base.cc          | 167 ++++++++++++++------
 be/src/exec/hdfs-scan-node-base.h           |  57 +------
 be/src/exec/kudu-scan-node-base.cc          |   4 +-
 be/src/exec/scan-node.cc                    | 100 ++++++------
 be/src/exec/scan-node.h                     |  17 ---
 be/src/runtime/coordinator-backend-state.cc |  17 ++-
 be/src/runtime/coordinator.cc               |  90 ++++++-----
 be/src/util/debug-counters.h                |  72 ---------
 be/src/util/default-path-handlers.cc        |  46 ++++++
 be/src/util/runtime-profile-counters.h      | 227 ++++++++++++++++++++++++++++
 be/src/util/runtime-profile.cc              |  65 ++++++++
 www/profile_docs.tmpl                       |  91 +++++++++++
 www/query_profile.tmpl                      |   1 +
 15 files changed, 683 insertions(+), 278 deletions(-)

diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index c9380d8..986c5ac 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -36,6 +36,9 @@
 #include "common/names.h"
 using namespace impala;
 
+PROFILE_DEFINE_TIMER(TotalRawHBaseReadTime, STABLE_HIGH,
+    "Aggregate wall clock time spent reading from HBase.");
+
 HBaseScanNode::HBaseScanNode(ObjectPool* pool, const TPlanNode& tnode,
                              const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
@@ -59,7 +62,7 @@ HBaseScanNode::~HBaseScanNode() {
 
 Status HBaseScanNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ScanNode::Prepare(state));
-  hbase_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HBASE_READ_TIMER);
+  hbase_read_timer_ = PROFILE_TotalRawHBaseReadTime.Instantiate(runtime_profile());
   AddBytesReadCounters();
 
   hbase_scanner_.reset(
diff --git a/be/src/exec/hbase-scan-node.h b/be/src/exec/hbase-scan-node.h
index bd9502e..4c60220 100644
--- a/be/src/exec/hbase-scan-node.h
+++ b/be/src/exec/hbase-scan-node.h
@@ -116,7 +116,7 @@ class HBaseScanNode : public ScanNode {
   /// will be 0.
   int suggested_max_caching_;
 
-  /// Total wall clock time spent reading from HBase.
+  /// Definition can be found in hbase_scan_node.cc
   RuntimeProfile::Counter* hbase_read_timer_ = nullptr;
 
   /// Writes a slot in tuple from an HBase value containing text data.
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index e755ff6..0471ae3 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -66,6 +66,80 @@ using namespace impala;
 using namespace impala::io;
 using namespace strings;
 
+PROFILE_DEFINE_TIMER(TotalRawHdfsReadTime, STABLE_LOW, "Aggregate wall clock time"
+    " across all Disk I/O threads in HDFS read operations.");
+PROFILE_DEFINE_TIMER(TotalRawHdfsOpenFileTime, STABLE_LOW, "Aggregate wall clock time"
+    " spent across all Disk I/O threads in HDFS open operations.");
+PROFILE_DEFINE_DERIVED_COUNTER(PerReadThreadRawHdfsThroughput, STABLE_LOW,
+    TUnit::BYTES_PER_SECOND, "The read throughput in bytes/sec for each HDFS read thread"
+    " while it is executing I/O operations on behalf of a scan.");
+PROFILE_DEFINE_COUNTER(ScanRangesComplete, STABLE_LOW, TUnit::UNIT,
+    "Number of scan ranges that have been completed by a scan node.");
+PROFILE_DEFINE_COUNTER(CollectionItemsRead, STABLE_LOW, TUnit::UNIT,
+    "Total number of nested collection items read by the scan. Only created for scans "
+    "(e.g. Parquet) that support nested types.");
+PROFILE_DEFINE_COUNTER(NumDisksAccessed, STABLE_LOW, TUnit::UNIT, "Number of distinct "
+     "disks accessed by HDFS scan. Each local disk is counted as a disk and each type of"
+     " remote filesystem (e.g. HDFS remote reads, S3) is counted as a distinct disk.");
+PROFILE_DEFINE_SAMPLING_COUNTER(AverageHdfsReadThreadConcurrency, STABLE_LOW, "The"
+     " average number of HDFS read threads executing read operations on behalf of this "
+     "scan. Higher values (i.e. close to the aggregate number of I/O threads across "
+     "all disks accessed) show that this scan is using a larger proportion of the I/O "
+     "capacity of the system. Lower values show that either this scan is not I/O bound"
+     " or that it is getting a small share of the I/O capacity of the system.");
+PROFILE_DEFINE_SUMMARY_STATS_COUNTER(InitialRangeIdealReservation, DEBUG, TUnit::BYTES,
+     "Tracks stats about the ideal reservation for initial scan ranges. Use this to "
+     "determine if the scan got all of the reservation it wanted. Does not include "
+     "subsequent reservation increases done by scanner implementation (e.g. for Parquet "
+     "columns).");
+PROFILE_DEFINE_SUMMARY_STATS_COUNTER(InitialRangeActualReservation, DEBUG,
+    TUnit::BYTES, "Tracks stats about the actual reservation for initial scan ranges. "
+    "Use this to determine if the scan got all of the reservation it wanted. Does not "
+    "include subsequent reservation increases done by scanner implementation "
+    "(e.g. for Parquet columns).");
+PROFILE_DEFINE_COUNTER(BytesReadLocal, STABLE_LOW, TUnit::BYTES,
+    "The total number of bytes read locally");
+PROFILE_DEFINE_COUNTER(BytesReadShortCircuit, STABLE_LOW, TUnit::BYTES,
+    "The total number of bytes read via short circuit read");
+PROFILE_DEFINE_COUNTER(BytesReadDataNodeCache, STABLE_HIGH, TUnit::BYTES,
+    "The total number of bytes read from data node cache");
+PROFILE_DEFINE_COUNTER(RemoteScanRanges, STABLE_HIGH, TUnit::UNIT,
+    "The total number of remote scan ranges");
+PROFILE_DEFINE_COUNTER(BytesReadRemoteUnexpected, STABLE_LOW, TUnit::BYTES,
+    "The total number of bytes read remotely that were expected to be local");
+PROFILE_DEFINE_COUNTER(CachedFileHandlesHitCount, STABLE_LOW, TUnit::UNIT,
+    "Total number of file handle opens where the file handle was present in the cache");
+PROFILE_DEFINE_COUNTER(CachedFileHandlesMissCount, STABLE_LOW, TUnit::UNIT,
+    "Total number of file handle opens where the file handle was not in the cache");
+PROFILE_DEFINE_HIGH_WATER_MARK_COUNTER(MaxCompressedTextFileLength, STABLE_LOW,
+    TUnit::BYTES, "The size of the largest compressed text file to be scanned. "
+    "This is used to estimate scanner thread memory usage.");
+PROFILE_DEFINE_TIMER(ScannerIoWaitTime, STABLE_LOW, "Total amount of time scanner "
+    "threads spent waiting for I/O. This value can be compared to the value of "
+    "ScannerThreadsTotalWallClockTime of MT_DOP = 0 scan nodes or otherwise compared "
+    "to the total time reported for MT_DOP > 0 scan nodes. High values show that "
+    "scanner threads are spending significant time waiting for I/O instead of "
+    "processing data. Note that this includes the time when the thread is runnable "
+    "but not scheduled.");
+PROFILE_DEFINE_SUMMARY_STATS_COUNTER(ParquetUncompressedBytesReadPerColumn, STABLE_LOW,
+    TUnit::BYTES, "Stats about the number of uncompressed bytes read per column. "
+    "Each sample in the counter is the size of a single column that is scanned by the "
+    "scan node.");
+PROFILE_DEFINE_SUMMARY_STATS_COUNTER(ParquetCompressedBytesReadPerColumn, STABLE_LOW,
+    TUnit::BYTES, "Stats about the number of compressed bytes read per column. "
+    "Each sample in the counter is the size of a single column that is scanned by the "
+    "scan node.");
+PROFILE_DEFINE_COUNTER(DataCacheHitCount, STABLE_HIGH, TUnit::UNIT,
+    "Total count of data cache hit");
+PROFILE_DEFINE_COUNTER(DataCachePartialHitCount, STABLE_HIGH, TUnit::UNIT,
+    "Total count of data cache partially hit");
+PROFILE_DEFINE_COUNTER(DataCacheMissCount, STABLE_HIGH, TUnit::UNIT,
+    "Total count of data cache miss");
+PROFILE_DEFINE_COUNTER(DataCacheHitBytes, STABLE_HIGH, TUnit::BYTES,
+    "Total bytes of data cache hit");
+PROFILE_DEFINE_COUNTER(DataCacheMissBytes, STABLE_HIGH, TUnit::BYTES,
+    "Total bytes of data cache miss");
+
 const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
     "Hdfs split stats (<volume id>:<# splits>/<split lengths>)";
 
@@ -356,33 +430,30 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   reader_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
-  hdfs_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
-  hdfs_open_file_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_OPEN_FILE_TIMER);
-  per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
-      PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
+  hdfs_read_timer_ = PROFILE_TotalRawHdfsReadTime.Instantiate(runtime_profile());
+  hdfs_open_file_timer_ =
+      PROFILE_TotalRawHdfsOpenFileTime.Instantiate(runtime_profile());
+  per_read_thread_throughput_counter_ =
+      PROFILE_PerReadThreadRawHdfsThroughput.Instantiate(runtime_profile(),
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_,
       hdfs_read_timer_));
   scan_ranges_complete_counter_ =
-      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
+      PROFILE_ScanRangesComplete.Instantiate(runtime_profile());
   collection_items_read_counter_ =
-      ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT);
+      PROFILE_CollectionItemsRead.Instantiate(runtime_profile());
   if (DiskInfo::num_disks() < 64) {
     num_disks_accessed_counter_ =
-        ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
+        PROFILE_NumDisksAccessed.Instantiate(runtime_profile());
   } else {
     num_disks_accessed_counter_ = NULL;
   }
 
-  data_cache_hit_count_ = ADD_COUNTER(runtime_profile(),
-      "DataCacheHitCount", TUnit::UNIT);
-  data_cache_partial_hit_count_ = ADD_COUNTER(runtime_profile(),
-      "DataCachePartialHitCount", TUnit::UNIT);
-  data_cache_miss_count_ = ADD_COUNTER(runtime_profile(),
-      "DataCacheMissCount", TUnit::UNIT);
-  data_cache_hit_bytes_ = ADD_COUNTER(runtime_profile(),
-      "DataCacheHitBytes", TUnit::BYTES);
-  data_cache_miss_bytes_ = ADD_COUNTER(runtime_profile(),
-      "DataCacheMissBytes", TUnit::BYTES);
+  data_cache_hit_count_ = PROFILE_DataCacheHitCount.Instantiate(runtime_profile());
+  data_cache_partial_hit_count_ =
+      PROFILE_DataCachePartialHitCount.Instantiate(runtime_profile());
+  data_cache_miss_count_ = PROFILE_DataCacheMissCount.Instantiate(runtime_profile());
+  data_cache_hit_bytes_ = PROFILE_DataCacheHitBytes.Instantiate(runtime_profile());
+  data_cache_miss_bytes_ = PROFILE_DataCacheMissBytes.Instantiate(runtime_profile());
 
   reader_context_->set_bytes_read_counter(bytes_read_counter());
   reader_context_->set_read_timer(hdfs_read_timer_);
@@ -395,38 +466,36 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   reader_context_->set_data_cache_hit_bytes_counter(data_cache_hit_bytes_);
   reader_context_->set_data_cache_miss_bytes_counter(data_cache_miss_bytes_);
 
-  average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
-      AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
-
-  initial_range_ideal_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
-      "InitialRangeIdealReservation", TUnit::BYTES);
-  initial_range_actual_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
-      "InitialRangeActualReservation", TUnit::BYTES);
-
-  uncompressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER(
-      runtime_profile(), "ParquetUncompressedBytesReadPerColumn", TUnit::BYTES);
-  compressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER(
-      runtime_profile(), "ParquetCompressedBytesReadPerColumn", TUnit::BYTES);
-
-  bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
-      TUnit::BYTES);
-  bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
-      TUnit::BYTES);
-  bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache",
-      TUnit::BYTES);
-  num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges",
-      TUnit::UNIT);
-  unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
-      TUnit::BYTES);
-  cached_file_handles_hit_count_ = ADD_COUNTER(runtime_profile(),
-      "CachedFileHandlesHitCount", TUnit::UNIT);
-  cached_file_handles_miss_count_ = ADD_COUNTER(runtime_profile(),
-      "CachedFileHandlesMissCount", TUnit::UNIT);
-
-  max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
-      "MaxCompressedTextFileLength", TUnit::BYTES);
-
-  scanner_io_wait_time_ = ADD_TIMER(runtime_profile(), "ScannerIoWaitTime");
+  average_hdfs_read_thread_concurrency_ =
+      PROFILE_AverageHdfsReadThreadConcurrency.Instantiate(runtime_profile(),
+      &active_hdfs_read_thread_counter_);
+
+  initial_range_ideal_reservation_stats_ =
+      PROFILE_InitialRangeIdealReservation.Instantiate(runtime_profile());
+  initial_range_actual_reservation_stats_ =
+      PROFILE_InitialRangeActualReservation.Instantiate(runtime_profile());
+
+  uncompressed_bytes_read_per_column_counter_ =
+      PROFILE_ParquetUncompressedBytesReadPerColumn.Instantiate(runtime_profile());
+  compressed_bytes_read_per_column_counter_ =
+      PROFILE_ParquetCompressedBytesReadPerColumn.Instantiate(runtime_profile());
+
+  bytes_read_local_ = PROFILE_BytesReadLocal.Instantiate(runtime_profile());
+  bytes_read_short_circuit_ =
+      PROFILE_BytesReadShortCircuit.Instantiate(runtime_profile());
+  bytes_read_dn_cache_ = PROFILE_BytesReadDataNodeCache.Instantiate(runtime_profile());
+  num_remote_ranges_ = PROFILE_RemoteScanRanges.Instantiate(runtime_profile());
+  unexpected_remote_bytes_ =
+      PROFILE_BytesReadRemoteUnexpected.Instantiate(runtime_profile());
+  cached_file_handles_hit_count_ =
+      PROFILE_CachedFileHandlesHitCount.Instantiate(runtime_profile());
+  cached_file_handles_miss_count_ =
+      PROFILE_CachedFileHandlesMissCount.Instantiate(runtime_profile());
+
+  max_compressed_text_file_length_ =
+      PROFILE_MaxCompressedTextFileLength.Instantiate(runtime_profile());
+
+  scanner_io_wait_time_ = PROFILE_ScannerIoWaitTime.Instantiate(runtime_profile());
   hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters(
       &active_hdfs_read_thread_counter_,
       ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1);
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index f7c33ad..b05006d 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -522,83 +522,42 @@ class HdfsScanNodeBase : public ScanNode {
   /// profile.
   bool counters_running_ = false;
 
-  /// The size of the largest compressed text file to be scanned. This is used to
-  /// estimate scanner thread memory usage.
-  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_ = nullptr;
-
   /// Disk accessed bitmap
   RuntimeProfile::Counter disks_accessed_bitmap_;
+  /// The number of active hdfs reading threads reading for this node.
+  RuntimeProfile::Counter active_hdfs_read_thread_counter_;
 
-  /// Total number of bytes read locally
+  /// Definition of following counters can be found in hdfs-scan-node-base.cc
+  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_ = nullptr;
   RuntimeProfile::Counter* bytes_read_local_ = nullptr;
-
-  /// Total number of bytes read via short circuit read
   RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr;
-
-  /// Total number of bytes read from data node cache
   RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr;
-
-  /// Total number of remote scan ranges
   RuntimeProfile::Counter* num_remote_ranges_ = nullptr;
-
-  /// Total number of bytes read remotely that were expected to be local
   RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr;
-
-  /// Total number of file handle opens where the file handle was present in the cache
   RuntimeProfile::Counter* cached_file_handles_hit_count_ = nullptr;
-
-  /// Total number of file handle opens where the file handle was not in the cache
   RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr;
-
-  /// Counters for data cache.
   RuntimeProfile::Counter* data_cache_hit_count_ = nullptr;
   RuntimeProfile::Counter* data_cache_partial_hit_count_ = nullptr;
   RuntimeProfile::Counter* data_cache_miss_count_ = nullptr;
   RuntimeProfile::Counter* data_cache_hit_bytes_ = nullptr;
   RuntimeProfile::Counter* data_cache_miss_bytes_ = nullptr;
-
-  /// The amount of time scanner threads spend waiting for I/O.
   RuntimeProfile::Counter* scanner_io_wait_time_ = nullptr;
-
-  /// The number of active hdfs reading threads reading for this node.
-  RuntimeProfile::Counter active_hdfs_read_thread_counter_;
-
-  /// Average number of active hdfs reading threads
-  /// This should be created in Open() and stopped when all the scanner threads are done.
   RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr;
-
-  /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
-  /// taken where there are i concurrent hdfs read thread running. Created in Open().
-  std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
-
-  /// HDFS read throughput per Disk I/O thread [bytes/sec],
   RuntimeProfile::Counter* per_read_thread_throughput_counter_ = nullptr;
-
-  /// Total number of disks accessed for this scan node.
   RuntimeProfile::Counter* num_disks_accessed_counter_ = nullptr;
-
-  /// Total file read time in I/O mgr disk thread.
   RuntimeProfile::Counter* hdfs_read_timer_ = nullptr;
-
-  /// Total time spent opening file handles in I/O mgr disk thread.
   RuntimeProfile::Counter* hdfs_open_file_timer_ = nullptr;
-
-  /// Track stats about ideal/actual reservation for initial scan ranges so we can
-  /// determine if the scan got all of the reservation it wanted. Does not include
-  /// subsequent reservation increases done by scanner implementation (e.g. for Parquet
-  /// columns).
   RuntimeProfile::SummaryStatsCounter* initial_range_ideal_reservation_stats_ = nullptr;
   RuntimeProfile::SummaryStatsCounter* initial_range_actual_reservation_stats_ = nullptr;
-
-  /// Track stats about the number of bytes read per column. Each sample in the counter is
-  /// the size of a single column that is scanned by the scan node. The scan node tracks
-  /// the number of bytes read for each column it processes, and when the scan node is
-  /// closed, it updates these counters with the size of each column.
   RuntimeProfile::SummaryStatsCounter* compressed_bytes_read_per_column_counter_ =
       nullptr;
   RuntimeProfile::SummaryStatsCounter* uncompressed_bytes_read_per_column_counter_ =
       nullptr;
 
+  /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
+  /// taken where there are i concurrent hdfs read thread running. Created in Open().
+  std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
+
   /// Pool for allocating some amounts of memory that is shared between scanners.
   /// e.g. partition key tuple and their string buffers
   boost::scoped_ptr<MemPool> scan_node_pool_;
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index 1bae536..875675a 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -42,6 +42,8 @@
 using kudu::client::KuduClient;
 using kudu::client::KuduTable;
 
+PROFILE_DECLARE_COUNTER(ScanRangesComplete);
+
 namespace impala {
 
 const string KuduScanNodeBase::KUDU_ROUND_TRIPS = "TotalKuduScanRoundTrips";
@@ -68,7 +70,7 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ScanNode::Prepare(state));
 
   scan_ranges_complete_counter_ =
-      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
+      PROFILE_ScanRangesComplete.Instantiate(runtime_profile());
   kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
   kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
   kudu_client_time_ = ADD_TIMER(runtime_profile(), KUDU_CLIENT_TIME);
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index cad80ac..2334e0b 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -53,30 +53,44 @@ using boost::algorithm::join;
 
 namespace impala {
 
-// Changing these names have compatibility concerns.
-const string ScanNode::BYTES_READ_COUNTER = "BytesRead";
-const string ScanNode::ROWS_READ_COUNTER = "RowsRead";
-const string ScanNode::COLLECTION_ITEMS_READ_COUNTER = "CollectionItemsRead";
-const string ScanNode::TOTAL_HDFS_READ_TIMER = "TotalRawHdfsReadTime(*)";
-const string ScanNode::TOTAL_HDFS_OPEN_FILE_TIMER = "TotalRawHdfsOpenFileTime(*)";
-const string ScanNode::TOTAL_HBASE_READ_TIMER = "TotalRawHBaseReadTime(*)";
-const string ScanNode::TOTAL_THROUGHPUT_COUNTER = "TotalReadThroughput";
-const string ScanNode::MATERIALIZE_TUPLE_TIMER = "MaterializeTupleTime(*)";
-const string ScanNode::PER_READ_THREAD_THROUGHPUT_COUNTER =
-    "PerReadThreadRawHdfsThroughput";
-const string ScanNode::NUM_DISKS_ACCESSED_COUNTER = "NumDisksAccessed";
-const string ScanNode::SCAN_RANGES_COMPLETE_COUNTER = "ScanRangesComplete";
+PROFILE_DEFINE_COUNTER(BytesRead, STABLE_HIGH, TUnit::BYTES, "Total bytes read from "
+    "disk by a scan node.");
+PROFILE_DEFINE_COUNTER(RowsRead, STABLE_HIGH, TUnit::UNIT, "Number of top-level "
+    "rows/tuples read from the storage layer, including those discarded by predicate "
+    "evaluation. Used for all types of scans.");
+PROFILE_DEFINE_RATE_COUNTER(TotalReadThroughput, STABLE_LOW, TUnit::BYTES_PER_SECOND,
+    "BytesRead divided by the total wall clock time that this scan was executing "
+    "(from Open() to Close()). This gives the aggregate rate at which data is scanned.");
+PROFILE_DEFINE_TIME_SERIES_COUNTER(BytesReadSeries, UNSTABLE, TUnit::BYTES,
+    "Time series of BytesRead that samples the BytesRead counter.");
+PROFILE_DEFINE_TIMER(MaterializeTupleTime, UNSTABLE, "Wall clock time spent "
+    "materializing tuples and evaluating predicates.");
+PROFILE_DEFINE_COUNTER(NumScannerThreadsStarted, DEBUG, TUnit::UNIT,
+    "The number of scanner threads started for the duration "
+    "of the ScanNode. A single scanner thread will likely process multiple scan ranges. "
+    "Meanwhile, more than one scanner thread can be spun up to process data from a "
+    "single scan range. This is *not* the same as peak scanner thread concurrency "
+    "because the number of scanner threads can fluctuate during execution of the scan.");
+PROFILE_DEFINE_COUNTER(RowBatchesEnqueued, STABLE_LOW, TUnit::UNIT, "Number of row "
+    "batches enqueued in the scan node's output queue.");
+PROFILE_DEFINE_COUNTER(RowBatchBytesEnqueued, STABLE_LOW, TUnit::BYTES,
+    "Number of bytes enqueued in the scan node's output queue.");
+PROFILE_DEFINE_TIMER(RowBatchQueueGetWaitTime, UNSTABLE, "Wall clock time that the "
+    "fragment execution thread spent blocked waiting for row batches to be added to the "
+    "scan node's output queue.");
+PROFILE_DEFINE_TIMER(RowBatchQueuePutWaitTime, UNSTABLE, "Aggregate wall clock time "
+    "across all scanner threads spent blocked waiting for space in the scan node's "
+    "output queue when it is full.");
+PROFILE_DEFINE_COUNTER(RowBatchQueuePeakMemoryUsage, DEBUG, TUnit::BYTES,
+    "Peak memory consumption of row batches enqueued in the scan node's output queue.");
+PROFILE_DEFINE_COUNTER(NumScannerThreadMemUnavailable, STABLE_LOW, TUnit::UNIT,
+    "Number of times scanner threads were not created because memory was not available.");
+PROFILE_DEFINE_SAMPLING_COUNTER(AverageScannerThreadConcurrency, STABLE_LOW,
+    "Average number of executing scanner threads.");
+PROFILE_DEFINE_HIGH_WATER_MARK_COUNTER(PeakScannerThreadConcurrency, STABLE_LOW,
+    TUnit::UNIT, "Peak number of executing scanner threads.");
+
 const string ScanNode::SCANNER_THREAD_COUNTERS_PREFIX = "ScannerThreads";
-const string ScanNode::SCANNER_THREAD_TOTAL_WALLCLOCK_TIME =
-    "ScannerThreadsTotalWallClockTime";
-const string ScanNode::AVERAGE_SCANNER_THREAD_CONCURRENCY =
-    "AverageScannerThreadConcurrency";
-const string ScanNode::PEAK_SCANNER_THREAD_CONCURRENCY =
-    "PeakScannerThreadConcurrency";
-const string ScanNode::AVERAGE_HDFS_READ_THREAD_CONCURRENCY =
-    "AverageHdfsReadThreadConcurrency";
-const string ScanNode::NUM_SCANNER_THREADS_STARTED =
-    "NumScannerThreadsStarted";
 
 bool ScanNode::IsDataCacheDisabled() const {
   return runtime_state()->query_options().disable_data_cache;
@@ -122,9 +136,8 @@ Status ScanNode::Prepare(RuntimeState* state) {
   runtime_state_ = state;
   RETURN_IF_ERROR(ExecNode::Prepare(state));
 
-  rows_read_counter_ =
-      ADD_COUNTER(runtime_profile(), ROWS_READ_COUNTER, TUnit::UNIT);
-  materialize_tuple_timer_ = ADD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER);
+  rows_read_counter_ = PROFILE_RowsRead.Instantiate(runtime_profile());
+  materialize_tuple_timer_ = PROFILE_MaterializeTupleTime.Instantiate(runtime_profile());
 
   DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
   for (int i = 0; i < filter_exprs_.size(); ++i) {
@@ -135,13 +148,12 @@ Status ScanNode::Prepare(RuntimeState* state) {
 }
 
 void ScanNode::AddBytesReadCounters() {
-  bytes_read_counter_ =
-      ADD_COUNTER(runtime_profile(), BYTES_READ_COUNTER, TUnit::BYTES);
+  bytes_read_counter_ = PROFILE_BytesRead.Instantiate(runtime_profile());
   runtime_state()->AddBytesReadCounter(bytes_read_counter_);
-  bytes_read_timeseries_counter_ = ADD_TIME_SERIES_COUNTER(runtime_profile(),
-      BYTES_READ_COUNTER, bytes_read_counter_);
-  total_throughput_counter_ = runtime_profile()->AddRateCounter(
-      TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
+  bytes_read_timeseries_counter_ = PROFILE_BytesReadSeries.Instantiate(runtime_profile(),
+      bytes_read_counter_);
+  total_throughput_counter_ = PROFILE_TotalReadThroughput.Instantiate(runtime_profile(),
+      bytes_read_counter_);
 }
 
 Status ScanNode::Open(RuntimeState* state) {
@@ -214,17 +226,15 @@ void ScanNode::ScannerThreadState::Prepare(
       new MemTracker(-1, "Queued Batches", parent->mem_tracker(), false));
 
   thread_counters_ = ADD_THREAD_COUNTERS(profile, SCANNER_THREAD_COUNTERS_PREFIX);
-  num_threads_started_ = ADD_COUNTER(profile, NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
-  row_batches_enqueued_ =
-      ADD_COUNTER(profile, "RowBatchesEnqueued", TUnit::UNIT);
-  row_batch_bytes_enqueued_ =
-      ADD_COUNTER(profile, "RowBatchBytesEnqueued", TUnit::BYTES);
-  row_batches_get_timer_ = ADD_TIMER(profile, "RowBatchQueueGetWaitTime");
-  row_batches_put_timer_ = ADD_TIMER(profile, "RowBatchQueuePutWaitTime");
+  num_threads_started_ = PROFILE_NumScannerThreadsStarted.Instantiate(profile);
+  row_batches_enqueued_ = PROFILE_RowBatchesEnqueued.Instantiate(profile);
+  row_batch_bytes_enqueued_ = PROFILE_RowBatchBytesEnqueued.Instantiate(profile);
+  row_batches_get_timer_ = PROFILE_RowBatchQueueGetWaitTime.Instantiate(profile);
+  row_batches_put_timer_ = PROFILE_RowBatchQueuePutWaitTime.Instantiate(profile);
   row_batches_peak_mem_consumption_ =
-      ADD_COUNTER(profile, "RowBatchQueuePeakMemoryUsage", TUnit::BYTES);
+      PROFILE_RowBatchQueuePeakMemoryUsage.Instantiate(profile);
   scanner_thread_mem_unavailable_counter_ =
-      ADD_COUNTER(profile, "NumScannerThreadMemUnavailable", TUnit::UNIT);
+      PROFILE_NumScannerThreadMemUnavailable.Instantiate(profile);
 
   parent->runtime_state()->query_state()->scanner_mem_limiter()->RegisterScan(
       parent, estimated_per_thread_mem);
@@ -267,12 +277,12 @@ void ScanNode::ScannerThreadState::Open(
       FLAGS_max_queued_row_batch_bytes, row_batches_get_timer_, row_batches_put_timer_));
 
   // Start measuring the scanner thread concurrency only once the node is opened.
-  average_concurrency_ = parent->runtime_profile()->AddSamplingCounter(
-      AVERAGE_SCANNER_THREAD_CONCURRENCY, [&num_active=num_active_] () {
+  average_concurrency_ = PROFILE_AverageScannerThreadConcurrency.Instantiate(
+      parent->runtime_profile(), [&num_active=num_active_] () {
         return num_active.Load();
       });
-  peak_concurrency_ = parent->runtime_profile()->AddHighWaterMarkCounter(
-      PEAK_SCANNER_THREAD_CONCURRENCY, TUnit::UNIT);
+  peak_concurrency_ =
+      PROFILE_PeakScannerThreadConcurrency.Instantiate(parent->runtime_profile());
 }
 
 void ScanNode::ScannerThreadState::AddThread(unique_ptr<Thread> thread) {
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index b2294bf..2154c65 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -126,24 +126,7 @@ class ScanNode : public ExecNode {
     return materialize_tuple_timer_;
   }
 
-  /// names of ScanNode common counters
-  static const std::string BYTES_READ_COUNTER;
-  static const std::string ROWS_READ_COUNTER;
-  static const std::string COLLECTION_ITEMS_READ_COUNTER;
-  static const std::string TOTAL_HDFS_READ_TIMER;
-  static const std::string TOTAL_HDFS_OPEN_FILE_TIMER;
-  static const std::string TOTAL_HBASE_READ_TIMER;
-  static const std::string TOTAL_THROUGHPUT_COUNTER;
-  static const std::string PER_READ_THREAD_THROUGHPUT_COUNTER;
-  static const std::string NUM_DISKS_ACCESSED_COUNTER;
-  static const std::string MATERIALIZE_TUPLE_TIMER;
-  static const std::string SCAN_RANGES_COMPLETE_COUNTER;
   static const std::string SCANNER_THREAD_COUNTERS_PREFIX;
-  static const std::string SCANNER_THREAD_TOTAL_WALLCLOCK_TIME;
-  static const std::string AVERAGE_SCANNER_THREAD_CONCURRENCY;
-  static const std::string PEAK_SCANNER_THREAD_CONCURRENCY;
-  static const std::string AVERAGE_HDFS_READ_THREAD_CONCURRENCY;
-  static const std::string NUM_SCANNER_THREADS_STARTED;
 
   const std::vector<ScalarExpr*>& filter_exprs() const { return filter_exprs_; }
 
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 9c7e5d1..79dde0a 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -50,15 +50,18 @@
 using kudu::MonoDelta;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcSidecar;
-using namespace impala;
 using namespace rapidjson;
 namespace accumulators = boost::accumulators;
 
-const char* Coordinator::BackendState::InstanceStats::LAST_REPORT_TIME_DESC =
-    "Last report received time";
-
 DECLARE_int32(backend_client_rpc_timeout_ms);
 DECLARE_int64(rpc_max_message_size);
+PROFILE_DECLARE_COUNTER(ScanRangesComplete);
+
+namespace impala {
+PROFILE_DECLARE_COUNTER(BytesRead);
+
+const char* Coordinator::BackendState::InstanceStats::LAST_REPORT_TIME_DESC =
+    "Last report received time";
 
 Coordinator::BackendState::BackendState(const QuerySchedule& schedule,
     const TQueryCtx& query_ctx, int state_idx,
@@ -584,10 +587,10 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
   vector<RuntimeProfile*> children;
   profile_->GetAllChildren(&children);
   for (RuntimeProfile* p : children) {
-    RuntimeProfile::Counter* c = p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
+    RuntimeProfile::Counter* c = p->GetCounter(PROFILE_ScanRangesComplete.name());
     if (c != nullptr) scan_ranges_complete_counters_.push_back(c);
 
-    RuntimeProfile::Counter* bytes_read = p->GetCounter(ScanNode::BYTES_READ_COUNTER);
+    RuntimeProfile::Counter* bytes_read = p->GetCounter(PROFILE_BytesRead.name());
     if (bytes_read != nullptr) bytes_read_counters_.push_back(bytes_read);
 
     RuntimeProfile::Counter* bytes_sent =
@@ -783,3 +786,5 @@ void Coordinator::BackendState::InstanceStatsToJson(Value* value, Document* docu
   Value val(TNetworkAddressToString(impalad_address()).c_str(), document->GetAllocator());
   value->AddMember("host", val, document->GetAllocator());
 }
+
+}
\ No newline at end of file
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 09efe80..1051695 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -67,6 +67,42 @@ DECLARE_string(hostname);
 
 using namespace impala;
 
+PROFILE_DEFINE_COUNTER(NumBackends, STABLE_HIGH, TUnit::UNIT,
+    "Number of backends running this query.");
+PROFILE_DEFINE_COUNTER(TotalBytesRead, STABLE_HIGH, TUnit::BYTES,
+    "Total number of bytes read by a query.");
+PROFILE_DEFINE_COUNTER(TotalCpuTime, STABLE_HIGH, TUnit::TIME_NS,
+    "Total CPU time (user + system) consumed by a query.");
+PROFILE_DEFINE_COUNTER(FiltersReceived, STABLE_LOW, TUnit::UNIT,
+    "Total number of filter updates received (always 0 if filter mode is not "
+    "GLOBAL). Excludes repeated broadcast filter updates.");
+PROFILE_DEFINE_COUNTER(NumFragments, STABLE_HIGH, TUnit::UNIT,
+    "Number of fragments in the plan of a query.");
+PROFILE_DEFINE_COUNTER(NumFragmentInstances, STABLE_HIGH, TUnit::UNIT,
+    "Number of fragment instances executed by a query.");
+PROFILE_DEFINE_COUNTER(TotalBytesSent, STABLE_LOW, TUnit::BYTES, "The total number"
+    " of bytes sent (across the network) by this query in exchange nodes. Does not "
+    "include remote reads, data written to disk, or data sent to the client.");
+PROFILE_DEFINE_COUNTER(TotalScanBytesSent, STABLE_LOW, TUnit::BYTES,
+    "The total number of bytes sent (across the network) by fragment instances that "
+    "had a scan node in their plan.");
+PROFILE_DEFINE_COUNTER(TotalInnerBytesSent, STABLE_LOW, TUnit::BYTES, "The total "
+    "number of bytes sent (across the network) by fragment instances that did not have a"
+    " scan node in their plan, i.e. that received their input data from other instances"
+    " through an exchange node.");
+PROFILE_DEFINE_COUNTER(ExchangeScanRatio, STABLE_LOW, TUnit::DOUBLE_VALUE,
+    "The ratio between TotalScanBytesSent and TotalBytesRead, i.e. the selectivity over "
+    "all fragment instances that had a scan node in their plan.");
+PROFILE_DEFINE_COUNTER(InnerNodeSelectivityRatio, STABLE_LOW, TUnit::DOUBLE_VALUE,
+    "The ratio between bytes sent by instances with a scan node in their plan and "
+    "instances without a scan node in their plan. This indicates how well the inner "
+    "nodes of the execution plan reduced the data volume.");
+PROFILE_DEFINE_COUNTER(NumCompletedBackends, STABLE_HIGH, TUnit::UNIT, "The number of "
+    "completed backends. Only valid after all backends have started executing. "
+    "Does not count the number of CANCELLED backends.");
+PROFILE_DEFINE_TIMER(FinalizationTimer, STABLE_LOW,
+    "Total time spent in finalization (typically 0 except for INSERT into HDFS tables).");
+
 // Maximum number of fragment instances that can publish each broadcast filter.
 static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
 
@@ -101,8 +137,8 @@ Status Coordinator::Exec() {
 
   query_profile_ =
       RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id()));
-  finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");
-  filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT);
+  finalization_timer_ = PROFILE_FinalizationTimer.Instantiate(query_profile_);
+  filter_updates_received_ = PROFILE_FiltersReceived.Instantiate(query_profile_);
 
   host_profiles_ = RuntimeProfile::Create(obj_pool(), "Per Node Profiles");
   query_profile_->AddChild(host_profiles_);
@@ -178,12 +214,10 @@ void Coordinator::InitFragmentStats() {
     query_profile_->AddChild(fragment_stats->avg_profile(), true);
     query_profile_->AddChild(fragment_stats->root_profile());
   }
-  RuntimeProfile::Counter* num_fragments =
-      ADD_COUNTER(query_profile_, "NumFragments", TUnit::UNIT);
-  num_fragments->Set(static_cast<int64_t>(fragments.size()));
-  RuntimeProfile::Counter* num_finstances =
-      ADD_COUNTER(query_profile_, "NumFragmentInstances", TUnit::UNIT);
-  num_finstances->Set(total_num_finstances);
+  COUNTER_SET(PROFILE_NumFragments.Instantiate(query_profile_),
+      static_cast<int64_t>(fragments.size()));
+  COUNTER_SET(PROFILE_NumFragmentInstances.Instantiate(query_profile_),
+      total_num_finstances);
 }
 
 void Coordinator::InitBackendStates() {
@@ -193,9 +227,7 @@ void Coordinator::InitBackendStates() {
   lock_guard<SpinLock> l(backend_states_init_lock_);
   backend_states_.resize(num_backends);
 
-  RuntimeProfile::Counter* num_backends_counter =
-      ADD_COUNTER(query_profile_, "NumBackends", TUnit::UNIT);
-  num_backends_counter->Set(num_backends);
+  COUNTER_SET(PROFILE_NumBackends.Instantiate(query_profile_), num_backends);
 
   // create BackendStates
   int backend_idx = 0;
@@ -207,8 +239,7 @@ void Coordinator::InitBackendStates() {
   }
   backend_resource_state_ =
       obj_pool()->Add(new BackendResourceState(backend_states_, schedule_));
-  num_completed_backends_ =
-      ADD_COUNTER(query_profile_, "NumCompletedBackends", TUnit::UNIT);
+  num_completed_backends_ = PROFILE_NumCompletedBackends.Instantiate(query_profile_);
 }
 
 void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -887,21 +918,16 @@ void Coordinator::ComputeQuerySummary() {
                     << ") ";
   }
 
-  // The total number of bytes read by this query.
-  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalBytesRead", TUnit::BYTES),
+  // The definitions of these counters are at the top of this file.
+  COUNTER_SET(PROFILE_TotalBytesRead.Instantiate(query_profile_),
       total_utilization.bytes_read);
-  // The total number of bytes sent by this query in exchange nodes. Does not include
-  // remote reads, data written to disk, or data sent to the client.
-  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalBytesSent", TUnit::BYTES),
+  COUNTER_SET(PROFILE_TotalCpuTime.Instantiate(query_profile_),
+      total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
+  COUNTER_SET(PROFILE_TotalBytesSent.Instantiate(query_profile_),
       total_utilization.scan_bytes_sent + total_utilization.exchange_bytes_sent);
-  // The total number of bytes sent by fragment instances that had a scan node in their
-  // plan.
-  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalScanBytesSent", TUnit::BYTES),
+  COUNTER_SET(PROFILE_TotalScanBytesSent.Instantiate(query_profile_),
       total_utilization.scan_bytes_sent);
-  // The total number of bytes sent by fragment instances that did not have a scan node in
-  // their plan, i.e. that received their input data from other instances through exchange
-  // node.
-  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalInnerBytesSent", TUnit::BYTES),
+  COUNTER_SET(PROFILE_TotalInnerBytesSent.Instantiate(query_profile_),
       total_utilization.exchange_bytes_sent);
 
   double xchg_scan_ratio = 0;
@@ -909,26 +935,16 @@ void Coordinator::ComputeQuerySummary() {
     xchg_scan_ratio =
         (double)total_utilization.scan_bytes_sent / total_utilization.bytes_read;
   }
-  // The ratio between TotalScanBytesSent and TotalBytesRead, i.e. the selectivity over
-  // all fragment instances that had a scan node in their plan.
-  COUNTER_SET(ADD_COUNTER(query_profile_, "ExchangeScanRatio", TUnit::DOUBLE_VALUE),
-      xchg_scan_ratio);
+  COUNTER_SET(PROFILE_ExchangeScanRatio.Instantiate(query_profile_), xchg_scan_ratio);
 
   double inner_node_ratio = 0;
   if (total_utilization.scan_bytes_sent > 0) {
     inner_node_ratio =
         (double)total_utilization.exchange_bytes_sent / total_utilization.scan_bytes_sent;
   }
-  // The ratio between bytes sent by instances with a scan node in their plan and
-  // instances without a scan node in their plan. This indicates how well the inner nodes
-  // of the execution plan reduced the data volume.
-  COUNTER_SET(
-      ADD_COUNTER(query_profile_, "InnerNodeSelectivityRatio", TUnit::DOUBLE_VALUE),
+  COUNTER_SET(PROFILE_InnerNodeSelectivityRatio.Instantiate(query_profile_),
       inner_node_ratio);
 
-  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalCpuTime", TUnit::TIME_NS),
-      total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
-
   // TODO(IMPALA-8126): Move to host profiles
   query_profile_->AddInfoString("Per Node Peak Memory Usage", mem_info.str());
   query_profile_->AddInfoString("Per Node Bytes Read", bytes_read_info.str());
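The two ratio counters set in ComputeQuerySummary() above are plain quotients with a zero guard. A minimal standalone sketch of that arithmetic follows. The Utilization struct and the two free functions are hypothetical stand-ins, not Impala code, and the bytes_read > 0 guard on the first ratio is an assumption because that condition falls outside the hunk shown.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the aggregated utilization values used by the diff.
struct Utilization {
  int64_t bytes_read;
  int64_t scan_bytes_sent;
  int64_t exchange_bytes_sent;
};

// TotalScanBytesSent / TotalBytesRead. Returns 0 when nothing was read
// (the guard condition is assumed, it is not visible in the hunk).
double ExchangeScanRatio(const Utilization& u) {
  if (u.bytes_read <= 0) return 0;
  return static_cast<double>(u.scan_bytes_sent) / u.bytes_read;
}

// exchange_bytes_sent / scan_bytes_sent, mirroring inner_node_ratio in the diff.
// Returns 0 when no scan fragment sent any bytes.
double InnerNodeSelectivityRatio(const Utilization& u) {
  if (u.scan_bytes_sent <= 0) return 0;
  return static_cast<double>(u.exchange_bytes_sent) / u.scan_bytes_sent;
}
```

For example, a query that read 400 bytes, sent 100 bytes from scan fragments and 50 bytes from inner fragments would report 0.25 and 0.5 respectively under this sketch.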
diff --git a/be/src/util/debug-counters.h b/be/src/util/debug-counters.h
deleted file mode 100644
index 127d61a..0000000
--- a/be/src/util/debug-counters.h
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_UTIL_DEBUG_COUNTERS_H
-#define IMPALA_UTIL_DEBUG_COUNTERS_H
-
-#include "util/runtime-profile.h"
-
-#define ENABLE_DEBUG_COUNTERS 1
-
-namespace impala {
-
-/// Runtime counters have a two-phase lifecycle - creation and update. This is not
-/// convenient for debugging where we would like to add and remove counters with a minimum
-/// of boilerplate. This header adds a global debug runtime profile, and macros to
-/// update-or-create counters in one line of code. Counters created this way are not
-/// intended to remain in the code; they are a tool for identifying hotspots without having
-/// to run a full profiler.
-/// The AddCounter call adds some more overhead to each macro, and therefore they
-/// should not be used where minimal impact on performance is needed.
-class DebugRuntimeProfile {
- public:
-  static RuntimeProfile& profile() {
-    static RuntimeProfile profile(new ObjectPool(), "DebugProfile");
-    return profile;
-  }
-};
-
-#if ENABLE_DEBUG_COUNTERS
-
-#define DEBUG_SCOPED_TIMER(counter_name) \
-  COUNTER_SCOPED_TIMER(DebugRuntimeProfile::profile().AddCounter(counter_name, \
-    TUnit::CPU_TICKS))
-
-#define DEBUG_COUNTER_ADD(counter_name, v) \
-  COUNTER_ADD(DebugRuntimeProfile::profile().AddCounter(counter_name, \
-    TUnit::UNIT), v)
-
-#define DEBUG_COUNTER_SET(counter_name, v) \
-  COUNTER_SET(DebugRuntimeProfile::profile().AddCounter(counter_name, \
-    TUnit::UNIT), v)
-
-#define PRETTY_PRINT_DEBUG_COUNTERS(ostream_ptr) \
-  DebugRuntimeProfile::profile().PrettyPrint(ostream_ptr)
-
-#else
-
-#define DEBUG_SCOPED_TIMER(counter_name)
-#define DEBUG_COUNTER_ADD(counter_name, v)
-#define DEBUG_COUNTER_SET(counter_name, v)
-#define PRETTY_PRINT_DEBUG_COUNTERS(ostream_ptr)
-
-#endif // ENABLE_DEBUG_COUNTERS
-
-}
-
-#endif // IMPALA_UTIL_DEBUG_COUNTERS_H
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 21bc1c6..4d811fe 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -39,6 +39,7 @@
 #include "util/memusage-path-handlers.h"
 #include "util/pprof-path-handlers.h"
 #include "util/process-state-info.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 
@@ -127,6 +128,49 @@ void FlagsHandler(const Webserver::WebRequest& req, Document* document) {
   document->AddMember("flags", flag_arr, document->GetAllocator());
 }
 
+// Profile documentation handler.
+// Adds a list of all profile counters and the definitions of the Significance levels
+// to 'document'.
+// Counters have the following properties:
+//   name, significance, description, unit
+void ProfileDocsHandler(const Webserver::WebRequest& req, Document* document) {
+  vector<const ProfileEntryPrototype*> prototypes;
+  ProfileEntryPrototypeRegistry::get()->GetPrototypes(&prototypes);
+
+  Value profile_docs(kArrayType);
+  for (auto p : prototypes) {
+    Value p_val(kObjectType);
+    Value name(p->name(), document->GetAllocator());
+    p_val.AddMember("name", name, document->GetAllocator());
+    Value significance(p->significance(), document->GetAllocator());
+    p_val.AddMember("significance", significance, document->GetAllocator());
+    Value description(p->desc(), document->GetAllocator());
+    p_val.AddMember("description", description, document->GetAllocator());
+    auto unit_it = _TUnit_VALUES_TO_NAMES.find(p->unit());
+    const char* unit_str = unit_it != _TUnit_VALUES_TO_NAMES.end() ? unit_it->second : "";
+    Value unit(unit_str, document->GetAllocator());
+    p_val.AddMember("unit", unit, document->GetAllocator());
+    profile_docs.PushBack(p_val, document->GetAllocator());
+  }
+
+  Value significance_def(kArrayType);
+  for (auto significance : ProfileEntryPrototype::ALLSIGNIFICANCE) {
+    Value significance_obj(kObjectType);
+    Value significance_val(ProfileEntryPrototype::SignificanceString(significance),
+        document->GetAllocator());
+    Value significance_description(
+        ProfileEntryPrototype::SignificanceDescription(significance),
+        document->GetAllocator());
+    significance_obj.AddMember("name", significance_val, document->GetAllocator());
+    significance_obj.AddMember("description",
+        significance_description, document->GetAllocator());
+    significance_def.PushBack(significance_obj, document->GetAllocator());
+  }
+
+  document->AddMember("significance_docs", significance_def, document->GetAllocator());
+  document->AddMember("profile_docs", profile_docs, document->GetAllocator());
+}
+
 void JmxHandler(const Webserver::WebRequest& req, Document* document) {
   document->AddMember(rapidjson::StringRef(Webserver::ENABLE_PLAIN_JSON_KEY), true,
       document->GetAllocator());
@@ -234,6 +278,8 @@ void AddDefaultUrlCallbacks(Webserver* webserver, MetricGroup* metric_group,
     MemTracker* process_mem_tracker) {
   webserver->RegisterUrlCallback("/logs", "logs.tmpl", LogsHandler, true);
   webserver->RegisterUrlCallback("/varz", "flags.tmpl", FlagsHandler, true);
+  webserver->RegisterUrlCallback(
+      "/profile_docs", "profile_docs.tmpl", ProfileDocsHandler, true);
   if (JniUtil::is_jvm_inited()) {
     // JmxHandler outputs a plain JSON string and does not require a template to
     // render. However RawUrlCallback only supports PLAIN content type.
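ProfileDocsHandler() above depends on every PROFILE_DEFINE_* prototype having registered itself before the page is served. The sketch below illustrates that self-registration pattern in isolation. MiniProfile, MiniRegistry, MiniPrototype and MINI_DEFINE_COUNTER are hypothetical stand-ins, not Impala classes, and a std::mutex plus a function-local static are assumed here in place of SpinLock and the gutil Singleton.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for RuntimeProfile: owns named integer counters.
class MiniProfile {
 public:
  // Creates the counter if needed. std::map keeps element addresses stable.
  int64_t* AddCounter(const std::string& name) { return &counters_[name]; }
  const int64_t* GetCounter(const std::string& name) const {
    auto it = counters_.find(name);
    return it == counters_.end() ? nullptr : &it->second;
  }

 private:
  std::map<std::string, int64_t> counters_;
};

class MiniPrototype;

// Process-wide list of prototypes, filled during static initialization.
class MiniRegistry {
 public:
  static MiniRegistry& Get() {
    static MiniRegistry instance;  // constructed on first use
    return instance;
  }
  void Add(const MiniPrototype* p) {
    std::lock_guard<std::mutex> l(lock_);
    prototypes_.push_back(p);
  }
  std::vector<const MiniPrototype*> All() const {
    std::lock_guard<std::mutex> l(lock_);
    return prototypes_;
  }

 private:
  mutable std::mutex lock_;
  std::vector<const MiniPrototype*> prototypes_;
};

// Carries the name and description, and registers itself on construction.
class MiniPrototype {
 public:
  MiniPrototype(const char* name, const char* desc) : name_(name), desc_(desc) {
    MiniRegistry::Get().Add(this);
  }
  const char* name() const { return name_; }
  const char* desc() const { return desc_; }
  int64_t* Instantiate(MiniProfile* profile) const {
    return profile->AddCounter(name_);
  }

 private:
  const char* name_;
  const char* desc_;
};

#define MINI_DEFINE_COUNTER(name, desc) MiniPrototype MINI_##name(#name, desc)

MINI_DEFINE_COUNTER(NumBackends, "Number of backends running this query.");
MINI_DEFINE_COUNTER(TotalBytesRead, "Total number of bytes read by a query.");
```

The point of the design is that the description lives next to the counter definition, so a documentation page can be generated from the registry without instantiating any profile, while call sites only write MINI_NumBackends.Instantiate(&profile).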
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 2777032..c006b64 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -26,6 +26,7 @@
 
 #include "common/atomic.h"
 #include "common/logging.h"
+#include "gutil/singleton.h"
 #include "util/arithmetic-util.h"
 #include "util/runtime-profile.h"
 #include "util/stopwatch.h"
@@ -93,6 +94,232 @@ namespace impala {
   #define SCOPED_CONCURRENT_COUNTER(c)
 #endif
 
+
+#define PROFILE_DEFINE_COUNTER(name, significance, unit, desc) \
+  ::impala::CounterPrototype PROFILE_##name( \
+      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc, unit)
+
+#define PROFILE_DEFINE_RATE_COUNTER(name, significance, unit, desc) \
+  ::impala::RateCounterPrototype PROFILE_##name( \
+      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc, unit)
+
+#define PROFILE_DEFINE_SUMMARY_STATS_COUNTER(name, significance, unit, desc) \
+  ::impala::SummaryStatsCounterPrototype PROFILE_##name( \
+      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc, unit)
+
+#define PROFILE_DEFINE_DERIVED_COUNTER(name, significance, unit, desc) \
+  ::impala::DerivedCounterPrototype PROFILE_##name( \
+      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc, unit)
+
+#define PROFILE_DEFINE_SAMPLING_COUNTER(name, significance, desc) \
+  ::impala::SamplingCounterPrototype PROFILE_##name( \
+      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc)
+
+#define PROFILE_DEFINE_HIGH_WATER_MARK_COUNTER(name, significance, unit, desc) \
+  ::impala::HighWaterMarkCounterPrototype PROFILE_##name( \
+      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc, unit)
+
+#define PROFILE_DEFINE_TIME_SERIES_COUNTER(name, significance, unit, desc) \
+  ::impala::TimeSeriesCounterPrototype PROFILE_##name( \
+      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc, unit)
+
+#define PROFILE_DEFINE_TIMER(name, significance, desc) \
+  ::impala::CounterPrototype PROFILE_##name(#name, \
+      ::impala::ProfileEntryPrototype::Significance::significance, desc, TUnit::TIME_NS)
+
+#define PROFILE_DEFINE_SUMMARY_STATS_TIMER(name, significance, desc) \
+  ::impala::SummaryStatsCounterPrototype PROFILE_##name(#name, \
+      ::impala::ProfileEntryPrototype::Significance::significance, desc, TUnit::TIME_NS)
+
+#define PROFILE_DECLARE_COUNTER(name) extern ::impala::CounterPrototype PROFILE_##name
+
+/// Prototype of a profile entry. All prototypes must be defined at compile time and must
+/// have a unique name. Subclasses then must provide a way to create new profile entries
+/// from a prototype, for example by providing an Instantiate() method.
+class ProfileEntryPrototype {
+ public:
+  enum class Significance {
+    // High level and stable counters - always useful for measuring query performance and
+    // status. Counters that everyone is interested in. They should rarely change, and if
+    // they do we will make some effort to notify users.
+    STABLE_HIGH,
+    // Low level and stable counters - interesting counters to monitor and analyze by
+    // machine. They will probably be interesting to users under some circumstances.
+    // Lots of developers are interested.
+    STABLE_LOW,
+    // Unstable but useful - useful to understand query performance, but subject to
+    // change, particularly if the implementation changes. E.g. RowBatchQueuePutWaitTime,
+    // MaterializeTupleTime
+    UNSTABLE,
+    // Debugging counters - generally not useful to users of Impala, the main use case is
+    // low-level debugging. Can be hidden to reduce noise for most consumers of profiles.
+    DEBUG
+  };
+
+  // Creates a new prototype and registers it with the singleton
+  // ProfileEntryPrototypeRegistry instance.
+  ProfileEntryPrototype(
+      const char* name, Significance significance, const char* desc, TUnit::type unit);
+
+  const char* name() const { return name_; }
+  const char* desc() const { return desc_; }
+  const char* significance() const { return SignificanceString(significance_); }
+  TUnit::type unit() const { return unit_; }
+
+  // Returns a string representation of 'significance'.
+  static const char* SignificanceString(Significance significance);
+
+  constexpr const static Significance ALLSIGNIFICANCE[4] = {Significance::STABLE_HIGH,
+      Significance::STABLE_LOW, Significance::UNSTABLE, Significance::DEBUG};
+
+  // Returns the description of 'significance'. Should be similar to the comments
+  // above, but worded for display in the user interface.
+  static const char* SignificanceDescription(Significance significance);
+
+ protected:
+  // Name of this prototype, needs to have process lifetime.
+  const char* name_;
+
+  // Significance of this prototype. See enum Significance above for descriptions of
+  // possible values.
+  Significance significance_;
+
+  // Description of this prototype, needs to have process lifetime. This is used to
+  // generate documentation automatically.
+  const char* desc_;
+  TUnit::type unit_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ProfileEntryPrototype);
+};
+
+/// Registry to collect all profile entry prototypes. During process startup, all
+/// prototypes will register with the singleton instance of this class. Then, this class
+/// can be used to retrieve all registered prototypes, for example to display
+/// documentation to users. This class is thread-safe.
+class ProfileEntryPrototypeRegistry {
+ public:
+  // Returns the singleton instance.
+  static ProfileEntryPrototypeRegistry* get() {
+    return Singleton<ProfileEntryPrototypeRegistry>::get();
+  }
+
+  // Adds a prototype to the registry. Prototypes must have unique names.
+  void AddPrototype(const ProfileEntryPrototype* prototype);
+
+  // Copies all prototype pointers to 'out'.
+  void GetPrototypes(std::vector<const ProfileEntryPrototype*>* out);
+
+ private:
+  mutable SpinLock lock_;
+  std::map<const char*, const ProfileEntryPrototype*> prototypes_;
+};
+
+/// TODO: As an alternative to this approach we could just have a single class
+/// ProfileEntryPrototype and pass its objects into the profile->Add.*Counter() methods.
+class CounterPrototype : public ProfileEntryPrototype {
+ public:
+  CounterPrototype(const char* name, Significance significance, const char* desc,
+      TUnit::type unit): ProfileEntryPrototype(name, significance, desc, unit) {}
+
+  RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
+      const std::string& parent_counter_name = "") {
+    return profile->AddCounter(name(), unit(), parent_counter_name);
+  }
+
+};
+
+class DerivedCounterPrototype : public ProfileEntryPrototype {
+ public:
+  DerivedCounterPrototype(const char* name, Significance significance, const char* desc,
+      TUnit::type unit): ProfileEntryPrototype(name, significance, desc, unit) {}
+
+  RuntimeProfile::DerivedCounter* Instantiate(RuntimeProfile* profile,
+      const RuntimeProfile::SampleFunction& counter_fn,
+      const std::string& parent_counter_name = "") {
+    return profile->AddDerivedCounter(name(), unit(), counter_fn, parent_counter_name);
+  }
+};
+
+class SamplingCounterPrototype : public ProfileEntryPrototype {
+ public:
+  SamplingCounterPrototype(const char* name, Significance significance, const char* desc):
+      ProfileEntryPrototype(name, significance, desc, TUnit::DOUBLE_VALUE) {}
+
+  RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
+      RuntimeProfile::Counter* src_counter) {
+    return profile->AddSamplingCounter(name(), src_counter);
+  }
+
+  RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
+      boost::function<int64_t ()> sample_fn) {
+    return profile->AddSamplingCounter(name(), sample_fn);
+  }
+};
+
+class HighWaterMarkCounterPrototype : public ProfileEntryPrototype {
+ public:
+  HighWaterMarkCounterPrototype(
+      const char* name, Significance significance, const char* desc,
+      TUnit::type unit): ProfileEntryPrototype(name, significance, desc, unit) {}
+
+  RuntimeProfile::HighWaterMarkCounter* Instantiate(RuntimeProfile* profile,
+        const std::string& parent_counter_name = "") {
+    return profile->AddHighWaterMarkCounter(name(), unit(), parent_counter_name);
+  }
+};
+
+class TimeSeriesCounterPrototype : public ProfileEntryPrototype {
+ public:
+  TimeSeriesCounterPrototype(const char* name, Significance significance,
+      const char* desc, TUnit::type unit):
+      ProfileEntryPrototype(name, significance, desc, unit) {}
+
+  RuntimeProfile::TimeSeriesCounter* operator()(RuntimeProfile* profile,
+      RuntimeProfile::Counter* src_counter) {
+    DCHECK(src_counter->unit() == unit());
+    return profile->AddSamplingTimeSeriesCounter(name(), src_counter);
+  }
+
+  RuntimeProfile::TimeSeriesCounter* Instantiate(RuntimeProfile* profile,
+      RuntimeProfile::Counter* src_counter) {
+    return (*this)(profile, src_counter);
+  }
+};
+
+class RateCounterPrototype : public ProfileEntryPrototype {
+ public:
+  RateCounterPrototype(const char* name, Significance significance,
+      const char* desc, TUnit::type unit):
+      ProfileEntryPrototype(name, significance, desc, unit) {}
+
+  RuntimeProfile::Counter* operator()(
+      RuntimeProfile* profile, RuntimeProfile::Counter* src_counter) {
+    RuntimeProfile::Counter* new_counter = profile->AddRateCounter(name(), src_counter);
+    DCHECK_EQ(unit(), new_counter->unit());
+    return new_counter;
+  }
+  RuntimeProfile::Counter* Instantiate(
+      RuntimeProfile* profile, RuntimeProfile::Counter* src_counter) {
+    RuntimeProfile::Counter* new_counter = profile->AddRateCounter(name(), src_counter);
+    DCHECK_EQ(unit(), new_counter->unit());
+    return new_counter;
+  }
+};
+
+class SummaryStatsCounterPrototype : public ProfileEntryPrototype {
+ public:
+  SummaryStatsCounterPrototype(const char* name, Significance significance,
+      const char* desc, TUnit::type unit):
+      ProfileEntryPrototype(name, significance, desc, unit) {}
+
+  RuntimeProfile::SummaryStatsCounter* Instantiate(RuntimeProfile* profile,
+      const std::string& parent_counter_name = "") {
+    return profile->AddSummaryStatsCounter(name(), unit(), parent_counter_name);
+  }
+};
+
+
 /// A counter that keeps track of the highest value seen (reporting that
 /// as value()) and the current value.
 class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index fd68cfe..ce034ec 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -64,6 +64,71 @@ const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
 const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime";
 const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
 
+constexpr ProfileEntryPrototype::Significance ProfileEntryPrototype::ALLSIGNIFICANCE[];
+
+void ProfileEntryPrototypeRegistry::AddPrototype(const ProfileEntryPrototype* prototype) {
+  boost::lock_guard<SpinLock> l(lock_);
+  DCHECK(prototypes_.find(prototype->name()) == prototypes_.end()) <<
+      "Found duplicate prototype name: " << prototype->name();
+  prototypes_.emplace(prototype->name(), prototype);
+}
+
+void ProfileEntryPrototypeRegistry::GetPrototypes(
+    vector<const ProfileEntryPrototype*>* out) {
+  lock_guard<SpinLock> l(lock_);
+  out->reserve(prototypes_.size());
+  for (auto p : prototypes_) out->push_back(p.second);
+}
+
+ProfileEntryPrototype::ProfileEntryPrototype(const char* name, Significance significance,
+    const char* desc, TUnit::type unit) :
+    name_(name), significance_(significance), desc_(desc),
+    unit_(unit) {
+  ProfileEntryPrototypeRegistry::get()->AddPrototype(this);
+}
+
+const char* ProfileEntryPrototype::SignificanceString(
+    ProfileEntryPrototype::Significance significance) {
+  switch (significance) {
+    case Significance::STABLE_HIGH:
+      return "STABLE & HIGH";
+    case Significance::STABLE_LOW:
+      return "STABLE & LOW";
+    case Significance::UNSTABLE:
+      return "UNSTABLE";
+    case Significance::DEBUG:
+      return "DEBUG";
+    default:
+      DCHECK(false);
+      return "";
+  }
+}
+
+const char* ProfileEntryPrototype::SignificanceDescription(
+    ProfileEntryPrototype::Significance significance) {
+  switch (significance) {
+    case Significance::STABLE_HIGH:
+      return "High level and stable counters - always useful for measuring query "
+             "performance and status. Counters that everyone is interested in. They "
+             "should rarely change and if they do we will make some effort to notify users.";
+    case Significance::STABLE_LOW:
+      return "Low level and stable counters - interesting counters to monitor and "
+             "analyze by machine. They will probably be interesting to users under "
+             "some circumstances.";
+    case Significance::UNSTABLE:
+      return "Unstable but useful - useful to understand query performance, but subject"
+             " to change, particularly if the implementation changes. E.g. "
+             "RowBatchQueuePutWaitTime, MaterializeTupleTimer";
+    case Significance::DEBUG:
+      return "Debugging counters - generally not useful to users of Impala, the main"
+             " use case is low-level debugging. Can be hidden to reduce noise for "
+             "most consumers of profiles.";
+    default:
+      DCHECK(false);
+      return "";
+  }
+}
+
 RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name,
     bool is_averaged_profile) {
   return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile));
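
Note on the registration pattern: each ProfileEntryPrototype registers itself with the singleton registry from its constructor, so that the documentation page can later list every prototype. The following is a minimal standalone sketch of that idea, not the Impala code. The names EntryPrototype, PrototypeRegistry, kBytesRead and kRowsRead are hypothetical, std::mutex stands in for SpinLock, and the map is keyed by std::string (an assumption that differs from the patch, which keys by const char*).

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for ProfileEntryPrototype and
// ProfileEntryPrototypeRegistry. They only illustrate self-registration.
class EntryPrototype;

class PrototypeRegistry {
 public:
  // Function-local static: constructed on first use, so it is safe to call
  // from the constructors of namespace-scope prototype objects.
  static PrototypeRegistry& Get() {
    static PrototypeRegistry instance;
    return instance;
  }

  // Returns false if a prototype with the same name is already registered.
  bool Add(const EntryPrototype* prototype);

  // Returns a copy of all registered prototype pointers, ordered by name.
  std::vector<const EntryPrototype*> GetAll() const {
    std::lock_guard<std::mutex> l(lock_);
    std::vector<const EntryPrototype*> out;
    out.reserve(prototypes_.size());
    for (const auto& entry : prototypes_) out.push_back(entry.second);
    return out;
  }

 private:
  mutable std::mutex lock_;
  // Assumption: keyed by std::string so that names are compared by content.
  std::map<std::string, const EntryPrototype*> prototypes_;
};

class EntryPrototype {
 public:
  // 'name' and 'desc' must outlive the prototype (string literals here).
  EntryPrototype(const char* name, const char* desc) : name_(name), desc_(desc) {
    registered_ = PrototypeRegistry::Get().Add(this);
  }
  const char* name() const { return name_; }
  const char* desc() const { return desc_; }
  bool registered() const { return registered_; }

 private:
  const char* name_;
  const char* desc_;
  bool registered_;
};

inline bool PrototypeRegistry::Add(const EntryPrototype* prototype) {
  std::lock_guard<std::mutex> l(lock_);
  return prototypes_.emplace(prototype->name(), prototype).second;
}

// Hypothetical prototypes; defining them at namespace scope registers them
// during process startup.
static EntryPrototype kBytesRead("BytesRead", "Total bytes read.");
static EntryPrototype kRowsRead("RowsRead", "Number of rows read.");
```

In this sketch a duplicate name is reported through the return value of Add(), whereas the patch flags duplicates with a DCHECK.
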
diff --git a/www/profile_docs.tmpl b/www/profile_docs.tmpl
new file mode 100644
index 0000000..eddd6b8
--- /dev/null
+++ b/www/profile_docs.tmpl
@@ -0,0 +1,91 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+{{> www/common-header.tmpl }}
+
+<h2>Profile Documentation</h2>
+
+<h3>Counters</h3>
+<div class="card">
+  <div class="card-body">
+    <table id="counters-table" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Name</th>
+          <th>Significance</th>
+          <th>Unit</th>
+          <th>Description</th>
+        </tr>
+      </thead>
+      <tbody>
+        {{#profile_docs}}
+        <tr>
+          <td>
+          {{name}}
+          </td>
+          <td>
+          {{significance}}
+          </td>
+          <td>
+          {{unit}}
+          </td>
+          <td>
+          {{description}}
+          </td>
+        </tr>
+        {{/profile_docs}}
+      </tbody>
+    </table>
+  </div>
+</div>
+
+<h3>Appendix</h3>
+<div class="card">
+  <div class="card-body">
+    <table id="significance-table" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Significance</th>
+          <th>Description</th>
+        </tr>
+      </thead>
+      <tbody>
+        {{#significance_docs}}
+        <tr>
+          <td>
+          {{name}}
+          </td>
+          <td>
+          {{description}}
+          </td>
+        </tr>
+        {{/significance_docs}}
+      </tbody>
+    </table>
+  </div>
+</div>
+
+<script>
+    $(document).ready(function() {
+        $('#counters-table').DataTable({
+            "pageLength": 20
+        });
+    });
+</script>
+
+{{> www/common-footer.tmpl }}
diff --git a/www/query_profile.tmpl b/www/query_profile.tmpl
index f4df17a..55c79fb 100644
--- a/www/query_profile.tmpl
+++ b/www/query_profile.tmpl
@@ -24,6 +24,7 @@ under the License.
 
 {{> www/query_detail_tabs.tmpl }}
 
+<br/>
 <div>
     <h4>Download Profile (Available Formats):
         <a style="font-size:16px;" class="btn btn-primary"


[impala] 01/04: IMPALA-8706: ISO:SQL:2016 datetime patterns - Milestone 4

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 590da59a3ce3337395487de8d91dceba1786e11b
Author: Attila Jeges <at...@cloudera.com>
AuthorDate: Thu Oct 17 17:36:43 2019 +0200

    IMPALA-8706: ISO:SQL:2016 datetime patterns - Milestone 4
    
    This patch adds ISO 8601 week-based date format tokens on top
    of what was introduced in IMPALA-8703, IMPALA-8704 and
    IMPALA-8705.
    
    The ISO 8601 week-based date tokens may be used for both datetime
    to string and string to datetime conversion.
    
    The ISO 8601 week-based date tokens are as follows:
      - IYYY: 4-digit ISO 8601 week-numbering year.
              Week-numbering year is the year relating to the ISO
              8601 week number (IW), which is the full week (Monday
              to Sunday) which contains January 4 of the Gregorian
              year.
              Behaves similarly to YYYY in that for string to
              datetime conversion, prefix digits for 1, 2, and
              3-digit inputs are obtained from the current ISO 8601
              week-numbering year.
    
      - IYY:  Last 3 digits of ISO 8601 week-numbering year.
              Behaves similarly to YYY in that for string to datetime
              conversion, the prefix digit is obtained from the current
              ISO 8601 week-numbering year and it can accept 1 or
              2-digit input.
    
      - IY:   Last 2 digits of ISO 8601 week-numbering year.
              Behaves similarly to YY in that for string to datetime
              conversion, prefix digits are obtained from the current
              ISO 8601 week-numbering year and it can accept 1-digit
              input.
    
      - I:    Last digit of ISO 8601 week-numbering year.
              Behaves similarly to Y in that for string to datetime
              conversion, prefix digits are obtained from the current
              ISO 8601 week-numbering year.
    
      - IW:   ISO 8601 week of year (1-53).
              Begins on the Monday closest to January 1 of the year.
              For string to datetime conversion, if the input ISO
              8601 week does not exist in the input year, an error
              will be thrown.
    
              Note that IW is different from the other week-related
              tokens WW and W (implemented in IMPALA-8705). With WW
              and W weeks start with the first day of the
              year/month. ISO 8601 weeks on the other hand always
              start with Monday.
    
      - ID:   ISO 8601 day of week (1-7). 1 means Monday and 7 means
              Sunday.
    
    When doing string to datetime conversion, the ISO 8601 week-based
    tokens are meant to be used together and not mixed with other ISO
    SQL date tokens. E.g. 'YYYY-IW-ID' is an invalid format string.
    
    The only exceptions are the day name tokens (DAY and DY) which
    may be used instead of ID with the rest of the ISO 8601
    week-based date tokens. E.g. 'IYYY-IW-DAY' is a valid format
    string.
    
    Change-Id: I89a8c1b98742391cb7b331840d216558dbca362b
    Reviewed-on: http://gerrit.cloudera.org:8080/14852
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Gabor Kaszab <ga...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/date-functions-ir.cc                  |   2 +-
 be/src/runtime/date-parse-util.cc                  |  21 +-
 be/src/runtime/date-test.cc                        | 150 +++++++--
 be/src/runtime/date-value.cc                       | 134 ++++++---
 be/src/runtime/date-value.h                        |  27 +-
 be/src/runtime/datetime-iso-sql-format-parser.cc   | 101 +++++--
 be/src/runtime/datetime-iso-sql-format-parser.h    |  35 ++-
 .../runtime/datetime-iso-sql-format-tokenizer.cc   |  49 ++-
 be/src/runtime/datetime-parser-common.cc           | 113 +++++--
 be/src/runtime/datetime-parser-common.h            |  41 ++-
 be/src/runtime/timestamp-parse-util.cc             |  38 ++-
 be/src/runtime/timestamp-parse-util.h              |   8 +
 tests/query_test/test_cast_with_format.py          | 335 ++++++++++++++++++++-
 13 files changed, 907 insertions(+), 147 deletions(-)
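
Note on the week-based tokens: the commit message defines IYYY, IW and ID in terms of the ISO 8601 week-numbering year. The sketch below shows one way to derive all three values from a Gregorian date using only the standard library. It is an illustration under stated assumptions, not the patch's implementation (which uses cctz): DaysFromCivil, IsoDayOfWeek, FirstIsoMonday, IsoWeekDate and ToIsoWeekDate are hypothetical names, and the first ISO week is located through January 4, which always falls in week 1.

```cpp
#include <cassert>
#include <cstdint>

// Days since 1970-01-01 in the proleptic Gregorian calendar (well-known
// days-from-civil arithmetic, shifting the year start to March 1).
inline int64_t DaysFromCivil(int64_t y, int m, int d) {
  y -= m <= 2;
  const int64_t era = (y >= 0 ? y : y - 399) / 400;
  const int64_t yoe = y - era * 400;                                // [0, 399]
  const int64_t doy = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5 + d - 1;
  const int64_t doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
  return era * 146097 + doe - 719468;
}

// ISO 8601 day of week: 1 = Monday ... 7 = Sunday. 1970-01-01 was a Thursday.
inline int IsoDayOfWeek(int64_t days) {
  return static_cast<int>(((days + 3) % 7 + 7) % 7) + 1;
}

// Monday of ISO week 1 of 'year', i.e. the Monday of the week holding Jan 4.
inline int64_t FirstIsoMonday(int year) {
  const int64_t jan4 = DaysFromCivil(year, 1, 4);
  return jan4 - (IsoDayOfWeek(jan4) - 1);
}

struct IsoWeekDate {
  int year;         // corresponds to IYYY
  int week;         // corresponds to IW, in [1, 53]
  int day_of_week;  // corresponds to ID, in [1, 7]
};

inline IsoWeekDate ToIsoWeekDate(int year, int month, int day) {
  const int64_t days = DaysFromCivil(year, month, day);
  int iso_year = year;
  if (days < FirstIsoMonday(year)) {
    iso_year = year - 1;  // Early January can belong to the previous year.
  } else if (days >= FirstIsoMonday(year + 1)) {
    iso_year = year + 1;  // Late December can belong to the next year.
  }
  const int week = static_cast<int>((days - FirstIsoMonday(iso_year)) / 7) + 1;
  return {iso_year, week, IsoDayOfWeek(days)};
}
```

For example, ToIsoWeekDate(2019, 12, 30) gives year 2020, week 1, day 1, and ToIsoWeekDate(2015, 12, 31) gives year 2015, week 53, day 4, which agrees with the expectations in date-test.cc.
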

diff --git a/be/src/exprs/date-functions-ir.cc b/be/src/exprs/date-functions-ir.cc
index 8e16fb5..a8638ca 100644
--- a/be/src/exprs/date-functions-ir.cc
+++ b/be/src/exprs/date-functions-ir.cc
@@ -95,7 +95,7 @@ IntVal DateFunctions::WeekOfYear(FunctionContext* context, const DateVal& d_val)
   if (d_val.is_null) return IntVal::null();
   DateValue dv = DateValue::FromDateVal(d_val);
 
-  int yweek = dv.WeekOfYear();
+  int yweek = dv.Iso8601WeekOfYear();
   if (yweek == -1) return IntVal::null();
   return IntVal(yweek);
 }
diff --git a/be/src/runtime/date-parse-util.cc b/be/src/runtime/date-parse-util.cc
index 579a895..1f820f5 100644
--- a/be/src/runtime/date-parse-util.cc
+++ b/be/src/runtime/date-parse-util.cc
@@ -137,6 +137,7 @@ string DateParser::Format(const DateTimeFormatContext& dt_ctx, const DateValue&
   int year, month, day;
   if (!date.ToYearMonthDay(&year, &month, &day)) return "";
 
+  DCHECK(date.IsValid());
   string result;
   result.reserve(dt_ctx.fmt_out_len);
   for (const DateTimeFormatToken& tok: dt_ctx.toks) {
@@ -144,11 +145,7 @@ string DateParser::Format(const DateTimeFormatContext& dt_ctx, const DateValue&
     switch (tok.type) {
       case YEAR:
       case ROUND_YEAR: {
-        num_val = year;
-        if (tok.len < 4) {
-          int adjust_factor = std::pow(10, tok.len);
-          num_val %= adjust_factor;
-        }
+        num_val = AdjustYearToLength(year, tok.len);
         break;
       }
       case QUARTER_OF_YEAR: {
@@ -191,6 +188,20 @@ string DateParser::Format(const DateTimeFormatContext& dt_ctx, const DateValue&
         result.append(FormatTextToken(tok));
         break;
       }
+      case ISO8601_WEEK_NUMBERING_YEAR: {
+        num_val = AdjustYearToLength(date.Iso8601WeekNumberingYear(), tok.len);
+        break;
+      }
+      case ISO8601_WEEK_OF_YEAR: {
+        num_val = date.Iso8601WeekOfYear();
+        break;
+      }
+      case ISO8601_DAY_OF_WEEK: {
+        // WeekDay() returns 0 for Monday and 6 for Sunday.
+        // We need to output 1 for Monday and 7 for Sunday.
+        num_val = date.WeekDay() + 1;
+        break;
+      }
       default: DCHECK(false) << "Unknown date format token";
     }
     if (num_val > -1) {
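
Note on the year truncation: the hunk above replaces an inline modulo computation with a call to AdjustYearToLength(), whose body is not part of this excerpt. A rough equivalent of the removed lines could look as follows. TruncateYearToLength is a hypothetical name, and integer arithmetic is used in place of std::pow as a design choice of this sketch.

```cpp
#include <cassert>

// Hypothetical stand-in for AdjustYearToLength(): keeps only the last 'len'
// digits of 'year' when the format token is shorter than four characters.
inline int TruncateYearToLength(int year, int len) {
  assert(len >= 1);
  if (len >= 4) return year;
  int modulus = 1;
  for (int i = 0; i < len; ++i) modulus *= 10;  // 10^len without floating point
  return year % modulus;
}
```

With this helper a YYY token would format the year 1985 as 985, and a Y token would format 2019 as 9.
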
diff --git a/be/src/runtime/date-test.cc b/be/src/runtime/date-test.cc
index 404c831..3072dc5 100644
--- a/be/src/runtime/date-test.cc
+++ b/be/src/runtime/date-test.cc
@@ -824,10 +824,10 @@ TEST(DateTest, DayOfYear) {
   EXPECT_EQ(366, DateValue(2000, 12, 31).DayOfYear());
 }
 
-TEST(DateTest, WeekOfYear) {
+TEST(DateTest, Iso8601WeekOfYear) {
   // Test that it returns -1 for invalid dates.
   DateValue invalid_dv;
-  EXPECT_EQ(-1, invalid_dv.WeekOfYear());
+  EXPECT_EQ(-1, invalid_dv.Iso8601WeekOfYear());
 
   // Iterate through days of 2019.
   // 2019-01-01 is Tuesday and 2019-12-31 is Tuesday too.
@@ -836,36 +836,144 @@ TEST(DateTest, WeekOfYear) {
   for (DateValue dv = jan1;
       dv <= DateValue(2019, 12, 29);
       dv = dv.AddDays(1)) {
-    EXPECT_EQ(weekday_offset / 7 + 1, dv.WeekOfYear());
+    EXPECT_EQ(weekday_offset / 7 + 1, dv.Iso8601WeekOfYear());
     ++weekday_offset;
   }
 
   // Year 2015 has 53 weeks. 2015-12-31 is Thursday.
-  EXPECT_EQ(53, DateValue(2015, 12, 31).WeekOfYear());
+  EXPECT_EQ(53, DateValue(2015, 12, 31).Iso8601WeekOfYear());
 
   // 2019-12-30 (Monday) and 2019-12-31 (Tuesday) belong to year 2020.
-  EXPECT_EQ(1, DateValue(2019, 12, 30).WeekOfYear());
-  EXPECT_EQ(1, DateValue(2019, 12, 31).WeekOfYear());
-  EXPECT_EQ(1, DateValue(2020, 1, 1).WeekOfYear());
-  EXPECT_EQ(1, DateValue(2020, 1, 5).WeekOfYear());
-  EXPECT_EQ(2, DateValue(2020, 1, 6).WeekOfYear());
+  EXPECT_EQ(1, DateValue(2019, 12, 30).Iso8601WeekOfYear());
+  EXPECT_EQ(1, DateValue(2019, 12, 31).Iso8601WeekOfYear());
+  EXPECT_EQ(1, DateValue(2020, 1, 1).Iso8601WeekOfYear());
+  EXPECT_EQ(1, DateValue(2020, 1, 5).Iso8601WeekOfYear());
+  EXPECT_EQ(2, DateValue(2020, 1, 6).Iso8601WeekOfYear());
 
   // 0002-01-01 is Tuesday. Test days around 0002-01-01.
-  EXPECT_EQ(51, DateValue(1, 12, 23).WeekOfYear());
-  EXPECT_EQ(52, DateValue(1, 12, 30).WeekOfYear());
-  EXPECT_EQ(1, DateValue(1, 12, 31).WeekOfYear());
-  EXPECT_EQ(1, DateValue(2, 1, 1).WeekOfYear());
-  EXPECT_EQ(1, DateValue(2, 1, 6).WeekOfYear());
-  EXPECT_EQ(2, DateValue(2, 1, 7).WeekOfYear());
+  EXPECT_EQ(51, DateValue(1, 12, 23).Iso8601WeekOfYear());
+  EXPECT_EQ(52, DateValue(1, 12, 30).Iso8601WeekOfYear());
+  EXPECT_EQ(1, DateValue(1, 12, 31).Iso8601WeekOfYear());
+  EXPECT_EQ(1, DateValue(2, 1, 1).Iso8601WeekOfYear());
+  EXPECT_EQ(1, DateValue(2, 1, 6).Iso8601WeekOfYear());
+  EXPECT_EQ(2, DateValue(2, 1, 7).Iso8601WeekOfYear());
   // 0001-01-01 is Monday. Test days around 0001-01-01.
-  EXPECT_EQ(1, DateValue(1, 1, 1).WeekOfYear());
-  EXPECT_EQ(1, DateValue(1, 1, 2).WeekOfYear());
-  EXPECT_EQ(2, DateValue(1, 1, 8).WeekOfYear());
+  EXPECT_EQ(1, DateValue(1, 1, 1).Iso8601WeekOfYear());
+  EXPECT_EQ(1, DateValue(1, 1, 2).Iso8601WeekOfYear());
+  EXPECT_EQ(2, DateValue(1, 1, 8).Iso8601WeekOfYear());
 
   // 9999-12-31 is Friday. Test days around 9999-12-31.
-  EXPECT_EQ(52, DateValue(9999, 12, 31).WeekOfYear());
-  EXPECT_EQ(52, DateValue(9999, 12, 27).WeekOfYear());
-  EXPECT_EQ(51, DateValue(9999, 12, 26).WeekOfYear());
+  EXPECT_EQ(52, DateValue(9999, 12, 31).Iso8601WeekOfYear());
+  EXPECT_EQ(52, DateValue(9999, 12, 27).Iso8601WeekOfYear());
+  EXPECT_EQ(51, DateValue(9999, 12, 26).Iso8601WeekOfYear());
+}
+
+TEST(DateTest, Iso8601WeekNumberingYear) {
+  // Test that it returns -1 for invalid dates.
+  DateValue invalid_dv;
+  EXPECT_EQ(-1, invalid_dv.Iso8601WeekNumberingYear());
+
+  // Iterate through days of 2019.
+  // 2019-01-01 is Tuesday and 2019-12-29 is Sunday.
+  DateValue jan1(2019, 1, 1);
+  for (DateValue dv = jan1;
+      dv <= DateValue(2019, 12, 29);
+      dv = dv.AddDays(1)) {
+    EXPECT_EQ(2019, dv.Iso8601WeekNumberingYear());
+  }
+  // 2019-12-30 (Monday) and 2019-12-31 (Tuesday) belong to year 2020.
+  EXPECT_EQ(2020, DateValue(2019, 12, 30).Iso8601WeekNumberingYear());
+  EXPECT_EQ(2020, DateValue(2019, 12, 31).Iso8601WeekNumberingYear());
+  EXPECT_EQ(2020, DateValue(2020, 1, 1).Iso8601WeekNumberingYear());
+
+  // 2015-01-01 is Thursday and 2015-12-31 is Thursday too.
+  // Both days belong to year 2015.
+  EXPECT_EQ(2015, DateValue(2015, 1, 1).Iso8601WeekNumberingYear());
+  EXPECT_EQ(2015, DateValue(2015, 12, 31).Iso8601WeekNumberingYear());
+
+  // 2040-01-01 is Sunday and 2040-12-31 is Monday.
+  // Neither day belongs to year 2040.
+  EXPECT_EQ(2039, DateValue(2040, 1, 1).Iso8601WeekNumberingYear());
+  EXPECT_EQ(2041, DateValue(2040, 12, 31).Iso8601WeekNumberingYear());
+
+  // 0002-01-01 is Tuesday. Test days around 0002-01-01.
+  EXPECT_EQ(1, DateValue(1, 12, 29).Iso8601WeekNumberingYear());
+  EXPECT_EQ(1, DateValue(1, 12, 30).Iso8601WeekNumberingYear());
+  EXPECT_EQ(2, DateValue(1, 12, 31).Iso8601WeekNumberingYear());
+  EXPECT_EQ(2, DateValue(2, 1, 1).Iso8601WeekNumberingYear());
+  EXPECT_EQ(2, DateValue(2, 1, 2).Iso8601WeekNumberingYear());
+  // 0001-01-01 is Monday. Test days around 0001-01-01.
+  EXPECT_EQ(1, DateValue(1, 1, 1).Iso8601WeekNumberingYear());
+  EXPECT_EQ(1, DateValue(1, 1, 2).Iso8601WeekNumberingYear());
+
+  // 9999-12-31 is Friday. Test days around 9999-12-31.
+  EXPECT_EQ(9999, DateValue(9999, 12, 30).Iso8601WeekNumberingYear());
+  EXPECT_EQ(9999, DateValue(9999, 12, 31).Iso8601WeekNumberingYear());
+}
+
+TEST(DateTest, CreateFromIso8601WeekBasedDateVals) {
+  // Invalid week numbering year.
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(-1, 1, 1).IsValid());
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(0, 1, 1).IsValid());
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(10000, 1, 1).IsValid());
+
+  // Test invalid week of year.
+  // Year 2020 has 53 weeks.
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(2020, 54, 1).IsValid());
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(2020, 0, 1).IsValid());
+  // Year 2019 has 52 weeks.
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(2019, 53, 1).IsValid());
+
+  // Test invalid week day.
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(2020, 1, 0).IsValid());
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(2020, 1, 8).IsValid());
+
+  // 0001-01-01 is Monday. It belongs to the first week of year 1.
+  // Test days around 0001-01-01.
+  EXPECT_EQ(DateValue(1, 1, 1), DateValue::CreateFromIso8601WeekBasedDateVals(1, 1, 1));
+  EXPECT_EQ(DateValue(1, 1, 2), DateValue::CreateFromIso8601WeekBasedDateVals(1, 1, 2));
+  EXPECT_EQ(DateValue(1, 1, 7), DateValue::CreateFromIso8601WeekBasedDateVals(1, 1, 7));
+  EXPECT_EQ(DateValue(1, 1, 8), DateValue::CreateFromIso8601WeekBasedDateVals(1, 2, 1));
+  // 0001-12-30 is Sunday, belongs to week 52 of year 1.
+  EXPECT_EQ(DateValue(1, 12, 30),
+      DateValue::CreateFromIso8601WeekBasedDateVals(1, 52, 7));
+  // 0001-12-31 is Monday, belongs to year 2.
+  EXPECT_EQ(DateValue(1, 12, 31), DateValue::CreateFromIso8601WeekBasedDateVals(2, 1, 1));
+  EXPECT_EQ(DateValue(2, 1, 1), DateValue::CreateFromIso8601WeekBasedDateVals(2, 1, 2));
+
+  // Test 2020 ISO 8601 week numbering year.
+  // 2019-12-30 is Monday, belongs to week 1 of year 2020.
+  // 2021-01-03 is Sunday, belongs to week 53 of year 2020.
+  int week_of_year = 1, day_of_week = 1;
+  for (DateValue dv(2019, 12, 30);
+      dv <= DateValue(2021, 1, 3);
+      dv = dv.AddDays(1)) {
+    DateValue dv_iso8601 = DateValue::CreateFromIso8601WeekBasedDateVals(2020,
+        week_of_year, day_of_week);
+    EXPECT_TRUE(dv_iso8601.IsValid());
+    EXPECT_EQ(dv, dv_iso8601);
+
+    if (day_of_week == 7) ++week_of_year;
+    day_of_week = day_of_week % 7 + 1;
+  }
+  EXPECT_EQ(54, week_of_year);
+  EXPECT_EQ(1, day_of_week);
+
+  // 9998-12-31 is Thursday, belongs to week 53 of year 9998.
+  EXPECT_EQ(DateValue(9998, 12, 31),
+      DateValue::CreateFromIso8601WeekBasedDateVals(9998, 53, 4));
+  EXPECT_EQ(DateValue(9999, 1, 1),
+      DateValue::CreateFromIso8601WeekBasedDateVals(9998, 53, 5));
+  EXPECT_EQ(DateValue(9999, 1, 2),
+      DateValue::CreateFromIso8601WeekBasedDateVals(9998, 53, 6));
+  EXPECT_EQ(DateValue(9999, 1, 3),
+      DateValue::CreateFromIso8601WeekBasedDateVals(9998, 53, 7));
+  EXPECT_EQ(DateValue(9999, 1, 4),
+      DateValue::CreateFromIso8601WeekBasedDateVals(9999, 1, 1));
+  // 9999-12-31 is Friday, belongs to week 52 of year 9999.
+  EXPECT_EQ(DateValue(9999, 12, 31),
+      DateValue::CreateFromIso8601WeekBasedDateVals(9999, 52, 5));
+  EXPECT_FALSE(DateValue::CreateFromIso8601WeekBasedDateVals(9999, 52, 6).IsValid());
 }
 
 TEST(DateTest, LastDay) {
diff --git a/be/src/runtime/date-value.cc b/be/src/runtime/date-value.cc
index d3fd4a0..ed783b1 100644
--- a/be/src/runtime/date-value.cc
+++ b/be/src/runtime/date-value.cc
@@ -58,6 +58,71 @@ DateValue::DateValue(int64_t year, int64_t month, int64_t day)
   }
 }
 
+namespace {
+
+inline int32_t CalcFirstDayOfYearSinceEpoch(int year) {
+  int m400 = year % 400;
+  int m100 = m400 % 100;
+  int m4 = m100 % 4;
+
+  return (year - EPOCH_YEAR) * 365
+      + ((year - EPOCH_YEAR / 4 * 4 + ((m4 != 0) ? 4 - m4 : 0)) / 4 - 1)
+      - ((year - EPOCH_YEAR / 100 * 100 + ((m100 != 0) ? 100 - m100 : 0)) / 100 - 1)
+      + ((year - EPOCH_YEAR / 400 * 400 + ((m400 != 0) ? 400 - m400 : 0)) / 400 - 1);
+}
+
+// Returns the Monday that belongs to the first ISO 8601 week of 'year'. That is:
+// - If Jan 1 falls on Monday, Tuesday, Wednesday or Thursday, then the week of Jan 1 is
+//   the first ISO 8601 week of 'year'. Therefore the Monday of the first ISO 8601 week in
+//   'year' is the first Monday before Jan 2.
+// - If Jan 1 falls on Friday, Saturday or Sunday, then the week of Jan 1 is the last ISO
+//   8601 week of the previous year. Therefore the Monday of the first ISO 8601 week in
+//   'year' is the first Monday following Jan 1.
+inline cctz::civil_day GetFirstIso8601MondayOfYear(int year) {
+  cctz::civil_day jan1 = cctz::civil_day(year, 1, 1);
+  if (cctz::get_weekday(jan1) <= cctz::weekday::thursday) {
+    // Get the previous Monday if 'jan1' is not already a Monday.
+    return cctz::next_weekday(jan1, cctz::weekday::monday) - 7;
+  } else {
+    // Get the next Monday.
+    return cctz::next_weekday(jan1, cctz::weekday::monday);
+  }
+}
+
+// Returns the Sunday that belongs to the last ISO 8601 week of 'year'. That is:
+// - If Dec 31 falls on Thursday, Friday, Saturday or Sunday, then the week of Dec 31
+//   is the last ISO 8601 week of 'year'. Therefore the Sunday of the last ISO 8601 week
+//   in 'year' is the first Sunday after Dec 30.
+// - If Dec 31 falls on Monday, Tuesday or Wednesday, then the week of Dec 31 is the first
+//   ISO 8601 week of the next year. Therefore the Sunday of the last ISO 8601 week in
+//   'year' is the first Sunday previous to Dec 31.
+inline cctz::civil_day GetLastIso8601SundayOfYear(int year) {
+  cctz::civil_day dec31 = cctz::civil_day(year, 12, 31);
+  if (cctz::get_weekday(dec31) >= cctz::weekday::thursday) {
+    // Get the next Sunday if 'dec31' is not already a Sunday.
+    return cctz::prev_weekday(dec31, cctz::weekday::sunday) + 7;
+  } else {
+    // Get the previous Sunday.
+    return cctz::prev_weekday(dec31, cctz::weekday::sunday);
+  }
+}
+
+}
+
+DateValue DateValue::CreateFromIso8601WeekBasedDateVals(int year,
+    int week_of_year, int day_of_week) {
+  if (year >= MIN_YEAR && year <= MAX_YEAR
+      && week_of_year >= 1 && week_of_year <= 53
+      && 1 <= day_of_week && day_of_week <= 7) {
+    const cctz::civil_day first_monday = GetFirstIso8601MondayOfYear(year);
+    const cctz::civil_day last_sunday = GetLastIso8601SundayOfYear(year);
+
+    const cctz::civil_day today = first_monday + (week_of_year - 1) * 7 + day_of_week - 1;
+    if (today <= last_sunday) return DateValue(today - EPOCH_DATE);
+  }
+  return DateValue();
+}
+
 DateValue DateValue::ParseSimpleDateFormat(const char* str, int len,
     bool accept_time_toks) {
   DateValue dv;
@@ -87,21 +152,6 @@ string DateValue::Format(const DateTimeFormatContext& dt_ctx) const {
   return DateParser::Format(dt_ctx, *this);
 }
 
-namespace {
-
-inline int32_t CalcFirstDayOfYearSinceEpoch(int year) {
-  int m400 = year % 400;
-  int m100 = m400 % 100;
-  int m4 = m100 % 4;
-
-  return (year - EPOCH_YEAR) * 365
-      + ((year - EPOCH_YEAR / 4 * 4 + ((m4 != 0) ? 4 - m4 : 0)) / 4 - 1)
-      - ((year - EPOCH_YEAR / 100 * 100 + ((m100 != 0) ? 100 - m100 : 0)) / 100 - 1)
-      + ((year - EPOCH_YEAR / 400 * 400 + ((m400 != 0) ? 400 - m400 : 0)) / 400 - 1);
-}
-
-}
-
 bool DateValue::ToYear(int* year) const {
   DCHECK(year != nullptr);
   if (UNLIKELY(!IsValid())) return false;
@@ -144,7 +194,7 @@ bool DateValue::ToYear(int* year) const {
   // f(year) = - ((m4 != 0) ? (4 - m4) * 100 : 0)
   //           + ((m100 != 0) ? (100 - m100) * 4 : 0)
   //           - ((m400 != 0) ? 400 - m400 : 0)
-  // and 'year' is in the [0, 9999] range, then it follows that (B):
+  // and 'year' is in the [1, 9999] range, then it follows that (B):
   // f(year) must fall into the [-591, 288] range.
   //
   // Finally, if we put (A) and (B) together we can conclude that 'year' must fall into
@@ -206,38 +256,24 @@ int DateValue::DayOfYear() const {
   return static_cast<int>(cctz::get_yearday(cd));
 }
 
-int DateValue::WeekOfYear() const {
+int DateValue::Iso8601WeekOfYear() const {
   if (UNLIKELY(!IsValid())) return -1;
   const cctz::civil_day today = EPOCH_DATE + days_since_epoch_;
-
-  cctz::civil_day jan1 = cctz::civil_day(today.year(), 1, 1);
-  cctz::civil_day first_monday;
-  if (cctz::get_weekday(jan1) <= cctz::weekday::thursday) {
-    // Get the previous Monday if 'jan1' is not already a Monday.
-    first_monday = cctz::next_weekday(jan1, cctz::weekday::monday) - 7;
-  } else {
-    // Get the next Monday.
-    first_monday = cctz::next_weekday(jan1, cctz::weekday::monday);
-  }
-
-  cctz::civil_day dec31 = cctz::civil_day(today.year(), 12, 31);
-  cctz::civil_day last_sunday;
-  if (cctz::get_weekday(dec31) >= cctz::weekday::thursday) {
-    // Get the next Sunday if 'dec31' is not already a Sunday.
-    last_sunday = cctz::prev_weekday(dec31, cctz::weekday::sunday) + 7;
-  } else {
-    // Get the previous Sunday.
-    last_sunday = cctz::prev_weekday(dec31, cctz::weekday::sunday);
-  }
+  const cctz::civil_day first_monday = GetFirstIso8601MondayOfYear(today.year());
+  const cctz::civil_day last_sunday = GetLastIso8601SundayOfYear(today.year());
 
   // 0001-01-01 is Monday in the proleptic Gregorian calendar.
   // 0001-01-01 belongs to year 0001.
+  // 9999-12-31 is Friday in the proleptic Gregorian calendar.
+  // 9999-12-31 belongs to year 9999.
   if (today >= first_monday && today <= last_sunday) {
     return (today - first_monday) / 7 + 1;
   } else if (today > last_sunday) {
+    DCHECK(today.year() >= MIN_YEAR && today.year() < MAX_YEAR);
     return 1;
   } else {
-    // today < first_monday && today.year() > 0
+    DCHECK(today < first_monday);
+    DCHECK(today.year() > MIN_YEAR && today.year() <= MAX_YEAR);
     cctz::civil_day prev_jan1 = cctz::civil_day(today.year() - 1, 1, 1);
     cctz::civil_day prev_first_monday;
     if (cctz::get_weekday(prev_jan1) <= cctz::weekday::thursday) {
@@ -251,6 +287,28 @@ int DateValue::WeekOfYear() const {
   }
 }
 
+int DateValue::Iso8601WeekNumberingYear() const {
+  if (UNLIKELY(!IsValid())) return -1;
+  const cctz::civil_day today = EPOCH_DATE + days_since_epoch_;
+  const cctz::civil_day first_monday = GetFirstIso8601MondayOfYear(today.year());
+  const cctz::civil_day last_sunday = GetLastIso8601SundayOfYear(today.year());
+
+  // 0001-01-01 is Monday in the proleptic Gregorian calendar.
+  // 0001-01-01 belongs to year 0001.
+  // 9999-12-31 is Friday in the proleptic Gregorian calendar.
+  // 9999-12-31 belongs to year 9999.
+  if (today >= first_monday && today <= last_sunday) {
+    return today.year();
+  } else if (today > last_sunday) {
+    DCHECK(today.year() >= MIN_YEAR && today.year() < MAX_YEAR);
+    return today.year() + 1;
+  } else {
+    DCHECK(today < first_monday);
+    DCHECK(today.year() > MIN_YEAR && today.year() <= MAX_YEAR);
+    return today.year() - 1;
+  }
+}
+
 DateValue DateValue::AddDays(int64_t days) const {
   if (UNLIKELY(!IsValid()
       || days < MIN_DAYS_SINCE_EPOCH - days_since_epoch_
diff --git a/be/src/runtime/date-value.h b/be/src/runtime/date-value.h
index 2719466..e7c48f5 100644
--- a/be/src/runtime/date-value.h
+++ b/be/src/runtime/date-value.h
@@ -71,6 +71,15 @@ class DateValue {
 
   DateValue(int64_t year, int64_t month, int64_t day);
 
+  /// Creates a DateValue instance from the ISO 8601 week-based date values:
+  /// 'week_numbering_year' is expected to be in the [1, 9999] range.
+  /// 'week_of_year' is expected to be in the [1, 53] range.
+  /// 'day_of_week' is expected to be in the [1, 7] range.
+  /// If any of the parameters is out of range or 'week_numbering_year' has fewer than
+  /// 'week_of_year' ISO 8601 weeks, returns an invalid DateValue instance.
+  static DateValue CreateFromIso8601WeekBasedDateVals(int week_numbering_year,
+      int week_of_year, int day_of_week);
+
   bool IsValid() const {
     return days_since_epoch_ != INVALID_DAYS_SINCE_EPOCH;
   }
@@ -99,10 +108,20 @@ class DateValue {
   /// Otherwise, return -1.
   int DayOfYear() const;
 
-  /// Returns the week corresponding to his date. Returned value is in the [1, 53] range.
-  /// Weeks start with Monday. Each week's year is the Gregorian year in which the
-  /// Thursday falls.
-  int WeekOfYear() const;
+  /// Returns the ISO 8601 week corresponding to this date.
+  /// If this DateValue instance is invalid, the return value is -1; otherwise the return
+  /// value is in the [1, 53] range.
+  /// ISO 8601 weeks start with Monday. Each ISO 8601 week's year is the Gregorian year in
+  /// which the Thursday falls.
+  int Iso8601WeekOfYear() const;
+
+  /// Returns the year that the current ISO 8601 week belongs to.
+  /// If this DateValue instance is invalid, the return value is -1; otherwise the return
+  /// value is in the [current_year - 1, current_year + 1] range, which always falls into
+  /// the [1, 9999] range.
+  /// ISO 8601 weeks start with Monday. Each ISO 8601 week's year is the Gregorian year in
+  /// which the Thursday falls.
+  int Iso8601WeekNumberingYear() const;
 
   /// If this DateValue instance valid, add 'days' days to it and return the result.
   /// Otherwise, return an invalid DateValue instance.
diff --git a/be/src/runtime/datetime-iso-sql-format-parser.cc b/be/src/runtime/datetime-iso-sql-format-parser.cc
index bd7fb5e..2069256 100644
--- a/be/src/runtime/datetime-iso-sql-format-parser.cc
+++ b/be/src/runtime/datetime-iso-sql-format-parser.cc
@@ -30,11 +30,14 @@ namespace datetime_parse_util {
 bool IsoSqlFormatParser::ParseDateTime(const char* input_str, int input_len,
       const DateTimeFormatContext& dt_ctx, DateTimeParseResult* result) {
   DCHECK(dt_ctx.toks.size() > 0);
+  DCHECK(dt_ctx.current_time != nullptr);
+  DCHECK(dt_ctx.current_time->HasDateAndTime());
   DCHECK(result != nullptr);
   DCHECK(result->hour == 0);
   if (input_str == nullptr || input_len <= 0) return false;
 
   int day_in_year = -1;
+  Iso8601WeekBasedDateParseResult iso8601_week_based_date_values;
 
   const char* current_pos = input_str;
   const char* end_pos = input_str + input_len;
@@ -87,7 +90,8 @@ bool IsoSqlFormatParser::ParseDateTime(const char* input_str, int input_len,
           return false;
         }
         if (token_len < 4) {
-            PrefixYearFromCurrentYear(token_len, dt_ctx.current_time, result);
+          result->year = PrefixYearFromCurrentYear(result->year, token_len,
+              dt_ctx.current_time->date().year());
         }
         break;
       }
@@ -95,9 +99,13 @@ bool IsoSqlFormatParser::ParseDateTime(const char* input_str, int input_len,
         if (!ParseAndValidate(current_pos, token_len, 0, 9999, &result->year)) {
           return false;
         }
-        if (token_len == 2) GetRoundYear(dt_ctx.current_time, result);
+        if (token_len == 2) {
+          result->year = RoundYearFromCurrentYear(result->year,
+              dt_ctx.current_time->date().year());
+        }
         if (token_len == 3 || token_len == 1) {
-            PrefixYearFromCurrentYear(token_len, dt_ctx.current_time, result);
+          result->year = PrefixYearFromCurrentYear(result->year, token_len,
+              dt_ctx.current_time->date().year());
         }
         break;
       }
@@ -115,6 +123,14 @@ bool IsoSqlFormatParser::ParseDateTime(const char* input_str, int input_len,
         }
         break;
       }
+      case DAY_NAME:
+      case DAY_NAME_SHORT: {
+        if (!ParseDayNameToken(*tok, current_pos, &token_end_pos, dt_ctx.fx_modifier,
+            &iso8601_week_based_date_values.day_of_week)) {
+          return false;
+        }
+        break;
+      }
       case DAY_IN_MONTH: {
         if (!ParseAndValidate(current_pos, token_len, 1, 31, &result->day)) return false;
         break;
@@ -194,6 +210,32 @@ bool IsoSqlFormatParser::ParseDateTime(const char* input_str, int input_len,
         if (toupper(*current_pos) != toupper(*tok->val)) return false;
         break;
       }
+      case ISO8601_WEEK_NUMBERING_YEAR: {
+        if (!ParseAndValidate(current_pos, token_len, 0, 9999,
+            &iso8601_week_based_date_values.year)) {
+          return false;
+        }
+        if (token_len < 4) {
+          iso8601_week_based_date_values.year = PrefixYearFromCurrentYear(
+              iso8601_week_based_date_values.year, token_len,
+              GetIso8601WeekNumberingYear(dt_ctx.current_time));
+        }
+        break;
+      }
+      case ISO8601_WEEK_OF_YEAR: {
+        if (!ParseAndValidate(current_pos, token_len, 1, 53,
+            &iso8601_week_based_date_values.week_of_year)) {
+          return false;
+        }
+        break;
+      }
+      case ISO8601_DAY_OF_WEEK: {
+        if (!ParseAndValidate(current_pos, token_len, 1, 7,
+            &iso8601_week_based_date_values.day_of_week)) {
+          return false;
+        }
+        break;
+      }
       default: {
         return false;
       }
@@ -213,6 +255,10 @@ bool IsoSqlFormatParser::ParseDateTime(const char* input_str, int input_len,
     }
   }
 
+  if (iso8601_week_based_date_values.IsSet()) {
+    if (!iso8601_week_based_date_values.ExtractYearMonthDay(result)) return false;
+  }
+
   return true;
 }
 
@@ -299,6 +345,10 @@ const char* IsoSqlFormatParser::FindEndOfToken(const char* input_str,
     if (input_len < MAX_MONTH_NAME_LENGTH) return nullptr;
     return input_str + MAX_MONTH_NAME_LENGTH;
   }
+  if (tok.type == DAY_NAME && fx_provided && !tok.fm_modifier) {
+    if (input_len < MAX_DAY_NAME_LENGTH) return nullptr;
+    return input_str + MAX_DAY_NAME_LENGTH;
+  }
 
   int max_tok_len = min(input_len, tok.len);
   const char* start_of_token = input_str;
@@ -335,25 +385,40 @@ const char* IsoSqlFormatParser::ParseMeridiemIndicatorFromInput(
   return nullptr;
 }
 
-void IsoSqlFormatParser::PrefixYearFromCurrentYear(int actual_token_len,
-    const TimestampValue* now,  DateTimeParseResult* result) {
-  DCHECK(actual_token_len > 0 && actual_token_len < 4);
-  DCHECK(now != nullptr);
-  DCHECK(result != nullptr);
-  int adjust_factor = pow(10, actual_token_len);
-  int adjustment = (now->date().year() / adjust_factor) * adjust_factor;
-  result->year += adjustment;
+int IsoSqlFormatParser::PrefixYearFromCurrentYear(int year, int year_token_len,
+    int current_year) {
+  DCHECK(year >= 0 && year < 1000);
+  DCHECK(year_token_len > 0 && year_token_len < 4);
+  int adjust_factor = pow(10, year_token_len);
+  int adjustment = (current_year / adjust_factor) * adjust_factor;
+  return year + adjustment;
+}
+
+int IsoSqlFormatParser::RoundYearFromCurrentYear(int year, int current_year) {
+  DCHECK(year >= 0 && year < 100);
+  int postfix_of_curr_year = current_year % 100;
+  if (year < 50 && postfix_of_curr_year > 49) year += 100;
+  if (year > 49 && postfix_of_curr_year < 50) year -= 100;
+  return year + (current_year / 100) * 100;
 }
 
-void IsoSqlFormatParser::GetRoundYear(const TimestampValue* now,
-    DateTimeParseResult* result) {
+int IsoSqlFormatParser::GetIso8601WeekNumberingYear(const TimestampValue* now) {
   DCHECK(now != nullptr);
+  DCHECK(now->HasDate());
+
+  static const boost::gregorian::date EPOCH(1970, 1, 1);
+  DateValue dv = DateValue((now->date() - EPOCH).days());
+  DCHECK(dv.IsValid());
+  return dv.Iso8601WeekNumberingYear();
+}
+
+bool IsoSqlFormatParser::Iso8601WeekBasedDateParseResult::ExtractYearMonthDay(
+    DateTimeParseResult* result) const {
   DCHECK(result != nullptr);
-  DCHECK(result->year >= 0 && result->year < 100);
-  int postfix_of_curr_year = now->date().year() % 100;
-  if (result->year < 50 && postfix_of_curr_year > 49) result->year += 100;
-  if (result->year > 49 && postfix_of_curr_year < 50) result->year -= 100;
-  result->year += (now->date().year() / 100) * 100;
+  DCHECK(IsSet());
+  DateValue dv = DateValue::CreateFromIso8601WeekBasedDateVals(year, week_of_year,
+      day_of_week);
+  return dv.ToYearMonthDay(&result->year, &result->month, &result->day);
 }
 
 }
diff --git a/be/src/runtime/datetime-iso-sql-format-parser.h b/be/src/runtime/datetime-iso-sql-format-parser.h
index 5491abf..3247ab0 100644
--- a/be/src/runtime/datetime-iso-sql-format-parser.h
+++ b/be/src/runtime/datetime-iso-sql-format-parser.h
@@ -40,6 +40,24 @@ public:
       WARN_UNUSED_RESULT;
 
 private:
+  // Struct used for capturing ISO 8601 week-based tokens during parsing.
+  struct Iso8601WeekBasedDateParseResult {
+    int year = -1;
+    int week_of_year = -1;
+    int day_of_week = -1;
+
+    bool IsSet() const {
+      return (year != -1 || week_of_year != -1 || day_of_week != -1);
+    }
+
+    /// Calculates year/month/day date values corresponding to the ISO 8601 week-based
+    /// date values stored in this instance.
+    /// If the ISO 8601 week-based date values are valid, the resulting year/month/day
+    /// values are copied to 'result' and true is returned.
+    /// Otherwise, 'result' is left intact and false is returned.
+    bool ExtractYearMonthDay(DateTimeParseResult* result) const WARN_UNUSED_RESULT;
+  };
+
   /// 'input_str' points to a location in the input string where the parsing stands now.
   /// Given 'tok' as the next token in the list of tokens created by the tokenizer this
   /// function finds the end of the next token.
@@ -60,15 +78,16 @@ private:
   static const char* ParseMeridiemIndicatorFromInput(const char* input_str,
       int input_len);
 
-  /// If the year part of the input is shorter than 4 digits then prefixes the year with
-  /// digits from the current year. Puts the result into 'result->year'.
-  static void PrefixYearFromCurrentYear(int actual_token_len, const TimestampValue* now,
-      DateTimeParseResult* result);
+  /// Takes 'year' that is shorter than 4 digits and prefixes it with digits from
+  /// 'current_year'. Returns the resulting 4-digit year.
+  static int PrefixYearFromCurrentYear(int year, int year_token_len, int current_year);
+
+  /// This function takes a 2-digit 'year' and constructs a 4-digit year from it based on
+  /// the 'current_year'. Returns the resulting year.
+  static int RoundYearFromCurrentYear(int year, int current_year);
 
-  /// Uses 'result->year' as an input. Can call this function for 2-digit years and it
-  /// constructs a 4-digit year based on the year provided and the current date. Puts the
-  /// result back to 'result->year'.
-  static void GetRoundYear(const TimestampValue* now, DateTimeParseResult* result);
+  /// Returns the ISO 8601 week numbering year corresponding to 'now' date.
+  static int GetIso8601WeekNumberingYear(const TimestampValue* now);
 
   /// Gets a pointer to the current char in the input string and an index to the current
   /// token being processed within 'dt_ctx->toks'. Advances these pointers to the end of
diff --git a/be/src/runtime/datetime-iso-sql-format-tokenizer.cc b/be/src/runtime/datetime-iso-sql-format-tokenizer.cc
index 124f03a..b0f4912 100644
--- a/be/src/runtime/datetime-iso-sql-format-tokenizer.cc
+++ b/be/src/runtime/datetime-iso-sql-format-tokenizer.cc
@@ -70,7 +70,13 @@ const unordered_map<string, IsoSqlFormatTokenizer::TokenItem>
   {"D", IsoSqlFormatTokenizer::TokenItem(DAY_OF_WEEK, true, false)},
   {"Q", IsoSqlFormatTokenizer::TokenItem(QUARTER_OF_YEAR, true, false)},
   {"WW", IsoSqlFormatTokenizer::TokenItem(WEEK_OF_YEAR, true, false)},
-  {"W", IsoSqlFormatTokenizer::TokenItem(WEEK_OF_MONTH, true, false)}
+  {"W", IsoSqlFormatTokenizer::TokenItem(WEEK_OF_MONTH, true, false)},
+  {"IYYY", IsoSqlFormatTokenizer::TokenItem(ISO8601_WEEK_NUMBERING_YEAR, true, false)},
+  {"IYY", IsoSqlFormatTokenizer::TokenItem(ISO8601_WEEK_NUMBERING_YEAR, true, false)},
+  {"IY", IsoSqlFormatTokenizer::TokenItem(ISO8601_WEEK_NUMBERING_YEAR, true, false)},
+  {"I", IsoSqlFormatTokenizer::TokenItem(ISO8601_WEEK_NUMBERING_YEAR, true, false)},
+  {"IW", IsoSqlFormatTokenizer::TokenItem(ISO8601_WEEK_OF_YEAR, true, false)},
+  {"ID", IsoSqlFormatTokenizer::TokenItem(ISO8601_DAY_OF_WEEK, true, false)}
 });
 
 const unordered_map<string, int> IsoSqlFormatTokenizer::SPECIAL_LENGTHS({
@@ -187,7 +193,11 @@ FormatTokenizationResult IsoSqlFormatTokenizer::CheckIncompatibilities() const {
   short provided_year_count = IsUsedToken("YYYY") + IsUsedToken("YYY") +
       IsUsedToken("YY") + IsUsedToken("Y");
   short provided_round_year_count = IsUsedToken("RRRR") + IsUsedToken("RR");
-  if (provided_year_count > 1 || provided_round_year_count > 1) {
+  short provided_iso8601_week_numbering_year_count = IsUsedToken("IYYY") +
+      IsUsedToken("IYY") + IsUsedToken("IY") + IsUsedToken("I");
+
+  if (provided_year_count > 1 || provided_round_year_count > 1 ||
+      provided_iso8601_week_numbering_year_count > 1) {
     return CONFLICTING_YEAR_TOKENS_ERROR;
   }
 
@@ -197,20 +207,20 @@ FormatTokenizationResult IsoSqlFormatTokenizer::CheckIncompatibilities() const {
 
   if (IsUsedToken("Q")) return QUARTER_NOT_ALLOWED_FOR_PARSING;
 
-  short provided_month_tokens = IsUsedToken("MM") + IsUsedToken("MONTH") +
-      IsUsedToken("MON");
-  if (provided_month_tokens > 1) return CONFLICTING_MONTH_TOKENS_ERROR;
-
   if (IsUsedToken("WW") || IsUsedToken("W")) return WEEK_NUMBER_NOT_ALLOWED_FOR_PARSING;
 
-  if (IsUsedToken("DDD") && (IsUsedToken("DD") || provided_month_tokens == 1)) {
+  short provided_month_count = IsUsedToken("MM") + IsUsedToken("MONTH")
+      + IsUsedToken("MON");
+  if (provided_month_count > 1) return CONFLICTING_MONTH_TOKENS_ERROR;
+
+  bool is_used_day_of_year = IsUsedToken("DDD");
+  bool is_used_day_of_month = IsUsedToken("DD");
+  if (is_used_day_of_year && (is_used_day_of_month || provided_month_count == 1)) {
     return DAY_OF_YEAR_TOKEN_CONFLICT;
   }
 
   if (IsUsedToken("D")) return DAY_OF_WEEK_NOT_ALLOWED_FOR_PARSING;
 
-  if (IsUsedToken("DAY") || IsUsedToken("DY")) return DAY_NAME_NOT_ALLOWED_FOR_PARSING;
-
   short provided_hour_tokens = IsUsedToken("HH") + IsUsedToken("HH12") +
       IsUsedToken("HH24");
   if (provided_hour_tokens > 1) {
@@ -243,6 +253,27 @@ FormatTokenizationResult IsoSqlFormatTokenizer::CheckIncompatibilities() const {
   if (provided_fractional_second_count > 1) {
     return CONFLICTING_FRACTIONAL_SECOND_TOKENS_ERROR;
   }
+
+  short provided_day_of_week_count = IsUsedToken("ID") + IsUsedToken("DAY") +
+      IsUsedToken("DY");
+  if (provided_day_of_week_count > 1) return CONFLICTING_DAY_OF_WEEK_TOKENS_ERROR;
+
+  bool is_used_iso8601_week_of_year = IsUsedToken("IW");
+  if (provided_iso8601_week_numbering_year_count == 1 || is_used_iso8601_week_of_year ||
+      provided_day_of_week_count == 1) {
+    if (provided_year_count == 1 || provided_round_year_count == 1 ||
+        provided_month_count == 1 || is_used_day_of_year || is_used_day_of_month) {
+      if (IsUsedToken("DAY") || IsUsedToken("DY")) {
+        return DAY_NAME_NOT_ALLOWED_FOR_PARSING;
+      }
+      return CONFLICTING_DATE_TOKENS_ERROR;
+    }
+    if (provided_iso8601_week_numbering_year_count == 0 ||
+        !is_used_iso8601_week_of_year || provided_day_of_week_count == 0) {
+      return MISSING_ISO8601_WEEK_BASED_TOKEN_ERROR;
+    }
+  }
+
   return SUCCESS;
 }
 
diff --git a/be/src/runtime/datetime-parser-common.cc b/be/src/runtime/datetime-parser-common.cc
index 5ff3430..1cf1474 100644
--- a/be/src/runtime/datetime-parser-common.cc
+++ b/be/src/runtime/datetime-parser-common.cc
@@ -28,6 +28,7 @@
 
 using boost::algorithm::is_any_of;
 using std::string;
+using std::unordered_map;
 using std::unordered_set;
 
 namespace impala {
@@ -115,7 +116,7 @@ void ReportBadFormat(FunctionContext* context, FormatTokenizationResult error_ty
         ss << "PARSE ERROR: Time tokens provided with date type.";
         break;
       case CONFLICTING_FRACTIONAL_SECOND_TOKENS_ERROR:
-        ss << "PARSE ERROR: Multiple fractional second token provided.";
+        ss << "PARSE ERROR: Multiple fractional second tokens provided.";
         break;
       case TEXT_TOKEN_NOT_CLOSED:
         ss << "PARSE ERROR: Missing closing quotation mark.";
@@ -125,6 +126,17 @@ void ReportBadFormat(FunctionContext* context, FormatTokenizationResult error_ty
         break;
       case MISPLACED_FX_MODIFIER_ERROR:
         ss << "PARSE ERROR: FX modifier should be at the beginning of the format string.";
+        break;
+      case CONFLICTING_DAY_OF_WEEK_TOKENS_ERROR:
+        ss << "PARSE ERROR: Multiple day of week tokens provided.";
+        break;
+      case MISSING_ISO8601_WEEK_BASED_TOKEN_ERROR:
+        ss << "PARSE ERROR: One or more required ISO 8601 week-based date tokens "
+              "(i.e. IYYY, IW, ID) are missing.";
+        break;
+      case CONFLICTING_DATE_TOKENS_ERROR:
+        ss << "PARSE ERROR: ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) are not "
+           << "allowed to be used with regular date tokens.";
         break;
       case QUARTER_NOT_ALLOWED_FOR_PARSING:
         ss << "PARSE_ERROR: Quarter token is not allowed in a string to datetime "
@@ -136,7 +147,7 @@ void ReportBadFormat(FunctionContext* context, FormatTokenizationResult error_ty
         break;
       case DAY_NAME_NOT_ALLOWED_FOR_PARSING:
         ss << "PARSE_ERROR: Day name token is not allowed in a string to datetime "
-            "conversion";
+            "conversion except with IYYY|IYY|IY|I and IW tokens";
         break;
       case WEEK_NUMBER_NOT_ALLOWED_FOR_PARSING:
         ss << "PARSE_ERROR: Week number token is not allowed in a string to datetime "
@@ -189,51 +200,101 @@ int GetQuarter(int month) {
   return (month - 1) / 3 + 1;
 }
 
-bool ParseMonthNameToken(const DateTimeFormatToken& tok, const char* token_start,
-    const char** token_end, bool fx_modifier, int* month) {
+namespace {
+// Helper function used by ParseMonthNameToken() and ParseDayNameToken().
+// Parses 'token_start' string and returns true if it finds a valid short or normal name.
+// The valid name prefixes are stored as keys in 'prefix_to_suffix' map. The valid name
+// prefixes are expected to be equal to the corresponding valid short names.
+// The valid name suffixes and the corresponding int IDs are stored as values in
+// 'prefix_to_suffix' map.
+// Returns true iff parsing was successful.
+// If the parsed name in 'token_start' is not followed by a separator then the end of the
+// name is found using 'prefix_to_suffix'. If parsing is successful, '*token_end' is
+// adjusted to point to the character right after the end of the name.
+bool ParseNameTokenHelper(
+    bool is_short_name, int short_name_len, int max_name_len,
+    const char* token_start, const char** token_end,
+    bool is_strict_matching,
+    const unordered_map<string, pair<string, int>>& prefix_to_suffix,
+    int* ret_val) {
   DCHECK(token_start != nullptr);
-  DCHECK(tok.type == MONTH_NAME || tok.type == MONTH_NAME_SHORT);
-  DCHECK(month != nullptr);
+  DCHECK(ret_val != nullptr);
   DCHECK(token_end != nullptr && *token_end != nullptr);
   DCHECK(token_start <= *token_end);
   int token_len = *token_end - token_start;
-  if (token_len < SHORT_MONTH_NAME_LENGTH) return false;
+  if (token_len < short_name_len) return false;
 
   string buff(token_start, token_len);
   boost::to_lower(buff);
-  const string& prefix = buff.substr(0, SHORT_MONTH_NAME_LENGTH);
-  const auto month_iter = MONTH_PREFIX_TO_SUFFIX.find(prefix);
-  if (UNLIKELY(month_iter == MONTH_PREFIX_TO_SUFFIX.end())) return false;
-  if (tok.type == MONTH_NAME_SHORT) {
-    *month = month_iter->second.second;
+  const string& prefix = buff.substr(0, short_name_len);
+  const auto it = prefix_to_suffix.find(prefix);
+  if (UNLIKELY(it == prefix_to_suffix.end())) return false;
+  if (is_short_name) {
+    DCHECK(token_len == short_name_len);
+    *ret_val = it->second.second;
     return true;
   }
 
-  DCHECK(tok.type == MONTH_NAME);
-  if (fx_modifier && !tok.fm_modifier) {
-    DCHECK(buff.length() == MAX_MONTH_NAME_LENGTH);
+  if (is_strict_matching) {
+    DCHECK(buff.length() == max_name_len);
     trim_right_if(buff, is_any_of(" "));
   }
 
   // Check if the remaining characters match.
-  const string& expected_suffix = month_iter->second.first;
-  if (buff.length() - SHORT_MONTH_NAME_LENGTH < expected_suffix.length()) return false;
-  const char* actual_suffix = buff.c_str() + SHORT_MONTH_NAME_LENGTH;
+  const string& expected_suffix = it->second.first;
+  if (buff.length() - short_name_len < expected_suffix.length()) return false;
+  const char* actual_suffix = buff.c_str() + short_name_len;
   if (strncmp(actual_suffix, expected_suffix.c_str(), expected_suffix.length()) != 0) {
     return false;
   }
-  *month = month_iter->second.second;
+  *ret_val = it->second.second;
 
-  // If the end of the month token wasn't identified because it wasn't followed by a
-  // separator then the end of the month token has to be adjusted.
+  // If the end of the name token wasn't identified because it wasn't followed by a
+  // separator then the end of the name token has to be adjusted.
   if (prefix.length() + expected_suffix.length() < buff.length()) {
-    if (fx_modifier && !tok.fm_modifier) return false;
+    if (is_strict_matching) return false;
     *token_end = token_start + prefix.length() + expected_suffix.length();
   }
 
   return true;
 }
 
+}
+
+bool ParseMonthNameToken(const DateTimeFormatToken& tok, const char* token_start,
+    const char** token_end, bool fx_modifier, int* month) {
+  DCHECK(token_start != nullptr);
+  DCHECK(tok.type == MONTH_NAME || tok.type == MONTH_NAME_SHORT);
+  DCHECK(month != nullptr);
+  DCHECK(token_end != nullptr && *token_end != nullptr);
+  DCHECK(token_start <= *token_end);
+
+  return ParseNameTokenHelper(
+      tok.type == MONTH_NAME_SHORT,
+      SHORT_MONTH_NAME_LENGTH, MAX_MONTH_NAME_LENGTH,
+      token_start, token_end,
+      fx_modifier && !tok.fm_modifier,
+      MONTH_PREFIX_TO_SUFFIX,
+      month);
+}
+
+bool ParseDayNameToken(const DateTimeFormatToken& tok, const char* token_start,
+    const char** token_end, bool fx_modifier, int* day) {
+  DCHECK(token_start != nullptr);
+  DCHECK(tok.type == DAY_NAME || tok.type == DAY_NAME_SHORT);
+  DCHECK(day != nullptr);
+  DCHECK(token_end != nullptr && *token_end != nullptr);
+  DCHECK(token_start <= *token_end);
+
+  return ParseNameTokenHelper(
+      tok.type == DAY_NAME_SHORT,
+      SHORT_DAY_NAME_LENGTH, MAX_DAY_NAME_LENGTH,
+      token_start, token_end,
+      fx_modifier && !tok.fm_modifier,
+      DAY_PREFIX_TO_SUFFIX,
+      day);
+}
+
 int GetDayInYear(int year, int month, int day_in_month) {
   DCHECK(month >= 1 && month <= 12);
   const vector<int>& month_ranges = IsLeapYear(year) ? LEAP_YEAR_MONTH_RANGES :
@@ -343,5 +404,13 @@ int GetWeekOfMonth(int day) {
   return (day - 1) / 7 + 1;
 }
 
+int AdjustYearToLength(int year, int len) {
+  if (len < 4) {
+    int adjust_factor = std::pow(10, len);
+    return year % adjust_factor;
+  }
+  return year;
+}
+
 }
 }
diff --git a/be/src/runtime/datetime-parser-common.h b/be/src/runtime/datetime-parser-common.h
index 7ab1a2b..9cbc58f 100644
--- a/be/src/runtime/datetime-parser-common.h
+++ b/be/src/runtime/datetime-parser-common.h
@@ -73,6 +73,20 @@ const std::unordered_map<std::string, std::pair<std::string, int>>
         {"dec", {"ember", 12}}
 };
 
+/// Similar to 'MONTH_PREFIX_TO_SUFFIX' but maps the 3-letter prefix of a day name to the
+/// suffix of the day name and the ordinal number of the day (1 means Monday and 7 means
+/// Sunday).
+const std::unordered_map<std::string, std::pair<std::string, int>>
+    DAY_PREFIX_TO_SUFFIX = {
+        {"mon", {"day", 1}},
+        {"tue", {"sday", 2}},
+        {"wed", {"nesday", 3}},
+        {"thu", {"rsday", 4}},
+        {"fri", {"day", 5}},
+        {"sat", {"urday", 6}},
+        {"sun", {"day", 7}}
+};
+
 /// Length of short month names like 'JAN', 'FEB', etc.
 const int SHORT_MONTH_NAME_LENGTH = 3;
 
@@ -111,7 +125,10 @@ enum FormatTokenizationResult {
   QUARTER_NOT_ALLOWED_FOR_PARSING,
   DAY_OF_WEEK_NOT_ALLOWED_FOR_PARSING,
   DAY_NAME_NOT_ALLOWED_FOR_PARSING,
-  WEEK_NUMBER_NOT_ALLOWED_FOR_PARSING
+  WEEK_NUMBER_NOT_ALLOWED_FOR_PARSING,
+  CONFLICTING_DAY_OF_WEEK_TOKENS_ERROR,
+  MISSING_ISO8601_WEEK_BASED_TOKEN_ERROR,
+  CONFLICTING_DATE_TOKENS_ERROR
 };
 
 /// Holds all the token types that serve as building blocks for datetime format patterns.
@@ -145,7 +162,10 @@ enum DateTimeFormatTokenType {
   DAY_OF_WEEK,
   QUARTER_OF_YEAR,
   WEEK_OF_YEAR,
-  WEEK_OF_MONTH
+  WEEK_OF_MONTH,
+  ISO8601_WEEK_NUMBERING_YEAR,
+  ISO8601_WEEK_OF_YEAR,
+  ISO8601_DAY_OF_WEEK
 };
 
 /// Indicates whether the cast is a 'datetime to string' or a 'string to datetime' cast.
@@ -290,6 +310,20 @@ bool ParseMonthNameToken(const DateTimeFormatToken& tok, const char* token_start
     const char** token_end, bool fx_modifier, int* month)
     WARN_UNUSED_RESULT;
 
+/// Gets a day name token (either full or short name) and converts it to the ordinal
+/// number of day between 1 and 7. Make sure 'tok.type' is either DAY_NAME or
+/// DAY_NAME_SHORT.
+/// Result is stored in 'day'. Returns false if the given day name is invalid.
+/// 'fx_modifier' indicates if there is an active FX modifier on the whole format.
+/// If the day part of the input is not followed by a separator then the end of the day
+/// part is found using DAY_PREFIX_TO_SUFFIX. First, the 3-letter prefix of the day name
+/// identifies a particular day; then the rest of the day name is checked against the
+/// expected suffix. If it matches, '*token_end' is adjusted to point to the character
+/// right after the end of the day part.
+bool ParseDayNameToken(const DateTimeFormatToken& tok, const char* token_start,
+    const char** token_end, bool fx_modifier, int* day)
+    WARN_UNUSED_RESULT;
+
 inline bool IsLeapYear(int year) {
   return year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
 }
@@ -331,6 +365,9 @@ int GetWeekOfYear(int year, int month, int day);
 /// month starts from the first day of the month.
 int GetWeekOfMonth(int day);
 
+/// Returns the year adjusted to 'len' digits.
+/// E.g. AdjustYearToLength(1789, 3) returns 789.
+int AdjustYearToLength(int year, int len);
 }
 
 }
diff --git a/be/src/runtime/timestamp-parse-util.cc b/be/src/runtime/timestamp-parse-util.cc
index 93a6039..9c6bac0 100644
--- a/be/src/runtime/timestamp-parse-util.cc
+++ b/be/src/runtime/timestamp-parse-util.cc
@@ -20,9 +20,9 @@
 #include "common/names.h"
 #include "runtime/datetime-iso-sql-format-parser.h"
 #include "runtime/datetime-simple-date-format-parser.h"
+#include "runtime/date-value.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.inline.h"
-#include "runtime/timestamp-value.h"
 #include "udf/udf-internal.h"
 #include "util/string-parser.h"
 
@@ -227,11 +227,7 @@ string TimestampParser::Format(const DateTimeFormatContext& dt_ctx, const date&
     switch (tok.type) {
       case YEAR:
       case ROUND_YEAR: {
-        num_val = d.year();
-        if (tok.len < 4) {
-          int adjust_factor = std::pow(10, tok.len);
-          num_val %= adjust_factor;
-        }
+        num_val = AdjustYearToLength(d.year(), tok.len);
         break;
       }
       case QUARTER_OF_YEAR: {
@@ -307,6 +303,21 @@ string TimestampParser::Format(const DateTimeFormatContext& dt_ctx, const date&
         result.append(FormatTextToken(tok));
         break;
       }
+      case ISO8601_WEEK_NUMBERING_YEAR: {
+        num_val = AdjustYearToLength(GetIso8601WeekNumberingYear(d), tok.len);
+        break;
+      }
+      case ISO8601_WEEK_OF_YEAR: {
+        num_val = d.week_number();
+        break;
+      }
+      case ISO8601_DAY_OF_WEEK: {
+        // day_of_week() returns 0 for Sunday, 1 for Monday and 6 for Saturday.
+        num_val = d.day_of_week();
+        // We need to output 1 for Monday and 7 for Sunday.
+        if (num_val == 0) num_val = 7;
+        break;
+      }
       default: DCHECK(false) << "Unknown date/time format token";
     }
     if (num_val > -1) {
@@ -320,4 +331,19 @@ string TimestampParser::Format(const DateTimeFormatContext& dt_ctx, const date&
   return result;
 }
 
+int TimestampParser::GetIso8601WeekNumberingYear(const boost::gregorian::date& d) {
+  DCHECK(!d.is_special());
+  DCHECK(1400 <= d.year() && d.year() <= 9999);
+
+  static const boost::gregorian::date epoch(1970, 1, 1);
+  DateValue dv((d - epoch).days());
+  DCHECK(dv.IsValid());
+
+  int week_numbering_year = dv.Iso8601WeekNumberingYear();
+  // 1400.01.01 is Wednesday. 9999.12.31 is Friday.
+  // This means that week_numbering_year must fall in the [1400, 9999] range.
+  DCHECK(1400 <= week_numbering_year && week_numbering_year <= 9999);
+  return week_numbering_year;
+}
+
 }
diff --git a/be/src/runtime/timestamp-parse-util.h b/be/src/runtime/timestamp-parse-util.h
index e9b00a4..ca71836 100644
--- a/be/src/runtime/timestamp-parse-util.h
+++ b/be/src/runtime/timestamp-parse-util.h
@@ -102,6 +102,14 @@ class TimestampParser {
   /// to 24 or zero otherwise.
   static int AdjustWithTimezone(boost::posix_time::time_duration* t,
       const boost::posix_time::time_duration& tz_offset);
+
+  /// Returns the year that the current week belongs to. Returned value is in the
+  /// [d.year() - 1, d.year() + 1] range.
+  /// Weeks start with Monday. Each week's year is the Gregorian year in which the
+  /// Thursday falls.
+  /// 'd' date is expected to fall in the [1400, 9999] year range. The returned week
+  /// numbering year must also fall in the [1400, 9999] range.
+  static int GetIso8601WeekNumberingYear(const boost::gregorian::date& d);
 };
 
 }
diff --git a/tests/query_test/test_cast_with_format.py b/tests/query_test/test_cast_with_format.py
index 7cbae34..f584bf6 100644
--- a/tests/query_test/test_cast_with_format.py
+++ b/tests/query_test/test_cast_with_format.py
@@ -154,7 +154,7 @@ class TestCastWithFormat(ImpalaTestSuite):
         "timestamp FORMAT 'YYYY-MM-DDTHH24:MI:SSZ')")
     assert result.data == ["2018-11-10 15:11:04"]
 
-    # ISO8601 format elements are case-insensitive
+    # ISO 8601 format elements are case-insensitive
     result = self.client.execute("select cast('2018-11-09t15:11:04Z' as "
         "timestamp FORMAT 'YYYY-MM-DDTHH24:MI:SSz')")
     assert result.data == ["2018-11-09 15:11:04"]
@@ -803,7 +803,70 @@ class TestCastWithFormat(ImpalaTestSuite):
     assert result.data == ["123"]
 
   def test_day_name(self):
-    # Different lowercase and uppercase scenarios.
+    # String to datetime: Test different lowercase vs uppercase scenarios.
+    result = self.execute_query(
+        "select cast('2010-08-Tuesday' as timestamp FORMAT 'IYYY-IW-DAY'), "
+        "       cast('2010-monday-08' as timestamp FORMAT 'IYYY-DAY-IW'), "
+        "       cast('2010-Wednesday-08' as date FORMAT 'IYYY-DAY-IW'), "
+        "       cast('2010 08 THURSDAY' as timestamp FORMAT 'IYYY IW DAY'), "
+        "       cast('2010 08 Friday' as date FORMAT 'IYYY IW DAY'), "
+        "       cast('2010 08 saturday' as timestamp FORMAT 'IYYY IW DAY'), "
+        "       cast('sUnDay 2010 08' as date FORMAT 'DAY IYYY IW'), "
+        "       cast('Monday 2010 09' as date FORMAT 'DAY IYYY IW')")
+    assert result.data == [
+        "2010-02-23 00:00:00\t2010-02-22 00:00:00\t2010-02-24\t"
+        "2010-02-25 00:00:00\t2010-02-26\t2010-02-27 00:00:00\t"
+        "2010-02-28\t2010-03-01"]
+    # And now with short day names.
+    result = self.execute_query(
+        "select cast('2010-08-Tue' as timestamp FORMAT 'IYYY-IW-DY'), "
+        "       cast('2010-mon-08' as timestamp FORMAT 'IYYY-DY-IW'), "
+        "       cast('2010-Wed-08' as date FORMAT 'IYYY-DY-IW'), "
+        "       cast('2010 08 THU' as timestamp FORMAT 'IYYY IW DY'), "
+        "       cast('2010 08 Fri' as date FORMAT 'IYYY IW DY'), "
+        "       cast('2010 08 sat' as timestamp FORMAT 'IYYY IW DY'), "
+        "       cast('sUn 2010 08' as date FORMAT 'DY IYYY IW'), "
+        "       cast('Mon 2010 09' as date FORMAT 'DY IYYY IW')")
+    assert result.data == [
+        "2010-02-23 00:00:00\t2010-02-22 00:00:00\t2010-02-24\t"
+        "2010-02-25 00:00:00\t2010-02-26\t2010-02-27 00:00:00\t"
+        "2010-02-28\t2010-03-01"]
+
+    # String to datetime: Incorrect day name.
+    result = self.execute_query("select cast('2010 09 Mondau' as timestamp FORMAT "
+        "'IYYY IW DAY')")
+    assert result.data == ["NULL"]
+
+    # String to datetime: DAY token without surrounding separators.
+    result = self.execute_query(
+        "select cast('2010MONDAY09' as date FORMAT 'IYYYDAYIW'), "
+        "       cast('2010WEDNESDAY9' as timestamp FORMAT 'IYYYDAYIW')")
+    assert result.data == ["2010-03-01\t2010-03-03 00:00:00"]
+    # And now with short day names.
+    result = self.execute_query(
+        "select cast('2010MON09' as date FORMAT 'IYYYDYIW'), "
+        "       cast('2010WED9' as timestamp FORMAT 'IYYYDYIW')")
+    assert result.data == ["2010-03-01\t2010-03-03 00:00:00"]
+
+    # String to datetime: FX and FM modifiers.
+    result = self.execute_query(
+        "select cast('2010-Monday-09' as timestamp FORMAT 'FXIYYY-DAY-IW'), "
+        "       cast('2010-Monday  X-09' as timestamp FORMAT 'FXIYYY-DAY-IW')")
+    assert result.data == ["NULL\tNULL"]
+
+    result = self.execute_query(
+        "select cast('2010-Monday   -09' as timestamp FORMAT 'FXIYYY-DAY-IW'), "
+        "       cast('2010-Monday   09' as date FORMAT 'FXIYYY-DAYIW')")
+    assert result.data == ["2010-03-01 00:00:00\t2010-03-01"]
+
+    result = self.execute_query(
+        "select cast('2010-Monday-09' as timestamp FORMAT 'FXIYYY-FMDAY-IW'), "
+        "       cast('2010-Monday09' as timestamp FORMAT 'FXIYYY-FMDAYIW'), "
+        "       cast('2010Monday09' as date FORMAT 'FXIYYYFMDAYIW')")
+    assert result.data == ["2010-03-01 00:00:00\t2010-03-01 00:00:00\t"
+                           "2010-03-01"]
+
+    # Datetime to string: Different lowercase and uppercase scenarios.
     result = self.execute_query("select cast(date'2019-11-13' as string "
         "format 'DAY Day day DY Dy dy')")
     assert result.data == ["WEDNESDAY Wednesday wednesday WED Wed wed"]
@@ -832,7 +895,7 @@ class TestCastWithFormat(ImpalaTestSuite):
         "format 'DAY Day day DY Dy dy')")
     assert result.data == ["TUESDAY   Tuesday   tuesday   TUE Tue tue"]
 
-    # Different lowercase and uppercase scenarios when FM is provided.
+    # Datetime to string: Different lowercase and uppercase scenarios when FM is provided.
     result = self.execute_query("select cast(cast('2019-11-13' as timestamp) as string "
         "format 'FMDAY FMDay FMday FMDY FMDy FMdy')")
     assert result.data == ["WEDNESDAY Wednesday wednesday WED Wed wed"]
@@ -861,12 +924,12 @@ class TestCastWithFormat(ImpalaTestSuite):
         "format 'FMDAY FMDay FMday FMDY FMDy FMdy')")
     assert result.data == ["TUESDAY Tuesday tuesday TUE Tue tue"]
 
-    # Test odd casing of day token.
+    # Datetime to string: Test odd casing of day token.
     result = self.execute_query("select cast(date'2010-01-20' as string FORMAT "
         "'DAy dAY daY dY')")
     assert result.data == ["WEDNESDAY wednesday wednesday wed"]
 
-    # Day token without surrounding separators
+    # Datetime to string: Day token without surrounding separators.
     result = self.execute_query("select cast(date'2019-11-11' as string "
         "format 'YYYYDayMonth')")
     assert result.data == ["2019Monday   November "]
@@ -883,7 +946,7 @@ class TestCastWithFormat(ImpalaTestSuite):
         "format 'YYYYDYDD')")
     assert result.data == ["2019TUE12"]
 
-    # Day token with FM and FX modifiers.
+    # Datetime to string: Day token with FM and FX modifiers.
     result = self.execute_query("select cast(cast('2019-01-01' as timestamp) as string "
         "format 'FXYYYY DAY DD')")
     assert result.data == ["2019 TUESDAY   01"]
@@ -1388,6 +1451,143 @@ class TestCastWithFormat(ImpalaTestSuite):
         r''' 'FXYYYY-"\\\'"-MM-DD') ''')
     assert result.data == ["2010-02-01"]
 
+  def test_iso8601_week_based_date_tokens(self):
+    # Format 0001-01-01 and 9999-12-31 dates.
+    # 0001-01-01 is Monday, belongs to the 1st week of year 1.
+    # 9999-12-31 is Friday, belongs to the 52nd week of year 9999.
+    result = self.client.execute(
+        "select cast(date'0001-01-01' as string format 'IYYY/IW/ID'), "
+        "       cast(date'9999-12-31' as string format 'IYYY/IW/ID')")
+    assert result.data == ["0001/01/01\t9999/52/05"]
+
+    # Parse 0001-01-01 and 9999-12-31 dates.
+    result = self.client.execute(
+        "select cast('0001/01/01' as date format 'IYYY/IW/ID'), "
+        "       cast('9999/52/05' as date format 'IYYY/IW/ID')")
+    assert result.data == ["0001-01-01\t9999-12-31"]
+
+    # Parse out-of-range dates.
+    # Year 9999 has 52 weeks. 9999-12-31 is Friday.
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('9999/52/06' as date format 'IYYY/IW/ID')")
+    assert 'String to Date parse failed. Invalid string val: "9999/52/06"' in str(err)
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('9999/53/01' as date format 'IYYY/IW/ID')")
+    assert 'String to Date parse failed. Invalid string val: "9999/53/01"' in str(err)
+
+    # Format 1400-01-01 and 9999-12-31 timestamps.
+    # 1400-01-01 is Wednesday, belongs to the 1st week of year 1400.
+    # 9999-12-31 is Friday, belongs to the 52nd week of year 9999.
+    result = self.client.execute(
+        "select cast(cast('1400-01-01' as timestamp) as string format 'IYYY/IW/ID'), "
+        "       cast(cast('9999-12-31' as timestamp) as string format 'IYYY/IW/ID')")
+    assert result.data == ["1400/01/03\t9999/52/05"]
+
+    # Parse 1400-01-01 and 9999-12-31 timestamps.
+    result = self.client.execute(
+        "select cast('1400/01/03' as timestamp format 'IYYY/IW/ID'), "
+        "       cast('9999/52/05' as timestamp format 'IYYY/IW/ID')")
+    assert result.data == ["1400-01-01 00:00:00\t9999-12-31 00:00:00"]
+
+    # Parse out-of-range timestamps.
+    # - Tuesday of the 1st week of year 1400 is 1399-12-31, which is out of the valid
+    # timestamp range.
+    # - Year 9999 has 52 weeks. 9999-12-31 is Friday.
+    result = self.client.execute(
+        "select cast('1400/01/02' as timestamp format 'IYYY/IW/ID'), "
+        "       cast('9999/52/06' as timestamp format 'IYYY/IW/ID'), "
+        "       cast('9999/53/01' as timestamp format 'IYYY/IW/ID')")
+    assert result.data == ["NULL\tNULL\tNULL"]
+
+    # Formatting dates around Dec 31.
+    # 2019-12-31 is Tuesday, belongs to 1st week of year 2020.
+    # 2020-12-31 is Thursday, belongs to 53rd week of year 2020.
+    result = self.client.execute(
+        "select cast(date'2019-12-29' as string format 'IYYY/IW/ID'), "
+        "       cast(date'2019-12-30' as string format 'IYYY/IW/ID'), "
+        "       cast(date'2019-12-31' as string format 'IYYY/IW/ID'), "
+        "       cast(date'2020-01-01' as string format 'IYYY/IW/ID'), "
+        "       cast(date'2020-12-31' as string format 'IYYY/IW/ID'), "
+        "       cast(date'2021-01-01' as string format 'IYYY/IW/ID')")
+    assert result.data == [
+        "2019/52/07\t2020/01/01\t2020/01/02\t2020/01/03\t2020/53/04\t2020/53/05"]
+
+    # Parsing dates around Dec 31.
+    result = self.client.execute(
+        "select cast('2019/52/07' as date format 'IYYY/IW/ID'), "
+        "       cast('2020/01/01' as date format 'IYYY/IW/ID'), "
+        "       cast('2020/01/02' as date format 'IYYY/IW/ID'), "
+        "       cast('2020/01/03' as date format 'IYYY/IW/ID'), "
+        "       cast('2020/53/04' as date format 'IYYY/IW/ID'), "
+        "       cast('2020/53/05' as date format 'IYYY/IW/ID')")
+    assert result.data == [
+        "2019-12-29\t2019-12-30\t2019-12-31\t2020-01-01\t2020-12-31\t2021-01-01"]
+
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2019/53/01' as date format 'IYYY/IW/ID')")
+    assert 'String to Date parse failed. Invalid string val: "2019/53/01"' in str(err)
+
+    # Format 4, 3, 2, 1-digit week numbering year.
+    # 2020-01-01 is Wednesday, belongs to week 1 of year 2020.
+    query_options = dict({'now_string': '2019-01-01 11:11:11'})
+    result = self.execute_query(
+        "select cast(date'2020-01-01' as string format 'IYYY/IW/ID'), "
+        "       cast(date'2020-01-01' as string format 'IYY/IW/ID'), "
+        "       cast(date'2020-01-01' as string format 'IY/IW/ID'), "
+        "       cast(date'2020-01-01' as string format 'I/IW/ID')", query_options)
+    assert result.data == ["2020/01/03\t020/01/03\t20/01/03\t0/01/03"]
+
+    # Parse 4, 3, 2, 1-digit week numbering year.
+    result = self.execute_query(
+        "select cast('2020/01/03' as date format 'IYYY/IW/ID'), "
+        "       cast('020/01/03' as date format 'IYYY/IW/ID'), "
+        "       cast('20/01/03' as date format 'IYYY/IW/ID'), "
+        "       cast('0/01/03' as date format 'IYYY/IW/ID'), "
+        "       cast('020/01/03' as date format 'IYY/IW/ID'), "
+        "       cast('20/01/03' as date format 'IYY/IW/ID'), "
+        "       cast('0/01/03' as date format 'IYY/IW/ID'), "
+        "       cast('20/01/03' as date format 'IY/IW/ID'), "
+        "       cast('0/01/03' as date format 'IY/IW/ID'), "
+        "       cast('0/01/03' as date format 'I/IW/ID')", query_options)
+    assert result.data == ['2020-01-01\t2020-01-01\t2020-01-01\t2010-01-06\t'
+                           '2020-01-01\t2020-01-01\t2010-01-06\t'
+                           '2020-01-01\t2010-01-06\t'
+                           '2010-01-06']
+
+    # 2000-01-01 is Saturday, so it belongs to the 1999 ISO 8601 week-numbering year.
+    # Test that 1999 is used for prefixing 3, 2, 1-digit week numbering year.
+    query_options = dict({'now_string': '2000-01-01 11:11:11'})
+    result = self.execute_query(
+        "select cast('2005/01/01' as date format 'IYYY/IW/ID'), "
+        "       cast('005/01/01' as date format 'IYYY/IW/ID'), "
+        "       cast('05/01/01' as date format 'IYYY/IW/ID'), "
+        "       cast('5/01/01' as date format 'IYYY/IW/ID'), "
+        "       cast('05/01/01' as date format 'IY/IW/ID'), "
+        "       cast('5/01/01' as date format 'IY/IW/ID'), "
+        "       cast('5/01/01' as date format 'I/IW/ID')", query_options)
+    assert result.data == ['2005-01-03\t1004-12-31\t1905-01-02\t1995-01-02\t'
+                           '1905-01-02\t1995-01-02\t'
+                           '1995-01-02']
+
+    # Parse 1-digit week of year and 1-digit week day.
+    result = self.client.execute(
+        "select cast('2020/53/4' as date format 'IYYY/IW/ID'), "
+        "       cast('2020/1/3' as date format 'IYYY/IW/ID')")
+    assert result.data == ["2020-12-31\t2020-01-01"]
+
+    # Parse dayname with week-based tokens
+    result = self.client.execute(
+        "select cast('2020/wed/1' as date format 'IYYY/DY/IW'), "
+        "       cast('2020/wed1' as date format 'iyyy/dyiw'), "
+        "       cast('2020wed1' as date format 'IYYYDYIW'), "
+        "       cast('2020WEd1' as date format 'iyyydyiw'), "
+        "       cast('2020/wednesday/1' as date format 'IYYY/DAY/IW'), "
+        "       cast('2020/wednesday1' as date format 'iyyy/dayiw'), "
+        "       cast('2020wednesday1' as date format 'IYYYDAYIW'), "
+        "       cast('2020wEdnESday1' as date format 'iyyydayiw')")
+    assert result.data == ["2020-01-01\t2020-01-01\t2020-01-01\t2020-01-01\t"
+                           "2020-01-01\t2020-01-01\t2020-01-01\t2020-01-01"]
+
   def test_fm_fx_modifiers(self):
     # Exact matching for the whole format.
     result = self.client.execute("select cast('2001-03-01 03:10:15.123456 -01:30' as "
@@ -1456,6 +1656,32 @@ class TestCastWithFormat(ImpalaTestSuite):
         "format 'FXYYYY-MM-DD HH12:MI:SS.FF')")
     assert result.data == ["NULL"]
 
+    # Strict week-based token length matching.
+    result = self.client.execute(
+        "select cast('2015/3/05' as timestamp format 'FXIYYY/IW/ID'), "
+        "       cast('2015/03/5' as timestamp format 'FXIYYY/IW/ID'), "
+        "       cast('015/03/05' as timestamp format 'FXIYYY/IW/ID'), "
+        "       cast('15/03/05' as timestamp format 'FXIYYY/IW/ID'), "
+        "       cast('5/03/05' as timestamp format 'FXIYYY/IW/ID')")
+    assert result.data == ["NULL\tNULL\tNULL\tNULL\tNULL"]
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2015/3/05' as date format 'FXIYYY/IW/ID')")
+    assert 'String to Date parse failed. Invalid string val: "2015/3/05"' in str(err)
+
+    query_options = dict({'now_string': '2019-01-01 11:11:11'})
+    result = self.execute_query(
+        "select cast('2015/03/05' as timestamp format 'FXIYYY/IW/ID'), "
+        "       cast('015/03/05' as timestamp format 'FXIYY/IW/ID'), "
+        "       cast('15/03/05' as timestamp format 'FXIY/IW/ID'), "
+        "       cast('5/03/05' as timestamp format 'FXI/IW/ID'), "
+        "       cast('2015/03/05' as date format 'FXIYYY/IW/ID'), "
+        "       cast('015/03/05' as date format 'FXIYY/IW/ID'), "
+        "       cast('15/03/05' as date format 'FXIY/IW/ID'), "
+        "       cast('5/03/05' as date format 'FXI/IW/ID')", query_options)
+    assert result.data == ["2015-01-16 00:00:00\t2015-01-16 00:00:00\t"
+        "2015-01-16 00:00:00\t2015-01-16 00:00:00\t"
+        "2015-01-16\t2015-01-16\t2015-01-16\t2015-01-16"]
+
     # Strict token length matching with text token containing escaped double quote.
     result = self.client.execute(r'''select cast('2001-03-09 some "text03:25:00' '''
         r'''as timestamp format "FXYYYY-MM-DD \"some \\\"text\"HH12:MI:SS")''')
@@ -1514,6 +1740,11 @@ class TestCastWithFormat(ImpalaTestSuite):
         "timestamp) as string format 'FXYYYY-MM-DD HH24:MI:SS.FF7')")
     assert result.data == ["2001-03-05 03:10:15.1234560"]
 
+    result = self.client.execute(
+        "select cast(date'0001-01-10' as string format 'FXIYYY-IW-ID'), "
+        "       cast(date'0001-10-10' as string format 'FXIYYY-IW-ID')")
+    assert result.data == ["0001-02-03\t0001-41-03"]
+
     # Datetime to string path: Tokens with FM modifier don't pad output to a given
     # length.
     result = self.client.execute("select cast(cast('2001-03-14 03:06:08' as timestamp) "
@@ -1528,6 +1759,11 @@ class TestCastWithFormat(ImpalaTestSuite):
         "'FMYY-FMMM-FMDD')")
     assert result.data == ["1-3-10"]
 
+    result = self.client.execute(
+        "select cast(date'0001-01-10' as string format 'FMIYYY-FMIW-FMID'), "
+        "       cast(date'0001-10-10' as string format 'FMIYYY-FMIW-FMID')")
+    assert result.data == ["1-2-3\t1-41-3"]
+
     # Datetime to string path: FM modifier is effective even if FX modifier is also
     # given.
     result = self.client.execute("select cast(cast('2001-03-15 03:06:08' as "
@@ -1542,11 +1778,17 @@ class TestCastWithFormat(ImpalaTestSuite):
         "as string format 'FXFMYYYY-FMMM-FMDD')")
     assert result.data == ["1-4-10"]
 
+    result = self.client.execute(
+        "select cast(date'0001-01-10' as string format 'FXFMIYYY-FMIW-FMID'), "
+        "       cast(date'0001-10-10' as string format 'FXFMIYYY-FMIW-FMID')")
+    assert result.data == ["1-2-3\t1-41-3"]
+
     # FX and FM modifiers are case-insensitive.
     result = self.client.execute("select cast('2019-5-10' as date format "
         "'fxYYYY-fmMM-DD')")
     assert result.data == ["2019-05-10"]
 
+
   def test_quarter(self):
     result = self.client.execute("select cast(date'2001-01-01' as string "
         "FORMAT 'YYYY Q MM')")
@@ -1636,6 +1878,19 @@ class TestCastWithFormat(ImpalaTestSuite):
         "select cast('2017-05-01' as timestamp format 'YYYY-MONTH-DD-MON')")
     assert "Multiple month tokens provided" in str(err)
 
+    # Conflict between DAY, DY and ID tokens.
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2017-05-01-Monday' as timestamp format 'IYYY-IW-ID-DAY')")
+    assert "Multiple day of week tokens provided" in str(err)
+
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2017-05-01-Mon' as timestamp format 'IYYY-IW-ID-DY')")
+    assert "Multiple day of week tokens provided" in str(err)
+
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2017-05-Monday-Mon' as timestamp format 'IYYY-IW-DAY-DY')")
+    assert "Multiple day of week tokens provided" in str(err)
+
     # Week of year token not allowed in a string to datetime conversion.
     err = self.execute_query_expect_failure(self.client,
         "select cast('2017-1-01' as timestamp format 'YYYY-WW-DD')")
@@ -1674,8 +1929,8 @@ class TestCastWithFormat(ImpalaTestSuite):
     # Day name token not allowed in a string to datetime conversion.
     err = self.execute_query_expect_failure(self.client,
         "select cast('2017-1-02 Monday' as timestamp format 'YYYY-DD-MM DAY')")
-    assert "Day name token is not allowed in a string to datetime conversion" in \
-        str(err)
+    assert "Day name token is not allowed in a string to datetime conversion except " \
+        "with IYYY|IYY|IY|I and IW tokens" in str(err)
 
     # Conflict between hour tokens
     err = self.execute_query_expect_failure(self.client,
@@ -1764,23 +2019,77 @@ class TestCastWithFormat(ImpalaTestSuite):
     # Multiple fraction second token conflict
     err = self.execute_query_expect_failure(self.client,
         "select cast('2018-10-10' as timestamp format 'FF FF1')")
-    assert "Multiple fractional second token provided." in str(err)
+    assert "Multiple fractional second tokens provided." in str(err)
 
     err = self.execute_query_expect_failure(self.client,
         "select cast('2018-10-10' as timestamp format 'FF2 FF3')")
-    assert "Multiple fractional second token provided." in str(err)
+    assert "Multiple fractional second tokens provided." in str(err)
 
     err = self.execute_query_expect_failure(self.client,
         "select cast('2018-10-10' as timestamp format 'FF4 FF5')")
-    assert "Multiple fractional second token provided." in str(err)
+    assert "Multiple fractional second tokens provided." in str(err)
 
     err = self.execute_query_expect_failure(self.client,
         "select cast('2018-10-10' as timestamp format 'FF6 FF7')")
-    assert "Multiple fractional second token provided." in str(err)
+    assert "Multiple fractional second tokens provided." in str(err)
 
     err = self.execute_query_expect_failure(self.client,
         "select cast('2018-10-10' as timestamp format 'FF8 FF9')")
-    assert "Multiple fractional second token provided." in str(err)
+    assert "Multiple fractional second tokens provided." in str(err)
+
+    # ISO 8601 Week-based and normal date pattern tokens must not be mixed.
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-10-01' as date format 'IYYY-MM-ID')")
+    assert "ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) are not allowed to be " \
+           "used with regular date tokens." in str(err)
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-10-01 01:00' as timestamp format 'IYYY-MM-ID HH24:MI')")
+    assert "ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) are not allowed to be " \
+           "used with regular date tokens." in str(err)
+
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-10-01' as date format 'YYYY-IW-DD')")
+    assert "ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) are not allowed to be " \
+           "used with regular date tokens." in str(err)
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-10-01' as timestamp format 'IYYY-IW-DD')")
+    assert "ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) are not allowed to be " \
+           "used with regular date tokens." in str(err)
+
+    # Missing ISO 8601 week-based pattern tokens.
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-10' as date format 'IYYY-IW')")
+    assert "One or more required ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) " \
+           "are missing." in str(err)
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-10 01:00' as timestamp format 'IYYY-IW HH24:MI')")
+    assert "One or more required ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) " \
+           "are missing." in str(err)
+
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('18-07' as date format 'IY-ID')")
+    assert "One or more required ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) " \
+           "are missing." in str(err)
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('18-07 01:00' as timestamp format 'IY-ID HH24:MI')")
+    assert "One or more required ISO 8601 week-based date tokens (i.e. IYYY, IW, ID) " \
+           "are missing." in str(err)
+
+    # ISO 8601 Week numbering year conflict
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-018-10-01' as date format 'IYYY-IYY-IW-DD')")
+    assert "Multiple year tokens provided" in str(err)
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('2018-018-10-01 01:00' as timestamp format "
+        "'IYYY-IYY-IW-DD HH24:MI')")
+    assert "Multiple year tokens provided" in str(err)
+
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('018-8-10-01' as date format 'IYY-I-IW-DD')")
+    assert "Multiple year tokens provided" in str(err)
+    err = self.execute_query_expect_failure(self.client,
+        "select cast('018-8-10-01 01:00' as timestamp format 'IYY-I-IW-DD HH24:MI')")
+    assert "Multiple year tokens provided" in str(err)
 
     # Verify that conflict check is not skipped when format ends with separators.
     err = self.execute_query_expect_failure(self.client,

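The week-based tests in the diff above rely on the ISO 8601 rule stated in the header comment: weeks start on Monday, and a week belongs to the Gregorian year that contains its Thursday. As a rough illustration only (this is not Impala's implementation), the same mapping can be reproduced with Python's standard `datetime` module, assuming Python 3.8 or later for `date.fromisocalendar`. The helper names `to_iso_week_date` and `from_iso_week_date` are hypothetical and merely mimic the 'IYYY/IW/ID' pattern used in the tests.

```python
from datetime import date


def to_iso_week_date(d: date) -> str:
    """Format a date as week-numbering year / week / weekday (1 = Monday)."""
    iso_year, iso_week, iso_weekday = d.isocalendar()
    return f"{iso_year:04d}/{iso_week:02d}/{iso_weekday:02d}"


def from_iso_week_date(text: str) -> date:
    """Parse 'year/week/weekday' back into a calendar date.

    Raises ValueError for a week the given year does not have,
    for example week 53 of 2019.
    """
    iso_year, iso_week, iso_weekday = (int(part) for part in text.split("/"))
    return date.fromisocalendar(iso_year, iso_week, iso_weekday)


if __name__ == "__main__":
    # Dates around Dec 31, matching the values asserted in the tests above.
    for d in (date(2019, 12, 29), date(2019, 12, 30), date(2020, 12, 31),
              date(2021, 1, 1)):
        print(d.isoformat(), "->", to_iso_week_date(d))
    print(from_iso_week_date("2020/01/03"))
```

Note that Python raises an exception for an invalid week, whereas the tests above show Impala returning NULL for timestamps and an error for dates.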

[impala] 02/04: IMPALA-9092: Add support for creating external Kudu table

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6ebea33a9d249fc0097746c21f04d977fdeaa13c
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Tue Nov 12 10:22:50 2019 -0800

    IMPALA-9092: Add support for creating external Kudu table
    
    In HMS-3 the translation layer converts a managed Kudu table into an
    external Kudu table and sets the additional table property
    'external.table.purge' to 'true'. This means any installation that
    uses HMS-3 (or a Hive version that includes HIVE-22158) will always
    create Kudu tables as external tables. This is problematic because
    the output of SHOW CREATE TABLE will now be different and may confuse
    users.
    
    To improve the user experience of such synchronized tables (external
    tables with the external.table.purge property set to true), this
    patch adds support in Impala for creating external Kudu tables.
    Previous versions of Impala disallowed creating an external Kudu
    table if the Kudu table did not exist. After this patch, Impala
    checks whether the Kudu table exists and, if it does not, creates a
    Kudu table based on the schema provided in the CREATE TABLE
    statement. The command errors out if the Kudu table already exists.
    However, this applies only to synchronized tables. The previous way
    of creating a purely external table behaves as before.
    
    The following syntax for creating a synchronized table is now allowed:
    
    CREATE EXTERNAL TABLE foo (
      id int PRIMARY KEY,
      name string)
    PARTITION BY HASH PARTITIONS 8
    STORED AS KUDU
    TBLPROPERTIES ('external.table.purge'='true')
    
    The syntax is very similar to creating a managed table, except for
    the EXTERNAL keyword and the additional table property. A
    synchronized table behaves like a managed Kudu table (drops and
    renames are allowed). The output of SHOW CREATE TABLE on a
    synchronized table displays the full column and partition spec, as
    it does for managed tables.
    
    Testing:
    1. After the CDP version bump, all of the existing Kudu tables now
    create synchronized tables, so there is good coverage there.
    2. Added additional tests which create synchronized tables and
    compare the SHOW CREATE TABLE output.
    3. Ran exhaustive tests with both CDP and CDH builds.
    
    Change-Id: I76f81d41db0cf2269ee1b365857164a43677e14d
    Reviewed-on: http://gerrit.cloudera.org:8080/14750
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../analysis/AlterTableSetTblProperties.java       |   7 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  59 +++--
 .../org/apache/impala/analysis/ToSqlUtils.java     |  13 +-
 .../java/org/apache/impala/catalog/KuduTable.java  |  15 +-
 .../main/java/org/apache/impala/catalog/Table.java |  23 ++
 .../apache/impala/service/CatalogOpExecutor.java   |  24 +-
 .../impala/service/KuduCatalogOpExecutor.java      |  16 +-
 .../apache/impala/analysis/AnalyzeKuduDDLTest.java | 283 +++++++++++++--------
 .../org/apache/impala/catalog/CatalogTest.java     |  45 +++-
 .../impala/catalog/local/LocalCatalogTest.java     |  35 ++-
 .../org/apache/impala/common/FrontendFixture.java  |   2 +-
 .../queries/QueryTest/show-create-table.test       | 188 ++++++++++++--
 tests/common/skip.py                               |   6 +-
 tests/metadata/test_show_create_table.py           |  18 +-
 tests/query_test/test_kudu.py                      | 211 ++++++++++++++-
 15 files changed, 714 insertions(+), 231 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 6f7431c..dce7fae 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -118,18 +118,13 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
     // TODO IMPALA-6375: Allow setting kudu.table_name for synchronized Kudu tables
     if (KuduTable.isSynchronizedTable(table_.getMetaStoreTable())) {
       AnalysisUtils.throwIfNotNull(tblProperties_.get(KuduTable.KEY_TABLE_NAME),
-          String.format("Not allowed to set '%s' manually for managed Kudu tables .",
+          String.format("Not allowed to set '%s' manually for synchronized Kudu tables .",
               KuduTable.KEY_TABLE_NAME));
     }
     // Throw error if kudu.table_id is provided for Kudu tables.
     AnalysisUtils.throwIfNotNull(tblProperties_.get(KuduTable.KEY_TABLE_ID),
         String.format("Property '%s' cannot be altered for Kudu tables",
             KuduTable.KEY_TABLE_ID));
-    // Throw error if 'external.table.purge' is manually set.
-    AnalysisUtils.throwIfNotNull(
-        tblProperties_.get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE),
-        String.format("Property '%s' cannot be altered for Kudu tables",
-            KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
     AuthorizationConfig authzConfig = analyzer.getAuthzConfig();
     if (authzConfig.isEnabled()) {
       // Checking for 'EXTERNAL' is case-insensitive, see IMPALA-5637.
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 7e4d33d..4218248 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
+import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.RuntimeEnv;
@@ -286,10 +287,14 @@ public class CreateTableStmt extends StatementBase {
     }
 
     analyzeKuduTableProperties(analyzer);
-    if (isExternal()) {
+    if (isExternal() && !Boolean.parseBoolean(getTblProperties().get(
+        Table.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
+      // this is an external table
       analyzeExternalKuduTableParams(analyzer);
     } else {
-      analyzeManagedKuduTableParams(analyzer);
+      // this is either a managed table or an external table with external.table.purge
+      // property set to true
+      analyzeSynchronizedKuduTableParams(analyzer);
     }
   }
 
@@ -323,7 +328,7 @@ public class CreateTableStmt extends StatementBase {
     putGeneratedKuduProperty(KuduTable.KEY_STORAGE_HANDLER,
         KuduTable.KUDU_STORAGE_HANDLER);
 
-    String kuduMasters = populateKuduMasters(analyzer);
+    String kuduMasters = getKuduMasters(analyzer);
     if (kuduMasters.isEmpty()) {
       throw new AnalysisException(String.format(
           "Table property '%s' is required when the impalad startup flag " +
@@ -348,7 +353,7 @@ public class CreateTableStmt extends StatementBase {
    *  Populates Kudu master addresses either from table property or
    *  the -kudu_master_hosts flag.
    */
-  private String populateKuduMasters(Analyzer analyzer) {
+  private String getKuduMasters(Analyzer analyzer) {
     String kuduMasters = getTblProperties().get(KuduTable.KEY_MASTER_HOSTS);
     if (Strings.isNullOrEmpty(kuduMasters)) {
       kuduMasters = analyzer.getCatalog().getDefaultKuduMasterHosts();
@@ -361,6 +366,9 @@ public class CreateTableStmt extends StatementBase {
    */
   private void analyzeExternalKuduTableParams(Analyzer analyzer)
       throws AnalysisException {
+    Preconditions.checkState(!Boolean
+        .parseBoolean(getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE)));
+    // this is just a regular external table. Kudu table name must be specified
     AnalysisUtils.throwIfNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME),
         String.format("Table property %s must be specified when creating " +
             "an external Kudu table.", KuduTable.KEY_TABLE_NAME));
@@ -372,13 +380,6 @@ public class CreateTableStmt extends StatementBase {
     AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS),
         String.format("Table property '%s' cannot be used with an external Kudu table.",
             KuduTable.KEY_TABLET_REPLICAS));
-    // External table cannot have 'external.table.purge' property set, which is considered
-    // equivalent to managed table.
-    if (Boolean.parseBoolean(
-            getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
-      throw new AnalysisException(String.format("Table property '%s' cannot be set to " +
-          "true with an external Kudu table.", KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
-    }
     AnalysisUtils.throwIfNotEmpty(getColumnDefs(),
         "Columns cannot be specified with an external Kudu table.");
     AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(),
@@ -386,10 +387,17 @@ public class CreateTableStmt extends StatementBase {
   }
 
   /**
-   * Analyzes and checks parameters specified for managed Kudu tables.
+   * Analyzes and checks parameters specified for synchronized Kudu tables.
    */
-  private void analyzeManagedKuduTableParams(Analyzer analyzer) throws AnalysisException {
-    analyzeManagedKuduTableName(analyzer);
+  private void analyzeSynchronizedKuduTableParams(Analyzer analyzer)
+      throws AnalysisException {
+    // A managed table cannot have 'external.table.purge' property set
+    if (!isExternal() && Boolean.parseBoolean(
+        getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
+      throw new AnalysisException(String.format("Table property '%s' cannot be set to " +
+          "true with a managed Kudu table.", KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
+    }
+    analyzeSynchronizedKuduTableName(analyzer);
 
     // Check column types are valid Kudu types
     for (ColumnDef col: getColumnDefs()) {
@@ -418,13 +426,7 @@ public class CreateTableStmt extends StatementBase {
             "zero. Given number of replicas is: " + r.toString());
       }
     }
-
-    if (!getKuduPartitionParams().isEmpty()) {
-      analyzeKuduPartitionParams(analyzer);
-    } else {
-      analyzer.addWarning(
-          "Unpartitioned Kudu tables are inefficient for large data sizes.");
-    }
+    analyzeKuduPartitionParams(analyzer);
   }
 
   /**
@@ -432,11 +434,12 @@ public class CreateTableStmt extends StatementBase {
    * it in TableDef.generatedKuduTableName_. Throws if the Kudu table
    * name was given manually via TBLPROPERTIES.
    */
-  private void analyzeManagedKuduTableName(Analyzer analyzer) throws AnalysisException {
+  private void analyzeSynchronizedKuduTableName(Analyzer analyzer)
+      throws AnalysisException {
     AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME),
-        String.format("Not allowed to set '%s' manually for managed Kudu tables .",
+        String.format("Not allowed to set '%s' manually for synchronized Kudu tables.",
             KuduTable.KEY_TABLE_NAME));
-    String kuduMasters = populateKuduMasters(analyzer);
+    String kuduMasters = getKuduMasters(analyzer);
     boolean isHMSIntegrationEnabled;
     try {
       // Check if Kudu's integration with the Hive Metastore is enabled. Validation
@@ -452,10 +455,16 @@ public class CreateTableStmt extends StatementBase {
   }
 
   /**
-   * Analyzes the partitioning schemes specified in the CREATE TABLE statement.
+   * Analyzes the partitioning schemes specified in the CREATE TABLE statement. Also,
+   * adds primary keys to the partitioning scheme if no partitioning keys are provided.
    */
   private void analyzeKuduPartitionParams(Analyzer analyzer) throws AnalysisException {
     Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU);
+    if (getKuduPartitionParams().isEmpty()) {
+      analyzer.addWarning(
+          "Unpartitioned Kudu tables are inefficient for large data sizes.");
+      return;
+    }
     Map<String, ColumnDef> pkColDefsByName =
         ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs());
     for (KuduPartitionParam partitionParam: getKuduPartitionParams()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 2df6eb8..5b2dedf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -72,7 +72,7 @@ public class ToSqlUtils {
   @VisibleForTesting
   protected static final ImmutableSet<String> HIDDEN_TABLE_PROPERTIES = ImmutableSet.of(
       "EXTERNAL", "comment", AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
-      AlterTableSortByStmt.TBL_PROP_SORT_ORDER);
+      AlterTableSortByStmt.TBL_PROP_SORT_ORDER, "TRANSLATED_TO_EXTERNAL");
 
   /**
    * Removes all hidden properties from the given 'tblProperties' map.
@@ -311,8 +311,8 @@ public class ToSqlUtils {
     if (properties.containsKey(Table.TBL_PROP_LAST_DDL_TIME)) {
       properties.remove(Table.TBL_PROP_LAST_DDL_TIME);
     }
-    boolean isExternal = msTable.getTableType() != null &&
-        msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString());
+    boolean isExternal = Table.isExternalTable(msTable);
+
     List<String> sortColsSql = getSortColumns(properties);
     TSortingOrder sortingOrder = TSortingOrder.valueOf(getSortingOrder(properties));
     String comment = properties.get("comment");
@@ -346,7 +346,7 @@ public class ToSqlUtils {
       storageHandlerClassName = null;
       properties.remove(KuduTable.KEY_STORAGE_HANDLER);
       String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
-      // Remove the hidden table property 'kudu.table_name' for a managed Kudu table.
+      // Remove the hidden table property 'kudu.table_name' for a synchronized Kudu table.
       if (kuduTableName != null &&
           KuduUtil.isDefaultKuduTableName(kuduTableName,
               table.getDb().getName(), table.getName())) {
@@ -357,7 +357,7 @@ public class ToSqlUtils {
       // Internal property, should not be exposed to the user.
       properties.remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
 
-      if (!isExternal) {
+      if (KuduTable.isSynchronizedTable(msTable)) {
         primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames());
 
         List<String> paramsSql = new ArrayList<>();
@@ -366,7 +366,8 @@ public class ToSqlUtils {
         }
         kuduPartitionByParams = Joiner.on(", ").join(paramsSql);
       } else {
-        // We shouldn't output the columns for external tables
+        // we don't output the column spec if this is not a synchronized table (not
+        // managed and not external.purge table)
         colsSql = null;
       }
     } else if (table instanceof FeFsTable) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 4ef5d95..6bdcf17 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -143,8 +143,7 @@ public class KuduTable extends Table implements FeKuduTable {
     // HIVE-22158: A translated table can have external purge property set to true
     // in such case we sync operations in Impala and Kudu
     // it is possible that in older versions of HMS a managed Kudu table is present
-    return isManagedTable(msTbl) || (isExternalTable(msTbl) && Boolean
-        .parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE)));
+    return isManagedTable(msTbl) || isExternalPurgeTable(msTbl);
   }
 
   /**
@@ -155,18 +154,6 @@ public class KuduTable extends Table implements FeKuduTable {
     return msTbl.getTableType().equalsIgnoreCase(TableType.MANAGED_TABLE.toString());
   }
 
-  /**
-   * Returns if the given HMS table is external table or not based on table type or table
-   * properties. Implementation is based on org.apache.hadoop.hive.metastore.utils
-   * .MetaStoreUtils.isExternalTable()
-   */
-  public static boolean isExternalTable(
-      org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    // HIVE-19253: table property can also indicate an external table.
-    return (msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()) ||
-        ("TRUE").equalsIgnoreCase(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE)));
-  }
-
   @Override
   public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index ee6e92a..ed3b5ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -195,6 +195,29 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     initMetrics();
   }
 
+  /**
+   * Returns true if the given HMS table is an external table (uses table type if
+   * available or else uses table properties). Implementation is based on org.apache
+   * .hadoop.hive.metastore.utils.MetaStoreUtils.isExternalTable()
+   */
+  public static boolean isExternalTable(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    // HIVE-19253: table property can also indicate an external table.
+    return (msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()) ||
+        ("TRUE").equalsIgnoreCase(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE)));
+  }
+
+  /**
+   * In certain versions of Hive (see HIVE-22158), HMS translates a managed table to an
+   * external table and sets the additional property "external.table.purge" = true. This
+   * method can be used to identify such translated tables.
+   */
+  public static boolean isExternalPurgeTable(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return isExternalTable(msTbl) && Boolean
+        .parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE));
+  }
+
   public ReentrantLock getLock() { return tableLock_; }
   @Override
   public abstract TTableDescriptor toThriftDescriptor(
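
Note on the two helpers added to Table.java above: together with KuduTable.isSynchronizedTable(), they sort a table into plain external, external with purge, or synchronized. The following is a minimal sketch of that classification, not the Impala implementation. It models an HMS table as a table-type string plus a parameter map instead of org.apache.hadoop.hive.metastore.api.Table. The class name TableKindSketch and its method names are hypothetical, and the "EXTERNAL" key is an assumption about the value of TBL_PROP_EXTERNAL_TABLE.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the HMS table object: a table type string and a
// parameter map. This is not the Hive Metastore Table API.
final class TableKindSketch {
  // External if the table type says so, or if the "EXTERNAL" property is TRUE
  // (the property key is an assumption; see HIVE-19253 in the patch comments).
  static boolean isExternal(String tableType, Map<String, String> params) {
    return "EXTERNAL_TABLE".equalsIgnoreCase(tableType)
        || "TRUE".equalsIgnoreCase(params.get("EXTERNAL"));
  }

  // External table that also carries 'external.table.purge' = true.
  static boolean isExternalPurge(String tableType, Map<String, String> params) {
    // Boolean.parseBoolean(null) is false, so a missing property means no purge.
    return isExternal(tableType, params)
        && Boolean.parseBoolean(params.get("external.table.purge"));
  }

  // Synchronized: managed, or external with purge enabled.
  static boolean isSynchronized(String tableType, Map<String, String> params) {
    return "MANAGED_TABLE".equalsIgnoreCase(tableType)
        || isExternalPurge(tableType, params);
  }

  public static void main(String[] args) {
    Map<String, String> none = new HashMap<>();
    Map<String, String> purge = new HashMap<>();
    purge.put("external.table.purge", "true");

    System.out.println("managed:        " + isSynchronized("MANAGED_TABLE", none));
    System.out.println("external:       " + isSynchronized("EXTERNAL_TABLE", none));
    System.out.println("external+purge: " + isSynchronized("EXTERNAL_TABLE", purge));
  }
}
```

Under this model only the plain external table is not synchronized, which is the case where createKuduTable() reads the column spec from an existing Kudu table rather than creating one.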
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5689a0d..f177e65 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2209,7 +2209,8 @@ public class CatalogOpExecutor {
    * tables should be treated as managed (synchronized) tables to keep the user facing
    * behavior consistent.
    *
-   * For managed tables:
+   * For synchronized tables (managed or external tables with external.table.purge=true
+   * in tblproperties):
    *  1. If Kudu's integration with the Hive Metastore is not enabled, the Kudu
    *     table is first created in Kudu, then in the HMS.
    *  2. Otherwise, when the table is created in Kudu, we rely on Kudu to have
@@ -2226,20 +2227,23 @@ public class CatalogOpExecutor {
   private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
       TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkState(KuduTable.isKuduTable(newTable));
-    if (KuduTable.isExternalTable(newTable)) {
+    boolean createHMSTable;
+    if (!KuduTable.isSynchronizedTable(newTable)) {
+      // if this is not a synchronized table, we assume that the table already exists
+      // in Kudu and use the column spec from Kudu
       KuduCatalogOpExecutor.populateExternalTableColsFromKudu(newTable);
+      createHMSTable = true;
     } else {
-      KuduCatalogOpExecutor.createManagedTable(newTable, params);
+      // if this is a synchronized table (managed or external.purge table) then we
+      // create it in Kudu first
+      KuduCatalogOpExecutor.createSynchronizedTable(newTable, params);
+      createHMSTable = !isKuduHmsIntegrationEnabled(newTable);
     }
-    // When Kudu's integration with the Hive Metastore is enabled, Kudu will create
-    // the HMS table for managed tables.
-    boolean createsHMSTable = KuduTable.isExternalTable(newTable) ?
-        true : !isKuduHmsIntegrationEnabled(newTable);
     try {
       // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
       // ensure the atomicity of these operations.
       synchronized (metastoreDdlLock_) {
-        if (createsHMSTable) {
+        if (createHMSTable) {
           try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
             msClient.getHiveClient().createTable(newTable);
           }
@@ -2251,8 +2255,8 @@ public class CatalogOpExecutor {
       }
     } catch (Exception e) {
       try {
-        // Error creating the table in HMS, drop the managed table from Kudu.
-        if (!KuduTable.isExternalTable(newTable)) {
+        // Error creating the table in HMS, drop the synchronized table from Kudu.
+        if (KuduTable.isSynchronizedTable(newTable)) {
           KuduCatalogOpExecutor.dropTable(newTable, false);
         }
       } catch (Exception logged) {
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index a04e342..eef1e5f 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -70,9 +71,9 @@ public class KuduCatalogOpExecutor {
    * Throws an exception if 'msTbl' represents an external table or if the table couldn't
    * be created in Kudu.
    */
-  static void createManagedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
+  static void createSynchronizedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       TCreateTableParams params) throws ImpalaRuntimeException {
-    Preconditions.checkState(!KuduTable.isExternalTable(msTbl));
+    Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
     Preconditions.checkState(
         msTbl.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
     String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
@@ -85,11 +86,16 @@ public class KuduCatalogOpExecutor {
     try {
       // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
       // (see KUDU-1710).
-      if (kudu.tableExists(kuduTableName)) {
-        if (params.if_not_exists) return;
+      boolean tableExists = kudu.tableExists(kuduTableName);
+      if (tableExists && params.if_not_exists) return;
+
+      // if table is managed or external with external.table.purge = true in
+      // tblproperties we should create the Kudu table if it does not exist
+      if (tableExists) {
         throw new ImpalaRuntimeException(String.format(
             "Table '%s' already exists in Kudu.", kuduTableName));
       }
+      Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
       Schema schema = createTableSchema(params);
       CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
       org.apache.kudu.client.KuduTable table =
@@ -283,7 +289,7 @@ public class KuduCatalogOpExecutor {
     org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy();
     List<FieldSchema> cols = msTblCopy.getSd().getCols();
     // External table should not have table ID.
-    Preconditions.checkState(KuduTable.isExternalTable(msTbl));
+    Preconditions.checkState(Table.isExternalTable(msTbl));
     Preconditions.checkState(
         msTblCopy.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
     String kuduTableName = msTblCopy.getParameters().get(KuduTable.KEY_TABLE_NAME);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
index 8e9b632..288bce3 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
@@ -35,212 +35,256 @@ import java.util.List;
  */
 public class AnalyzeKuduDDLTest extends FrontendTestBase {
 
-  @Test
-  public void TestCreateManagedKuduTable() {
+  /**
+   * This is a wrapper around super.AnalyzesOk method. The additional boolean is used to
+   * add tblproperties for a synchronized table.
+   */
+  public ParseNode AnalyzesOk(String stmt, String errorStr, boolean isExternalPurgeTbl) {
+    return super
+        .AnalyzesOk(appendSynchronizedTblProps(stmt, isExternalPurgeTbl), errorStr);
+  }
+
+  /**
+   * Wrapper around super.AnalyzesOk with additional boolean for adding synchronized
+   * table properties
+   */
+  public ParseNode AnalyzesOk(String stmt, boolean isExternalPurgeTbl) {
+    return super.AnalyzesOk(appendSynchronizedTblProps(stmt, isExternalPurgeTbl));
+  }
+
+  private String appendSynchronizedTblProps(String stmt, boolean append) {
+    if (!append) { return stmt; }
+
+    stmt = stmt.replace("create table", "create external table");
+    if (!stmt.contains("tblproperties")) {
+      stmt += " tblproperties ('external.table.purge'='true')";
+    } else {
+      stmt = stmt.replaceAll("tblproperties\\s*\\(",
+          "tblproperties ('external.table.purge'='true', ");
+    }
+    return stmt;
+  }
+
+  public void AnalysisError(String stmt, String expectedError,
+      boolean isExternalPurgeTbl) {
+    super.AnalysisError(appendSynchronizedTblProps(stmt, isExternalPurgeTbl),
+        expectedError);
+  }
+
+  private void testDDlsOnKuduTable(boolean isExternalPurgeTbl) {
     TestUtils.assumeKuduIsSupported();
     // Test primary keys and partition by clauses
     AnalyzesOk("create table tab (x int primary key) partition by hash(x) " +
-        "partitions 8 stored as kudu");
+        "partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, primary key(x)) partition by hash(x) " +
-        "partitions 8 stored as kudu");
+        "partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key (x, y)) " +
-        "partition by hash(x, y) partitions 8 stored as kudu");
+        "partition by hash(x, y) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key (x)) " +
-        "partition by hash(x) partitions 8 stored as kudu");
+        "partition by hash(x) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key(x, y)) " +
-        "partition by hash(y) partitions 8 stored as kudu");
+        "partition by hash(y) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x timestamp, y timestamp, primary key(x)) " +
-        "partition by hash(x) partitions 8 stored as kudu");
+        "partition by hash(x) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y string, primary key (x)) partition by " +
         "hash (x) partitions 3, range (x) (partition values < 1, partition " +
         "1 <= values < 10, partition 10 <= values < 20, partition value = 30) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key (x, y)) partition by " +
         "range (x, y) (partition value = (2001, 1), partition value = (2002, 1), " +
-        "partition value = (2003, 2)) stored as kudu");
+        "partition value = (2003, 2)) stored as kudu", isExternalPurgeTbl);
     // Non-literal boundary values in range partitions
     AnalyzesOk("create table tab (x int, y int, primary key (x)) partition by " +
         "range (x) (partition values < 1 + 1, partition (1+3) + 2 < values < 10, " +
         "partition factorial(4) < values < factorial(5), " +
-        "partition value = factorial(6)) stored as kudu");
+        "partition value = factorial(6)) stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x int, y int, primary key(x, y)) partition by " +
         "range(x, y) (partition value = (1+1, 2+2), partition value = ((1+1+1)+1, 10), " +
-        "partition value = (cast (30 as int), factorial(5))) stored as kudu");
+        "partition value = (cast (30 as int), factorial(5))) stored as kudu",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values < x + 1) stored as kudu", "Only constant values are allowed " +
-        "for range-partition bounds: x + 1");
+        "for range-partition bounds: x + 1", isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= isnull(null, null)) stored as kudu", "Range partition " +
         "values cannot be NULL. Range partition: 'PARTITION VALUES <= " +
-        "isnull(NULL, NULL)'");
+        "isnull(NULL, NULL)'", isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= (select count(*) from functional.alltypestiny)) " +
         "stored as kudu", "Only constant values are allowed for range-partition " +
-        "bounds: (SELECT count(*) FROM functional.alltypestiny)");
+        "bounds: (SELECT count(*) FROM functional.alltypestiny)", isExternalPurgeTbl);
     // Multilevel partitioning. Data is split into 3 buckets based on 'x' and each
     // bucket is partitioned into 4 tablets based on the range partitions of 'y'.
     AnalyzesOk("create table tab (x int, y string, primary key(x, y)) " +
         "partition by hash(x) partitions 3, range(y) " +
         "(partition values < 'aa', partition 'aa' <= values < 'bb', " +
         "partition 'bb' <= values < 'cc', partition 'cc' <= values) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     // Key column in upper case
     AnalyzesOk("create table tab (x int, y int, primary key (X)) " +
-        "partition by hash (x) partitions 8 stored as kudu");
+        "partition by hash (x) partitions 8 stored as kudu", isExternalPurgeTbl);
     // Flexible Partitioning
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
         "partition by hash (a, b) partitions 8, hash(c) partitions 2 stored as " +
-        "kudu");
+        "kudu", isExternalPurgeTbl);
     // No columns specified in the PARTITION BY HASH clause
     AnalyzesOk("create table tab (a int primary key, b int, c int, d int) " +
-        "partition by hash partitions 8 stored as kudu");
+        "partition by hash partitions 8 stored as kudu", isExternalPurgeTbl);
     // Distribute range data types are picked up during analysis and forwarded to Kudu.
     // Column names in distribute params should also be case-insensitive.
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key(a, b, c, d))" +
         "partition by hash (a, B, c) partitions 8, " +
         "range (A) (partition values < 1, partition 1 <= values < 2, " +
         "partition 2 <= values < 3, partition 3 <= values < 4, partition 4 <= values) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     // Allowing range partitioning on a subset of the primary keys
     AnalyzesOk("create table tab (id int, name string, valf float, vali bigint, " +
         "primary key (id, name)) partition by range (name) " +
-        "(partition 'aa' < values <= 'bb') stored as kudu");
+        "(partition 'aa' < values <= 'bb') stored as kudu", isExternalPurgeTbl);
     // Null values in range partition values
     AnalysisError("create table tab (id int, name string, primary key(id, name)) " +
         "partition by hash (id) partitions 3, range (name) " +
         "(partition value = null, partition value = 1) stored as kudu",
-"Range partition values cannot be NULL. Range partition: 'PARTITION " +
-        "VALUE = NULL'");
+        "Range partition values cannot be NULL. Range partition: 'PARTITION " +
+        "VALUE = NULL'", isExternalPurgeTbl);
     // Primary key specified in tblproperties
     AnalysisError(String.format("create table tab (x int) partition by hash (x) " +
         "partitions 8 stored as kudu tblproperties ('%s' = 'x')",
         KuduTable.KEY_KEY_COLUMNS), "PRIMARY KEY must be used instead of the table " +
-        "property");
+        "property", isExternalPurgeTbl);
     // Primary key column that doesn't exist
     AnalysisError("create table tab (x int, y int, primary key (z)) " +
         "partition by hash (x) partitions 8 stored as kudu",
-        "PRIMARY KEY column 'z' does not exist in the table");
+        "PRIMARY KEY column 'z' does not exist in the table", isExternalPurgeTbl);
     // Invalid composite primary key
     AnalysisError("create table tab (x int primary key, primary key(x)) stored " +
         "as kudu", "Multiple primary keys specified. Composite primary keys can " +
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
-        "of the column definition.");
+        "of the column definition.", isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key, y int primary key) stored " +
         "as kudu", "Multiple primary keys specified. Composite primary keys can " +
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
-        "of the column definition.");
+        "of the column definition.", isExternalPurgeTbl);
     // Specifying the same primary key column multiple times
     AnalysisError("create table tab (x int, primary key (x, x)) partition by hash (x) " +
         "partitions 8 stored as kudu",
-        "Column 'x' is listed multiple times as a PRIMARY KEY.");
+        "Column 'x' is listed multiple times as a PRIMARY KEY.", isExternalPurgeTbl);
     // Number of range partition boundary values should be equal to the number of range
     // columns.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
         "partition by range(a) (partition value = (1, 2), " +
         "partition value = 3, partition value = 4) stored as kudu",
-"Number of specified range partition values is different than the number of " +
-        "partitioning columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1, 2)'");
+        "Number of specified range partition values is different than the number of " +
+        "partitioning columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1, 2)'",
+        isExternalPurgeTbl);
     // Key ranges must match the column types.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
         "partition by hash (a, b, c) partitions 8, range (a) " +
         "(partition value = 1, partition value = 'abc', partition 3 <= values) " +
         "stored as kudu", "Range partition value 'abc' (type: STRING) is not type " +
-        "compatible with partitioning column 'a' (type: INT).");
+        "compatible with partitioning column 'a' (type: INT).", isExternalPurgeTbl);
     AnalysisError("create table tab (a tinyint primary key) partition by range (a) " +
         "(partition value = 128) stored as kudu", "Range partition value 128 " +
         "(type: SMALLINT) is not type compatible with partitioning column 'a' " +
-        "(type: TINYINT)");
+        "(type: TINYINT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a smallint primary key) partition by range (a) " +
         "(partition value = 32768) stored as kudu", "Range partition value 32768 " +
         "(type: INT) is not type compatible with partitioning column 'a' " +
-        "(type: SMALLINT)");
+        "(type: SMALLINT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a int primary key) partition by range (a) " +
         "(partition value = 2147483648) stored as kudu", "Range partition value " +
         "2147483648 (type: BIGINT) is not type compatible with partitioning column 'a' " +
-        "(type: INT)");
+        "(type: INT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a bigint primary key) partition by range (a) " +
         "(partition value = 9223372036854775808) stored as kudu", "Range partition " +
         "value 9223372036854775808 (type: DECIMAL(19,0)) is not type compatible with " +
-        "partitioning column 'a' (type: BIGINT)");
+        "partitioning column 'a' (type: BIGINT)", isExternalPurgeTbl);
     // Test implicit casting/folding of partition values.
     AnalyzesOk("create table tab (a int primary key) partition by range (a) " +
-        "(partition value = false, partition value = true) stored as kudu");
+        "(partition value = false, partition value = true) stored as kudu",
+        isExternalPurgeTbl);
     // Non-key column used in PARTITION BY
     AnalysisError("create table tab (a int, b string, c bigint, primary key (a)) " +
         "partition by range (b) (partition value = 'abc') stored as kudu",
-  "Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " +
-        "Only key columns can be used in PARTITION BY.");
+        "Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " +
+        "Only key columns can be used in PARTITION BY.", isExternalPurgeTbl);
     // No float range partition values
     AnalysisError("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
         "partition by hash (a, b, c) partitions 8, " +
         "range (a) (partition value = 1.2, partition value = 2) stored as kudu",
         "Range partition value 1.2 (type: DECIMAL(2,1)) is not type compatible with " +
-        "partitioning column 'a' (type: INT).");
+        "partitioning column 'a' (type: INT).", isExternalPurgeTbl);
     // Non-existing column used in PARTITION BY
     AnalysisError("create table tab (a int, b int, primary key (a, b)) " +
         "partition by range(unknown_column) (partition value = 'abc') stored as kudu",
         "Column 'unknown_column' in 'RANGE (unknown_column) " +
         "(PARTITION VALUE = 'abc')' is not a key column. Only key columns can be used " +
-        "in PARTITION BY");
+        "in PARTITION BY", isExternalPurgeTbl);
     // Kudu num_tablet_replicas is specified in tblproperties
     String kuduMasters = catalog_.getDefaultKuduMasterHosts();
     AnalyzesOk(String.format("create table tab (x int primary key) partition by " +
         "hash (x) partitions 8 stored as kudu tblproperties " +
         "('kudu.num_tablet_replicas'='1', 'kudu.master_addresses' = '%s')",
-        kuduMasters));
+        kuduMasters), isExternalPurgeTbl);
     // Kudu table name is specified in tblproperties resulting in an error
     AnalysisError("create table tab (x int primary key) partition by hash (x) " +
         "partitions 8 stored as kudu tblproperties ('kudu.table_name'='tab')",
-        "Not allowed to set 'kudu.table_name' manually for managed Kudu tables");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables.",
+        isExternalPurgeTbl);
     // No port is specified in kudu master address
     AnalyzesOk(String.format("create table tdata_no_port (id int primary key, " +
         "name string, valf float, vali bigint) partition by range(id) " +
         "(partition values <= 10, partition 10 < values <= 30, " +
         "partition 30 < values) stored as kudu tblproperties" +
-        "('kudu.master_addresses' = '%s')", kuduMasters));
+        "('kudu.master_addresses' = '%s')", kuduMasters), isExternalPurgeTbl);
     // Not using the STORED AS KUDU syntax to specify a Kudu table
     AnalysisError("create table tab (x int) tblproperties (" +
         "'storage_handler'='org.apache.hadoop.hive.kudu.KuduStorageHandler')",
-        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE, isExternalPurgeTbl);
     // Creating unpartitioned table results in a warning.
     AnalyzesOk("create table tab (x int primary key) stored as kudu tblproperties (" +
         "'storage_handler'='org.apache.hadoop.hive.kudu.KuduStorageHandler')",
-        "Unpartitioned Kudu tables are inefficient for large data sizes.");
+        "Unpartitioned Kudu tables are inefficient for large data sizes.",
+        isExternalPurgeTbl);
     // Invalid value for number of replicas
     AnalysisError("create table t (x int primary key) stored as kudu tblproperties (" +
         "'kudu.num_tablet_replicas'='1.1')",
-        "Table property 'kudu.num_tablet_replicas' must be an integer.");
+        "Table property 'kudu.num_tablet_replicas' must be an integer.",
+        isExternalPurgeTbl);
     // Don't allow caching
     AnalysisError("create table tab (x int primary key) stored as kudu cached in " +
-        "'testPool'", "A Kudu table cannot be cached in HDFS.");
+        "'testPool'", "A Kudu table cannot be cached in HDFS.", isExternalPurgeTbl);
     // LOCATION cannot be used with Kudu tables
     AnalysisError("create table tab (a int primary key) partition by hash (a) " +
         "partitions 3 stored as kudu location '/test-warehouse/'",
-        "LOCATION cannot be specified for a Kudu table.");
+        "LOCATION cannot be specified for a Kudu table.", isExternalPurgeTbl);
     // Creating unpartitioned table results in a warning.
     AnalyzesOk("create table tab (a int, primary key (a)) stored as kudu",
-        "Unpartitioned Kudu tables are inefficient for large data sizes.");
+        "Unpartitioned Kudu tables are inefficient for large data sizes.",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (a int) stored as kudu",
-        "A primary key is required for a Kudu table.");
+        "A primary key is required for a Kudu table.", isExternalPurgeTbl);
     // Using ROW FORMAT with a Kudu table
     AnalysisError("create table tab (x int primary key) " +
         "row format delimited escaped by 'X' stored as kudu",
-        "ROW FORMAT cannot be specified for file format KUDU.");
+        "ROW FORMAT cannot be specified for file format KUDU.", isExternalPurgeTbl);
     // Using PARTITIONED BY with a Kudu table
     AnalysisError("create table tab (x int primary key) " +
         "partitioned by (y int) stored as kudu", "PARTITIONED BY cannot be used " +
-        "in Kudu tables.");
+        "in Kudu tables.", isExternalPurgeTbl);
     // Multi-column range partitions
     AnalyzesOk("create table tab (a bigint, b tinyint, c double, primary key(a, b)) " +
-        "partition by range(a, b) (partition (0, 0) < values <= (1, 1)) stored as kudu");
+        "partition by range(a, b) (partition (0, 0) < values <= (1, 1)) stored as kudu",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (a bigint, b tinyint, c double, primary key(a, b)) " +
         "partition by range(a, b) (partition values <= (1, 'b')) stored as kudu",
         "Range partition value 'b' (type: STRING) is not type compatible with " +
-        "partitioning column 'b' (type: TINYINT)");
+        "partitioning column 'b' (type: TINYINT)", isExternalPurgeTbl);
     AnalysisError("create table tab (a bigint, b tinyint, c double, primary key(a, b)) " +
         "partition by range(a, b) (partition 0 < values <= 1) stored as kudu",
         "Number of specified range partition values is different than the number of " +
-        "partitioning columns: (1 vs 2). Range partition: 'PARTITION 0 < VALUES <= 1'");
+        "partitioning columns: (1 vs 2). Range partition: 'PARTITION 0 < VALUES <= 1'",
+        isExternalPurgeTbl);
 
 
     // Test unsupported Kudu types
@@ -253,12 +297,12 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
       // Unsupported type is PK and partition col
       String stmt = String.format("create table tab (x %s primary key) " +
           "partition by hash(x) partitions 3 stored as kudu", t);
-      AnalysisError(stmt, expectedError);
+      AnalysisError(stmt, expectedError, isExternalPurgeTbl);
 
       // Unsupported type is not PK/partition col
       stmt = String.format("create table tab (x int primary key, y %s) " +
           "partition by hash(x) partitions 3 stored as kudu", t);
-      AnalysisError(stmt, expectedError);
+      AnalysisError(stmt, expectedError, isExternalPurgeTbl);
     }
 
     // Test column options
@@ -275,16 +319,17 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
                   "not null encoding %s compression %s %s %s, y int encoding %s " +
                   "compression %s %s %s %s) partition by hash (x) " +
                   "partitions 3 stored as kudu", enc, comp, def, block, enc,
-                  comp, def, nul, block));
+                  comp, def, nul, block), isExternalPurgeTbl);
 
               // For a key column
               String createTblStr = String.format("create table tab (x int primary key " +
                   "%s encoding %s compression %s %s %s) partition by hash (x) " +
                   "partitions 3 stored as kudu", nul, enc, comp, def, block);
               if (nul.equals("null")) {
-                AnalysisError(createTblStr, "Primary key columns cannot be nullable");
+                AnalysisError(createTblStr, "Primary key columns cannot be nullable",
+                    isExternalPurgeTbl);
               } else {
-                AnalyzesOk(createTblStr);
+                AnalyzesOk(createTblStr, isExternalPurgeTbl);
               }
             }
           }
@@ -296,33 +341,34 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "i2 smallint default null, i3 int default null, i4 bigint default null, " +
         "vals string default null, valf float default null, vald double default null, " +
         "valb boolean default null, valdec decimal(10, 5) default null) " +
-        "partition by hash (x) partitions 3 stored as kudu");
+        "partition by hash (x) partitions 3 stored as kudu", isExternalPurgeTbl);
     // Use NULL as a default value on a non-nullable column
     AnalysisError("create table tab (x int primary key, y int not null default null) " +
         "partition by hash (x) partitions 3 stored as kudu", "Default value of NULL " +
-        "not allowed on non-nullable column: 'y'");
+        "not allowed on non-nullable column: 'y'", isExternalPurgeTbl);
     // Primary key specified using the PRIMARY KEY clause
     AnalyzesOk("create table tab (x int not null encoding plain_encoding " +
         "compression snappy block_size 1, y int null encoding rle compression lz4 " +
         "default 1, primary key(x)) partition by hash (x) partitions 3 " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     // Primary keys can't be null
     AnalysisError("create table tab (x int primary key null, y int not null) " +
         "partition by hash (x) partitions 3 stored as kudu", "Primary key columns " +
-        "cannot be nullable: x INT PRIMARY KEY NULL");
+        "cannot be nullable: x INT PRIMARY KEY NULL", isExternalPurgeTbl);
     AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " +
         "partition by hash (x) partitions 3 stored as kudu", "Primary key columns " +
-        "cannot be nullable: y INT NULL");
+        "cannot be nullable: y INT NULL", isExternalPurgeTbl);
     // Unsupported encoding value
     AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " +
         "partition by hash (x) partitions 3 stored as kudu", "Unsupported encoding " +
         "value 'INVALID_ENC'. Supported encoding values are: " +
-        Joiner.on(", ").join(Encoding.values()));
+        Joiner.on(", ").join(Encoding.values()), isExternalPurgeTbl);
     // Unsupported compression algorithm
     AnalysisError("create table tab (x int primary key, y int compression " +
         "invalid_comp) partition by hash (x) partitions 3 stored as kudu",
         "Unsupported compression algorithm 'INVALID_COMP'. Supported compression " +
-        "algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()));
+        "algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()),
+        isExternalPurgeTbl);
     // Default values
     AnalyzesOk("create table tab (i1 tinyint default 1, i2 smallint default 10, " +
         "i3 int default 100, i4 bigint default 1000, vals string default 'test', " +
@@ -330,119 +376,137 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "cast(3.1452 as double), valb boolean default true, " +
         "valdec decimal(10, 5) default 3.14159, " +
         "primary key (i1, i2, i3, i4, vals)) partition by hash (i1) partitions 3 " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (i int primary key default 1+1+1) " +
-        "partition by hash (i) partitions 3 stored as kudu");
+        "partition by hash (i) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (i int primary key default factorial(5)) " +
-        "partition by hash (i) partitions 3 stored as kudu");
+        "partition by hash (i) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (i int primary key, x int null default " +
-        "isnull(null, null)) partition by hash (i) partitions 3 stored as kudu");
+        "isnull(null, null)) partition by hash (i) partitions 3 stored as kudu",
+        isExternalPurgeTbl);
     // Invalid default values
     AnalysisError("create table tab (i int primary key default 'string_val') " +
         "partition by hash (i) partitions 3 stored as kudu", "Default value " +
-        "'string_val' (type: STRING) is not compatible with column 'i' (type: INT).");
+        "'string_val' (type: STRING) is not compatible with column 'i' (type: INT).",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key, x int default 1.1) " +
         "partition by hash (i) partitions 3 stored as kudu",
         "Default value 1.1 (type: DECIMAL(2,1)) is not compatible with column " +
-        "'x' (type: INT).");
+        "'x' (type: INT).", isExternalPurgeTbl);
     AnalysisError("create table tab (i tinyint primary key default 128) " +
         "partition by hash (i) partitions 3 stored as kudu", "Default value " +
-        "128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).");
+        "128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key default isnull(null, null)) " +
         "partition by hash (i) partitions 3 stored as kudu", "Default value of " +
-        "NULL not allowed on non-nullable column: 'i'");
+        "NULL not allowed on non-nullable column: 'i'", isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key, x int not null " +
         "default isnull(null, null)) partition by hash (i) partitions 3 " +
         "stored as kudu", "Default value of NULL not allowed on non-nullable column: " +
-        "'x'");
+        "'x'", isExternalPurgeTbl);
     // Invalid block_size values
     AnalysisError("create table tab (i int primary key block_size 1.1) " +
         "partition by hash (i) partitions 3 stored as kudu", "Invalid value " +
-        "for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.");
+        "for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.",
+        isExternalPurgeTbl);
     AnalysisError("create table tab (i int primary key block_size 'val') " +
         "partition by hash (i) partitions 3 stored as kudu", "Invalid value " +
-        "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.");
+        "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.",
+        isExternalPurgeTbl);
 
     // Sort columns are not supported for Kudu tables.
     AnalysisError("create table tab (i int, x int primary key) partition by hash(x) " +
         "partitions 8 sort by(i) stored as kudu", "SORT BY is not supported for Kudu " +
-        "tables.");
+        "tables.", isExternalPurgeTbl);
 
     // Z-Sort columns are not supported for Kudu tables.
     BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
 
     AnalysisError("create table tab (i int, x int primary key) partition by hash(x) " +
         "partitions 8 sort by zorder(i) stored as kudu", "SORT BY is not " +
-        "supported for Kudu tables.");
+        "supported for Kudu tables.", isExternalPurgeTbl);
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
 
     // Range partitions with TIMESTAMP
     AnalyzesOk("create table ts_ranges (ts timestamp primary key) " +
         "partition by range (partition cast('2009-01-01 00:00:00' as timestamp) " +
-        "<= VALUES < '2009-01-02 00:00:00') stored as kudu");
+        "<= VALUES < '2009-01-02 00:00:00') stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (ts timestamp primary key) " +
         "partition by range (partition value = cast('2009-01-01 00:00:00' as timestamp" +
-        ")) stored as kudu");
+        ")) stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (ts timestamp primary key) " +
         "partition by range (partition value = '2009-01-01 00:00:00') " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (id int, ts timestamp, primary key(id, ts))" +
         "partition by range (partition value = (9, cast('2009-01-01 00:00:00' as " +
-        "timestamp))) stored as kudu");
+        "timestamp))) stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table ts_ranges (id int, ts timestamp, primary key(id, ts))" +
         "partition by range (partition value = (9, '2009-01-01 00:00:00')) " +
-        "stored as kudu");
+        "stored as kudu", isExternalPurgeTbl);
     AnalysisError("create table ts_ranges (ts timestamp primary key, i int)" +
         "partition by range (partition '2009-01-01 00:00:00' <= VALUES < " +
         "'NOT A TIMESTAMP') stored as kudu",
         "Range partition value 'NOT A TIMESTAMP' cannot be cast to target TIMESTAMP " +
-        "partitioning column.");
+        "partitioning column.", isExternalPurgeTbl);
     AnalysisError("create table ts_ranges (ts timestamp primary key, i int)" +
         "partition by range (partition 100 <= VALUES < 200) stored as kudu",
         "Range partition value 100 (type: TINYINT) is not type " +
-        "compatible with partitioning column 'ts' (type: TIMESTAMP).");
+        "compatible with partitioning column 'ts' (type: TIMESTAMP).",
+        isExternalPurgeTbl);
 
     // TIMESTAMP columns with default values
     AnalyzesOk("create table tdefault (id int primary key, ts timestamp default now())" +
-        "partition by hash(id) partitions 3 stored as kudu");
+        "partition by hash(id) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tdefault (id int primary key, ts timestamp default " +
         "unix_micros_to_utc_timestamp(1230768000000000)) partition by hash(id) " +
-        "partitions 3 stored as kudu");
+        "partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tdefault (id int primary key, " +
         "ts timestamp not null default '2009-01-01 00:00:00') " +
-        "partition by hash(id) partitions 3 stored as kudu");
+        "partition by hash(id) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tdefault (id int primary key, " +
         "ts timestamp not null default cast('2009-01-01 00:00:00' as timestamp)) " +
-        "partition by hash(id) partitions 3 stored as kudu");
+        "partition by hash(id) partitions 3 stored as kudu", isExternalPurgeTbl);
     AnalysisError("create table tdefault (id int primary key, ts timestamp " +
         "default null) partition by hash(id) partitions 3 stored as kudu",
-        "NULL cannot be cast to a TIMESTAMP literal.");
+        "NULL cannot be cast to a TIMESTAMP literal.", isExternalPurgeTbl);
     AnalysisError("create table tdefault (id int primary key, " +
         "ts timestamp not null default cast('00:00:00' as timestamp)) " +
         "partition by hash(id) partitions 3 stored as kudu",
-        "CAST('00:00:00' AS TIMESTAMP) cannot be cast to a TIMESTAMP literal.");
+        "CAST('00:00:00' AS TIMESTAMP) cannot be cast to a TIMESTAMP literal.",
+        isExternalPurgeTbl);
     AnalysisError("create table tdefault (id int primary key, " +
         "ts timestamp not null default '2009-1 foo') " +
         "partition by hash(id) partitions 3 stored as kudu",
-        "String '2009-1 foo' cannot be cast to a TIMESTAMP literal.");
+        "String '2009-1 foo' cannot be cast to a TIMESTAMP literal.",
+        isExternalPurgeTbl);
 
     // Test column comments.
     AnalyzesOk("create table tab (x int comment 'x', y int comment 'y', " +
-        "primary key (x, y)) stored as kudu");
+        "primary key (x, y)) stored as kudu", isExternalPurgeTbl);
 
     // Managed table is not allowed to set table property 'kudu.table_id'.
     AnalysisError("create table tab (x int primary key) partition by hash(x) " +
         "partitions 8 stored as kudu tblproperties ('kudu.table_id'='123456')",
         String.format("Table property %s should not be specified when " +
-            "creating a Kudu table.", KuduTable.KEY_TABLE_ID));
+            "creating a Kudu table.", KuduTable.KEY_TABLE_ID), isExternalPurgeTbl);
 
     // Kudu master address needs to be valid.
     AnalysisError("create table tab (x int primary key) partition by " +
         "hash (x) partitions 8 stored as kudu tblproperties " +
         "('kudu.master_addresses' = 'foo')",
         "Cannot analyze Kudu table 'tab': Error determining if Kudu's integration " +
-        "with the Hive Metastore is enabled");
+        "with the Hive Metastore is enabled", isExternalPurgeTbl);
+  }
+
+  @Test
+  public void TestCreateManagedKuduTable() {
+    testDDlsOnKuduTable(false);
+  }
+
+  @Test
+  public void TestCreateSynchronizedKuduTable() {
+    testDDlsOnKuduTable(true);
   }
 
   @Test
@@ -502,12 +566,15 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "('kudu.table_name'='t', 'kudu.table_id'='123456')",
         String.format("Table property %s should not be specified when creating " +
             "a Kudu table.", KuduTable.KEY_TABLE_ID));
-    // External table is not allowed to set table property 'external.table.purge'
-    // to true.
+    // External table is not allowed to set table property 'kudu.table_name'
     AnalysisError("create external table t stored as kudu tblproperties " +
         "('external.table.purge'='true', 'kudu.table_name'='t')",
-        "Table property 'external.table.purge' cannot be set " +
-        "to true with an external Kudu table.");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables");
+    // Using the legacy external table syntax together with the
+    // 'external.table.purge' property should fail analysis.
+    AnalysisError("create external table t stored as kudu tblproperties " +
+            "('external.table.purge'='true')",
+        "A primary key is required for a Kudu table.");
   }
 
   @Test
@@ -613,21 +680,23 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "TBLPROPERTIES ('kudu.table_id' = '1234')",
         "Property 'kudu.table_id' cannot be altered for Kudu tables");
 
-    // Setting 'external.table.purge' is not allowed for Kudu tables.
-    AnalysisError("ALTER TABLE functional_kudu.testtbl SET " +
-        "TBLPROPERTIES ('external.table.purge' = 'true')",
-        "Property 'external.table.purge' cannot be altered for Kudu tables");
+    // Setting 'external.table.purge' is allowed for Kudu tables.
+    AnalyzesOk("ALTER TABLE functional_kudu.testtbl SET " +
+        "TBLPROPERTIES ('external.table.purge' = 'true')");
+
+    AnalyzesOk("ALTER TABLE functional_kudu.testtbl SET " +
+        "TBLPROPERTIES ('external.table.purge' = 'false')");
 
     // Rename the underlying Kudu table is not supported for managed Kudu tables.
     AnalysisError("ALTER TABLE functional_kudu.testtbl SET " +
         "TBLPROPERTIES ('kudu.table_name' = 'Hans')",
-        "Not allowed to set 'kudu.table_name' manually for managed Kudu tables");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables");
 
     // TODO IMPALA-6375: Allow setting kudu.table_name for managed Kudu tables
     // if the 'EXTERNAL' property is set to TRUE in the same step.
     AnalysisError("ALTER TABLE functional_kudu.testtbl SET " +
         "TBLPROPERTIES ('EXTERNAL' = 'TRUE','kudu.table_name' = 'Hans')",
-        "Not allowed to set 'kudu.table_name' manually for managed Kudu tables");
+        "Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables");
 
     // ALTER TABLE RENAME TO
     AnalyzesOk("ALTER TABLE functional_kudu.testtbl RENAME TO new_testtbl");
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index a737f0b..e486111 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -686,17 +686,44 @@ public class CatalogTest {
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
     assertEquals(TableType.EXTERNAL_TABLE.toString(),
         table.getMetaStoreTable().getTableType());
+  }
+
+  /**
+   * In Hive-3 the HMS translation layer converts non-transactional managed
+   * table definitions to external tables. This test makes sure that such tables
+   * are seen as EXTERNAL tables when loaded in catalog
+   * @throws CatalogException
+   */
+  @Test
+  public void testCreateTableMetadataHive3() throws CatalogException {
+    Assume.assumeTrue(TestUtils.getHiveMajorVersion() > 2);
     // alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
-    table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
+    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
-    if (TestUtils.getHiveMajorVersion() == 2) {
-      assertEquals(TableType.MANAGED_TABLE.toString(),
-          table.getMetaStoreTable().getTableType());
-    } else {
-      // in Hive-3 due to HMS translation, this table becomes an external table
-      assertEquals(TableType.EXTERNAL_TABLE.toString(),
-          table.getMetaStoreTable().getTableType());
-    }
+    assertEquals(TableType.EXTERNAL_TABLE.toString(),
+        table.getMetaStoreTable().getTableType());
+    // ACID tables should be loaded as MANAGED tables
+    table = catalog_.getOrLoadTable("functional", "insert_only_transactional_table",
+        "test");
+    assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
+    assertEquals(TableType.MANAGED_TABLE.toString(),
+        table.getMetaStoreTable().getTableType());
+  }
+
+  /**
+   * In Hive-2 there is no HMS translation which converts non-transactional managed
+   * table definitions to external tables. This test makes sure that such tables
+   * are seen as MANAGED tables in catalog
+   * @throws CatalogException
+   */
+  @Test
+  public void testCreateTableMetadataHive2() throws CatalogException {
+    Assume.assumeTrue(TestUtils.getHiveMajorVersion() <= 2);
+    // alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
+    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
+    assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
+    assertEquals(TableType.MANAGED_TABLE.toString(),
+        table.getMetaStoreTable().getTableType());
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 34ffe74..2926fa0 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -42,6 +42,7 @@ import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.service.MetadataOp;
+import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TMetadataOpcode;
@@ -248,18 +249,17 @@ public class LocalCatalogTest {
     assertEquals("SELECT * FROM functional.alltypes", v.getQueryStmt().toSql());
   }
 
-  @Ignore("Ignored until IMPALA-9092 is fixed")
   @Test
   public void testKuduTable() throws Exception {
     LocalKuduTable t = (LocalKuduTable) catalog_.getTable("functional_kudu",  "alltypes");
     assertEquals("id,bool_col,tinyint_col,smallint_col,int_col," +
         "bigint_col,float_col,double_col,date_string_col,string_col," +
         "timestamp_col,year,month", Joiner.on(",").join(t.getColumnNames()));
-    // Assert on the generated SQL for the table, but not the table properties, since
-    // those might change based on whether this test runs before or after other
-    // tests which compute stats, etc.
-    Assert.assertThat(ToSqlUtils.getCreateTableSql(t), CoreMatchers.startsWith(
-        "CREATE TABLE functional_kudu.alltypes (\n" +
+    boolean areDefaultSynchronizedTablesExternal = TestUtils.getHiveMajorVersion() > 2;
+    String expectedOutputPrefix = areDefaultSynchronizedTablesExternal ? "CREATE "
+        + "EXTERNAL TABLE" : "CREATE TABLE";
+    String expectedOutput =
+        expectedOutputPrefix + " functional_kudu.alltypes (\n" +
         "  id INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
         "  bool_col BOOLEAN NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
         "  tinyint_col TINYINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
@@ -277,7 +277,28 @@ public class LocalCatalogTest {
         ")\n" +
         "PARTITION BY HASH (id) PARTITIONS 3\n" +
         "STORED AS KUDU\n" +
-        "TBLPROPERTIES"));
+        "TBLPROPERTIES";
+
+    if (areDefaultSynchronizedTablesExternal) {
+      // Assert on the generated SQL for the table, but not the table properties, since
+      // those might change based on whether this test runs before or after other
+      // tests which compute stats, etc.
+      String output = ToSqlUtils.getCreateTableSql(t);
+      Assert.assertThat(output, CoreMatchers.startsWith(expectedOutput));
+      // The keys in TBLPROPERTIES are not emitted in a deterministic order, so
+      // check separately that 'external.table.purge'='TRUE' appears in the
+      // generated SQL.
+      Assert.assertTrue("Synchronized Kudu tables in Hive-3 must contain external.table"
+          + ".purge table property", output.contains("'external.table.purge'='TRUE'"));
+      Assert.assertFalse("Found internal property TRANSLATED_TO_EXTERNAL in table "
+          + "properties", output.contains("TRANSLATED_TO_EXTERNAL"));
+    } else {
+      // Assert on the generated SQL for the table, but not the table properties, since
+      // those might change based on whether this test runs before or after other
+      // tests which compute stats, etc.
+      Assert.assertThat(ToSqlUtils.getCreateTableSql(t),
+          CoreMatchers.startsWith(expectedOutput));
+    }
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
index 606ea39..de1e9d8 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
@@ -208,7 +208,7 @@ public class FrontendFixture {
         fail("Failed to add test table:\n" + createTableSql);
       }
     } else if (dummyTable instanceof KuduTable) {
-      if (!KuduTable.isExternalTable(msTbl)) {
+      if (!Table.isExternalTable(msTbl)) {
         fail("Failed to add table, external kudu table expected:\n" + createTableSql);
       }
       try {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index a0efa71..fae008c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -5,12 +5,19 @@ CREATE TABLE test1 (
   id INT
 )
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test1 (
   id INT
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test1 (
+  id INT
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # simple table with all types
@@ -30,7 +37,7 @@ CREATE TABLE test2 (
   timestamp_col TIMESTAMP
 )
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test2 (
   year INT,
   month INT,
@@ -48,6 +55,25 @@ CREATE TABLE show_create_table_test_db.test2 (
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test2 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # all types and partitioned
@@ -73,7 +99,7 @@ PARTITIONED BY (
 )
 COMMENT 'This is a test'
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test3 (
   year INT,
   month INT,
@@ -97,6 +123,31 @@ PARTITIONED BY (
 COMMENT 'This is a test'
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test3 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+PARTITIONED BY (
+  x INT,
+  y INT,
+  a BOOLEAN
+)
+COMMENT 'This is a test'
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # With a table comment
@@ -107,7 +158,7 @@ CREATE TABLE test4 (
 )
 COMMENT 'This is a test'
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test4 (
   year INT,
   month INT,
@@ -116,6 +167,16 @@ CREATE TABLE show_create_table_test_db.test4 (
 COMMENT 'This is a test'
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test4 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+COMMENT 'This is a test'
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # With the row format specified
@@ -126,7 +187,7 @@ CREATE TABLE test5 (
 )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n'
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test5 (
   year INT,
   month INT,
@@ -137,6 +198,18 @@ WITH SERDEPROPERTIES ('line.delim'='\n', 'field.delim'=',',
                       'serialization.format'=',', 'escape.delim'='\\')
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test5 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n'
+WITH SERDEPROPERTIES ('line.delim'='\n', 'field.delim'=',',
+                      'serialization.format'=',', 'escape.delim'='\\')
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # testing with parquet specified
@@ -146,7 +219,7 @@ CREATE TABLE test6 (
   id INT COMMENT 'Add a comment'
 )
 STORED AS PARQUET
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test6 (
   year INT,
   month INT,
@@ -154,6 +227,15 @@ CREATE TABLE show_create_table_test_db.test6 (
 )
 STORED AS PARQUET
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test6 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+STORED AS PARQUET
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # with extra table properties and sequencefile
@@ -164,7 +246,7 @@ CREATE TABLE test7 (
 )
 STORED AS SEQUENCEFILE
 TBLPROPERTIES ('key3'='val3', 'key2'='val2', 'key1'='val1')
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test7 (
   year INT,
   month INT,
@@ -173,6 +255,15 @@ CREATE TABLE show_create_table_test_db.test7 (
 STORED AS SEQUENCEFILE
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('key3'='val3', 'key2'='val2', 'key1'='val1')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test7 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+STORED AS SEQUENCEFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('key3'='val3', 'key2'='val2', 'key1'='val1', 'external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # testing with rcfile specified
@@ -182,7 +273,7 @@ CREATE TABLE test8 (
   id INT COMMENT 'Add a comment'
 )
 STORED AS RCFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test8 (
   year INT,
   month INT,
@@ -190,6 +281,15 @@ CREATE TABLE show_create_table_test_db.test8 (
 )
 STORED AS RCFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test8 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment'
+)
+STORED AS RCFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 # Test create table as select
@@ -209,7 +309,7 @@ CREATE TABLE test_as_select (
   month INT
 )
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test_as_select (
   id INT,
   bool_col BOOLEAN,
@@ -227,11 +327,30 @@ CREATE TABLE show_create_table_test_db.test_as_select (
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test_as_select (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP,
+  year INT,
+  month INT
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- CREATE_TABLE
 create table i_1687_p partitioned by (int_col) as
   select bigint_col, int_col from functional.alltypessmall;
----- RESULTS
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.i_1687_p (
   bigint_col BIGINT
 )
@@ -240,10 +359,20 @@ PARTITIONED BY (
 )
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.i_1687_p (
+  bigint_col BIGINT
+)
+PARTITIONED BY (
+  int_col INT
+)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
 ====
 ---- QUERY
 SHOW CREATE TABLE functional_text_lzo.tinytable
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional_text_lzo.tinytable (
   a STRING,
   b STRING
@@ -255,7 +384,7 @@ LOCATION '$$location_uri$$'
 ====
 ---- QUERY
 SHOW CREATE TABLE functional.allcomplextypes
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional.allcomplextypes (
   id INT,
   int_array_col ARRAY<INT>,
@@ -283,14 +412,14 @@ TBLPROPERTIES ('transient_lastDdlTime'='1405990341')
 ====
 ---- QUERY
 SHOW CREATE VIEW functional.alltypes_view
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.alltypes_view AS
 SELECT * FROM functional.alltypes
 ====
 ---- QUERY
 # SHOW CREATE TABLE should also work for views.
 SHOW CREATE TABLE functional.alltypes_view
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.alltypes_view AS
 SELECT * FROM functional.alltypes
 ====
@@ -299,7 +428,7 @@ SELECT * FROM functional.alltypes
 CREATE VIEW column_aliases_view (foo, bar, baz) AS
 SELECT tinyint_col, id, bigint_col
 FROM functional.alltypes;
----- RESULTS
+---- RESULTS-HIVE
 # A view with column aliases is expanded into a SELECT with an inline view.
 CREATE VIEW show_create_table_test_db.column_aliases_view AS
 SELECT column_aliases_view.tinyint_col foo, column_aliases_view.id bar, column_aliases_view.bigint_col baz FROM (SELECT tinyint_col, id, bigint_col FROM functional.alltypes) column_aliases_view
@@ -307,14 +436,14 @@ SELECT column_aliases_view.tinyint_col foo, column_aliases_view.id bar, column_a
 ---- QUERY
 # Test views referencing views.
 SHOW CREATE VIEW functional.view_view;
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.view_view AS
 SELECT * FROM functional.alltypes_view
 ====
 ---- QUERY
 # Test complex views with multiple tables and Hive-style column aliases.
 SHOW CREATE VIEW functional.complex_view
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW functional.complex_view AS
 SELECT complex_view.`_c0` abc, complex_view.string_col xyz FROM (SELECT count(a.bigint_col), b.string_col FROM functional.alltypesagg a INNER JOIN functional.alltypestiny b ON a.id = b.id WHERE a.bigint_col < 50 GROUP BY b.string_col HAVING count(a.bigint_col) > 1 ORDER BY b.string_col ASC LIMIT 100) complex_view
 ====
@@ -323,14 +452,14 @@ SELECT complex_view.`_c0` abc, complex_view.string_col xyz FROM (SELECT count(a.
 CREATE VIEW _quote_view (_foo, bar) AS
 SELECT tinyint_col, id _id
 FROM functional.alltypes;
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW show_create_table_test_db.`_quote_view` AS
 SELECT `_quote_view`.tinyint_col `_foo`, `_quote_view`.`_id` bar FROM (SELECT tinyint_col, id `_id` FROM functional.alltypes) `_quote_view`
 ====
 ---- QUERY
 # SHOW CREATE VIEW should also work on tables.
 SHOW CREATE VIEW functional_parquet.tinytable;
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional_parquet.tinytable (
   a STRING,
   b STRING
@@ -343,7 +472,7 @@ TBLPROPERTIES ()
 # Create view that contains a subquery (IMPALA-4579)
 CREATE VIEW view_with_subquery AS SELECT * FROM functional.alltypestiny
   WHERE id IN (SELECT id FROM functional.alltypesagg);
----- RESULTS
+---- RESULTS-HIVE
 CREATE VIEW show_create_table_test_db.view_with_subquery
 AS SELECT * FROM functional.alltypestiny
 WHERE id IN (SELECT id FROM functional.alltypesagg)
@@ -353,7 +482,13 @@ WHERE id IN (SELECT id FROM functional.alltypesagg)
 CREATE TABLE test1 (id INT)
 SORT BY (id)
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test1 (id INT)
+SORT BY LEXICAL (id)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test1 (id INT)
 SORT BY LEXICAL (id)
 STORED AS TEXTFILE
@@ -365,7 +500,14 @@ CREATE TABLE test1 (id INT)
 PARTITIONED BY (x INT, y INT)
 SORT BY (id)
 STORED AS TEXTFILE
----- RESULTS
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.test1 (id INT)
+PARTITIONED BY (x INT, y INT)
+SORT BY LEXICAL (id)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
+---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.test1 (id INT)
 PARTITIONED BY (x INT, y INT)
 SORT BY LEXICAL (id)
@@ -374,7 +516,7 @@ LOCATION '$$location_uri$$'
 ====
 ---- QUERY
 SHOW CREATE TABLE functional_hbase.alltypes
----- RESULTS
+---- RESULTS-HIVE
 CREATE EXTERNAL TABLE functional_hbase.alltypes (
   id INT COMMENT 'Add a comment',
   bigint_col BIGINT,
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 08a8358..2ab4250 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -217,9 +217,6 @@ class SkipIfHive3:
       reason="Kudu is not tested with Hive 3 notifications yet, see IMPALA-8751.")
   col_stat_separated_by_engine = pytest.mark.skipif(HIVE_MAJOR_VERSION >= 3,
       reason="Hive 3 separates column statistics by engine")
-  kudu_with_hms_translation = pytest.mark.skipif(HIVE_MAJOR_VERSION >= 3,
-      reason="Show create table output is different for HMS translated Kudu tables. "
-             "See IMPALA-9092 for details")
 
 
 class SkipIfHive2:
@@ -227,6 +224,9 @@ class SkipIfHive2:
       reason="Acid tables are only supported with Hive 3.")
   col_stat_not_separated_by_engine = pytest.mark.skipif(HIVE_MAJOR_VERSION == 2,
       reason="Hive 2 doesnt support separating column statistics by engine")
+  create_external_kudu_table = pytest.mark.skipif(HIVE_MAJOR_VERSION == 2,
+      reason="Hive 2 does not support creating external.table.purge Kudu tables."
+             " See IMPALA-9092 for details.")
 
 
 class SkipIfCatalogV2:
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index e813d1b..dbcc852 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -23,6 +23,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIf, SkipIfHive3
 from tests.common.test_dimensions import create_uncompressed_text_dimension
 from tests.util.test_file_parser import QueryTestSectionReader, remove_comments
+from tests.common.environ import HIVE_MAJOR_VERSION
 
 
 # The purpose of the show create table tests are to ensure that the "SHOW CREATE TABLE"
@@ -30,13 +31,15 @@ from tests.util.test_file_parser import QueryTestSectionReader, remove_comments
 # definition. The table is created, then the output of "SHOW CREATE TABLE" is used to
 # test if the table can be recreated. This test class does not support --update-results.
 class TestShowCreateTable(ImpalaTestSuite):
-  VALID_SECTION_NAMES = ["CREATE_TABLE", "CREATE_VIEW", "QUERY", "RESULTS"]
+  VALID_SECTION_NAMES = ["CREATE_TABLE", "CREATE_VIEW", "QUERY", "RESULTS-HIVE",
+                         "RESULTS-HIVE-3"]
   # Properties to filter before comparing results
   FILTER_TBL_PROPERTIES = ["transient_lastDdlTime", "numFiles", "numPartitions",
                            "numRows", "rawDataSize", "totalSize", "COLUMN_STATS_ACCURATE",
                            "STATS_GENERATED_VIA_STATS_TASK", "last_modified_by",
                            "last_modified_time", "numFilesErasureCoded",
-                           "bucketing_version", "OBJCAPABILITIES"]
+                           "bucketing_version", "OBJCAPABILITIES",
+                           "TRANSLATED_TO_EXTERNAL"]
 
   @classmethod
   def get_workload(self):
@@ -54,7 +57,6 @@ class TestShowCreateTable(ImpalaTestSuite):
         lambda v: v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none')
 
-  @SkipIfHive3.kudu_with_hms_translation
   def test_show_create_table(self, vector, unique_database):
     self.__run_show_create_table_test_case('QueryTest/show-create-table', vector,
                                            unique_database)
@@ -209,7 +211,13 @@ class ShowCreateTableTestCase(object):
       assert 0, 'Error in test file %s. Test cases require a '\
           'CREATE_TABLE section.\n%s' %\
           (test_file_name, pprint.pformat(test_section))
-    expected_result = remove_comments(test_section['RESULTS'])
+    results_key = 'RESULTS-HIVE'
+    if HIVE_MAJOR_VERSION > 2:
+      if 'RESULTS-HIVE-3' in test_section:
+        # On Hive 3 and later, prefer the RESULTS-HIVE-3 section when it is present.
+        results_key = 'RESULTS-HIVE-3'
+
+    expected_result = remove_comments(test_section[results_key])
     self.expected_result = expected_result.replace(
         ShowCreateTableTestCase.RESULTS_DB_NAME_TOKEN, test_db_name)
 
@@ -264,7 +272,6 @@ class TestInfraCompat(ImpalaTestSuite):
                              'l_comment')}]
 
   @SkipIf.kudu_not_supported
-  @SkipIfHive3.kudu_with_hms_translation
   @pytest.mark.parametrize('table_primary_keys_map', TABLE_PRIMARY_KEYS_MAPS)
   def test_primary_key_parse(self, impala_testinfra_cursor, table_primary_keys_map):
     """
@@ -276,7 +283,6 @@ class TestInfraCompat(ImpalaTestSuite):
         table_primary_keys_map['table']) == table_primary_keys_map['primary_keys']
 
   @SkipIf.kudu_not_supported
-  @SkipIfHive3.kudu_with_hms_translation
   @pytest.mark.parametrize('table_primary_keys_map', TABLE_PRIMARY_KEYS_MAPS)
   def test_load_table_with_primary_key_attr(self, impala_testinfra_cursor,
                                             table_primary_keys_map):
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index aa94ea0..1de184a 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -37,10 +37,10 @@ import time
 from datetime import datetime
 from pytz import utc
 
-from tests.common.environ import ImpalaTestClusterProperties
+from tests.common.environ import ImpalaTestClusterProperties, HIVE_MAJOR_VERSION
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.impala_cluster import ImpalaCluster
-from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfKudu, SkipIfHive3
+from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfKudu, SkipIfHive2
 from tests.common.test_dimensions import add_exec_option_dimension
 from tests.verifiers.metric_verifier import MetricVerifier
 
@@ -809,23 +809,42 @@ class TestCreateExternalTable(KuduTestSuite):
       if kudu_client.table_exists(table_name):
         kudu_client.delete_table(table_name)
 
+
 class TestShowCreateTable(KuduTestSuite):
   column_properties = "ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION"
 
-  def assert_show_create_equals(self, cursor, create_sql, show_create_sql):
+  def assert_show_create_equals(self, cursor, create_sql, show_create_sql,
+                                do_exact_match=False):
     """Executes 'create_sql' to create a table, then runs "SHOW CREATE TABLE" and checks
        that the output is the same as 'show_create_sql'. 'create_sql' and
        'show_create_sql' can be templates that can be used with str.format(). format()
-       will be called with 'table' and 'db' as keyword args.
+       will be called with 'table' and 'db' as keyword args. On Hive 3, the output is
+       normalized to account for HMS translation before comparing. If 'do_exact_match'
+       is True, the output is compared with 'show_create_sql' without any normalization.
     """
     format_args = {"table": self.random_table_name(), "db": cursor.conn.db_name}
     cursor.execute(create_sql.format(**format_args))
     cursor.execute("SHOW CREATE TABLE {table}".format(**format_args))
-    assert cursor.fetchall()[0][0] == \
+    output = cursor.fetchall()[0][0]
+    if not do_exact_match and HIVE_MAJOR_VERSION > 2:
+      # With HMS-3, all Kudu tables are translated to external tables with some
+      # additional properties. The code below makes sure that the table is external and
+      # has the expected table properties.
+      # TODO: move these tests to a query.test file so that we have a better way to
+      # compare the output against different Hive versions.
+      assert output.startswith("CREATE EXTERNAL TABLE")
+      assert "TBLPROPERTIES ('external.table.purge'='TRUE', " in output
+      # The output is known to start with CREATE EXTERNAL TABLE, so change it to
+      # "CREATE TABLE" to make the rest of the string easier to compare.
+      output = output.replace("CREATE EXTERNAL TABLE", "CREATE TABLE")
+      # Also remove the additional table property external.table.purge so that the
+      # rest of the output can be compared.
+      output = output.replace("TBLPROPERTIES ('external.table.purge'='TRUE', ",
+                              "TBLPROPERTIES (")
+    assert output == \
         textwrap.dedent(show_create_sql.format(**format_args)).strip()
 
   @SkipIfKudu.hms_integration_enabled
-  @SkipIfHive3.kudu_with_hms_translation
   def test_primary_key_and_distribution(self, cursor):
     # TODO: Add case with BLOCK_SIZE
     self.assert_show_create_equals(cursor,
@@ -927,7 +946,6 @@ class TestShowCreateTable(KuduTestSuite):
             kudu_addr=KUDU_MASTER_HOSTS))
 
   @SkipIfKudu.hms_integration_enabled
-  @SkipIfHive3.kudu_with_hms_translation
   def test_timestamp_default_value(self, cursor):
     create_sql_fmt = """
         CREATE TABLE {table} (c INT, d TIMESTAMP,
@@ -987,13 +1005,12 @@ class TestShowCreateTable(KuduTestSuite):
           STORED AS KUDU
           TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}', {kudu_table})""".format(
               db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS,
-              kudu_table=table_name_prop))
+              kudu_table=table_name_prop), True)
     finally:
       if kudu_client.table_exists(kudu_table_name):
         kudu_client.delete_table(kudu_table_name)
 
   @SkipIfKudu.hms_integration_enabled
-  @SkipIfHive3.kudu_with_hms_translation
   def test_managed_kudu_table_name_with_show_create(self, cursor):
     """Check that the generated kudu.table_name tblproperty is not present with
        show create table with managed Kudu tables.
@@ -1013,6 +1030,49 @@ class TestShowCreateTable(KuduTestSuite):
         TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
             db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
 
+  def test_synchronized_kudu_table_with_show_create(self, cursor):
+    # Do an exact match against the expected output here, since this test specifically
+    # creates a synchronized table.
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE EXTERNAL TABLE {table} (
+          id BIGINT,
+          name STRING,
+          PRIMARY KEY(id))
+        PARTITION BY HASH PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES('external.table.purge'='true')""",
+        """
+        CREATE EXTERNAL TABLE {db}.{{table}} (
+          id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          PRIMARY KEY (id)
+        )
+        PARTITION BY HASH (id) PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='true', 'kudu.master_addresses'='{kudu_addr}')"""
+          .format(db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS), True)
+
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE EXTERNAL TABLE {table} (
+          id BIGINT PRIMARY KEY,
+          name STRING)
+        PARTITION BY HASH(id) PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES('external.table.purge'='true')""",
+        """
+        CREATE EXTERNAL TABLE {db}.{{table}} (
+          id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          PRIMARY KEY (id)
+        )
+        PARTITION BY HASH (id) PARTITIONS 16
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='true', 'kudu.master_addresses'='{kudu_addr}')"""
+          .format(db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS), True)
+
+
 class TestDropDb(KuduTestSuite):
 
   @SkipIfKudu.hms_integration_enabled
@@ -1185,3 +1245,136 @@ class TestKuduMemLimits(KuduTestSuite):
                  for i in ImpalaCluster.get_e2e_test_cluster().impalads]
     for v in verifiers:
       v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=30)
+
+
+@SkipIfHive2.create_external_kudu_table
+class TestCreateSynchronizedTable(KuduTestSuite):
+
+  def test_create_synchronized_table(self, cursor, kudu_client, unique_database):
+    """
+    Creates a synchronized Kudu table and makes sure that the statement does not fail.
+    """
+    table_name = self.random_table_name()
+    # create an external Kudu table with external.table.purge=true
+    cursor.execute("""
+      CREATE EXTERNAL TABLE %s.%s (
+        id int PRIMARY KEY,
+        name string)
+      PARTITION BY HASH PARTITIONS 8
+      STORED AS KUDU
+      TBLPROPERTIES ('external.table.purge'='true')
+    """ % (unique_database, table_name))
+    # make sure that the table was created
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+    # make sure that the kudu table was created with default name
+    assert kudu_client.table_exists(self.to_kudu_table_name(unique_database, table_name))
+    # make sure that the external.table.purge property can be changed
+    cursor.execute("ALTER TABLE %s.%s set TBLPROPERTIES ("
+                   "'external.table.purge'='FALSE')" % (unique_database, table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+    cursor.execute("ALTER TABLE %s.%s set TBLPROPERTIES ("
+                   "'external.table.purge'='TRUE')" % (unique_database, table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+    # make sure that table can be renamed
+    new_table_name = self.random_table_name()
+    cursor.execute("ALTER TABLE %s.%s rename to %s.%s" %
+                   (unique_database, table_name, unique_database, new_table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (new_table_name,) in cursor.fetchall()
+    # make sure that the Kudu table now exists under the default name for the new table
+    assert kudu_client.table_exists(
+      self.to_kudu_table_name(unique_database, new_table_name))
+    # now make sure that table disappears after we remove it
+    cursor.execute("DROP TABLE %s.%s" % (unique_database, new_table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (new_table_name,) not in cursor.fetchall()
+    assert not kudu_client.table_exists(
+      self.to_kudu_table_name(unique_database, new_table_name))
+
+  def test_invalid_sync_table_stmts(self, cursor, kudu_client, unique_database):
+    """
+    Makes sure that invalid attempts to create a synchronized table error out.
+    """
+    table_name = self.random_table_name()
+    try:
+      cursor.execute("""
+        CREATE EXTERNAL TABLE %s.%s (
+          a int PRIMARY KEY)
+        PARTITION BY HASH PARTITIONS 8
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='false')
+      """ % (unique_database, table_name))
+      assert False,\
+        "Create table statement with external.table.purge=False should error out"
+    except Exception as e:
+      # We throw this exception since the analyzer checks for properties one by one.
+      # This is the first property that it checks for an external table
+      assert "Table property kudu.table_name must be specified when " \
+             "creating an external Kudu table" in str(e)
+
+    try:
+      # missing external.table.purge in TBLPROPERTIES
+      cursor.execute("""
+        CREATE EXTERNAL TABLE %s.%s (
+          a int PRIMARY KEY)
+        PARTITION BY HASH PARTITIONS 8
+        STORED AS KUDU
+        TBLPROPERTIES ('FOO'='BAR')
+        """ % (unique_database, table_name))
+      assert False, \
+        "Create external table statement must include external.table.purge property"
+    except Exception as e:
+      # We throw this exception since the analyzer checks for properties one by one.
+      # This is the first property that it checks for an external table
+      assert "Table property kudu.table_name must be specified when " \
+             "creating an external Kudu table" in str(e)
+
+    try:
+      # Trying to create a managed table with the external.table.purge property in it
+      cursor.execute("""
+        CREATE TABLE %s.%s (
+          a int PRIMARY KEY)
+        PARTITION BY HASH PARTITIONS 8
+        STORED AS KUDU
+        TBLPROPERTIES ('external.table.purge'='true')
+              """ % (unique_database, table_name))
+      assert False, \
+        "Managed table creation with external.table.purge property must be disallowed"
+    except Exception as e:
+      assert "Table property 'external.table.purge' cannot be set to true " \
+             "with an managed Kudu table." in str(e)
+
+    # TODO should we block this?
+    cursor.execute("""
+      CREATE TABLE %s.%s (
+        a int PRIMARY KEY)
+      PARTITION BY HASH PARTITIONS 8
+      STORED AS KUDU
+      TBLPROPERTIES ('external.table.purge'='False')""" % (unique_database, table_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (table_name,) in cursor.fetchall()
+
+  def test_sync_tbl_with_kudu_table(self, cursor, kudu_client, unique_database):
+    """
+    Test tries to create a synchronized table with an existing Kudu table name and
+    makes sure it fails.
+    """
+    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
+      table_name = self.random_table_name()
+      try:
+        cursor.execute("""
+            CREATE EXTERNAL TABLE %s.%s (
+              a int PRIMARY KEY)
+            PARTITION BY HASH PARTITIONS 8
+            STORED AS KUDU
+            TBLPROPERTIES('external.table.purge'='true', 'kudu.table_name' = '%s')"""
+                       % (unique_database, table_name,
+                          self.get_kudu_table_base_name(kudu_table.name)))
+        assert False, "External tables with external.table.purge property must fail " \
+          "if the kudu table already exists"
+      except Exception as e:
+        assert "Not allowed to set 'kudu.table_name' manually for" \
+               " synchronized Kudu tables" in str(e)