Posted to commits@impala.apache.org by jr...@apache.org on 2017/02/24 21:41:59 UTC

[1/4] incubator-impala git commit: IMPALA-4981: Re-enable spilling with MT_DOP.

Repository: incubator-impala
Updated Branches:
  refs/heads/master 1e73d7fc3 -> a71636847


IMPALA-4981: Re-enable spilling with MT_DOP.

The initial changes for MT_DOP disabled spilling because
spilling is not yet designed/implemented for multi-threaded
joins. However, queries that have non-local joins have since
been disallowed with MT_DOP. This patch therefore re-enables
spilling with MT_DOP.
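A minimal sketch of the behavioral change, written as a pure function. The class and method names are hypothetical. The flags mirror the ones read in Frontend.java in the diff below, but this is not Impala code: before the patch, multi-threaded execution forced spilling off; after it, only the "unsafe spills" check remains.

```java
/**
 * Hypothetical sketch of the disable-spilling decision in Frontend.java,
 * before and after IMPALA-4981. Not the actual Impala implementation.
 */
public final class SpillPolicy {
  private SpillPolicy() {}

  /**
   * Spilling is considered unsafe only when the DISABLE_UNSAFE_SPILLS option is
   * set, some referenced tables lack stats, and the query has no plan hints.
   */
  static boolean unsafeSpillsDisabled(boolean disableUnsafeSpillsOption,
      boolean tablesMissingStats, boolean hasPlanHints) {
    return disableUnsafeSpillsOption && tablesMissingStats && !hasPlanHints;
  }

  /** Before the patch: multi-threaded (MT_DOP) execution always disabled spilling. */
  static boolean disableSpillingBefore(boolean isMtExec, boolean unsafeSpillsDisabled) {
    return isMtExec || unsafeSpillsDisabled;
  }

  /** After the patch: MT_DOP is ignored; only the unsafe-spills check remains. */
  static boolean disableSpillingAfter(boolean isMtExec, boolean unsafeSpillsDisabled) {
    return unsafeSpillsDisabled;
  }

  public static void main(String[] args) {
    // An MT_DOP query over tables that all have stats.
    boolean unsafe = unsafeSpillsDisabled(true, false, false);
    System.out.println("before: " + disableSpillingBefore(true, unsafe));
    System.out.println("after:  " + disableSpillingAfter(true, unsafe));
  }
}
```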

Change-Id: I86465896c6583be256e17c88a713da7dde25b540
Reviewed-on: http://gerrit.cloudera.org:8080/6131
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/909be1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/909be1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/909be1dd

Branch: refs/heads/master
Commit: 909be1dd47fb155f04d700fa9e7429c0f98fdc41
Parents: 1e73d7f
Author: Alex Behm <al...@cloudera.com>
Authored: Thu Feb 23 15:57:37 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 24 04:26:24 2017 +0000

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/service/Frontend.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/909be1dd/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 0a4cceb..d3cefa6 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1026,8 +1026,7 @@ public class Frontend {
         queryCtx.client_request.query_options.isDisable_unsafe_spills()
           && !queryCtx.tables_missing_stats.isEmpty()
           && !analysisResult.getAnalyzer().hasPlanHints();
-    // for now, always disable spilling for multi-threaded execution
-    if (isMtExec || disableSpilling) queryCtx.setDisable_spilling(true);
+    queryCtx.setDisable_spilling(disableSpilling);
 
     // assign fragment idx
     int idx = 0;


[4/4] incubator-impala git commit: IMPALA-4902: Copy parameters map in HdfsPartition.toThrift().

Posted by jr...@apache.org.
IMPALA-4902: Copy parameters map in HdfsPartition.toThrift().

The bug: When generating the Thrift representation of an
HdfsTable via toThrift(), each THdfsPartition used to contain
a reference to its partition's parameters map. As a result,
one thread serializing a Thrift table returned by toThrift() could
conflict with another thread updating the parameters maps of
the table partitions. Here are a few examples of operations
that may modify the parameters map:
COMPUTE [INCREMENTAL] STATS, DROP STATS,
ALTER TABLE SET TBLPROPERTIES, ALTER TABLE SET CACHED, etc.
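The hazard can be sketched as a single-threaded simulation. All names are hypothetical, and `Partition` is a simplified stand-in, not the real HdfsPartition. A "serializer" iterates over a map that a simulated DDL operation modifies mid-iteration. When the map is shared, Java's fail-fast HashMap iterator reports a ConcurrentModificationException. When the map is a copy, iteration is unaffected. With real threads the failure is timing-dependent and may surface differently.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical single-threaded simulation of the IMPALA-4902 race: a Thrift
 * object that shares (rather than copies) a partition's parameters map.
 */
public final class SharedParamsHazard {
  /** Simplified stand-in for a partition; not the real HdfsPartition. */
  static final class Partition {
    final Map<String, String> params = new HashMap<>();

    /** Buggy: hands out the live map, as toThrift() used to. */
    Map<String, String> sharedParams() { return params; }

    /** Fixed: hands out a shallow copy. */
    Map<String, String> copiedParams() { return new HashMap<>(params); }
  }

  /**
   * "Serializes" 'view' by iterating over it. After the first entry, a simulated
   * DDL operation (e.g. COMPUTE STATS) adds a key to the partition's own map.
   * Returns true if the serialization failed.
   */
  static boolean serializeDuringDdl(Partition p, Map<String, String> view) {
    try {
      boolean ddlDone = false;
      for (Map.Entry<String, String> entry : view.entrySet()) {
        if (!ddlDone) {
          p.params.put("numRows", "100"); // structural change to the source map
          ddlDone = true;
        }
      }
      return false;
    } catch (ConcurrentModificationException ex) {
      return true;
    }
  }

  static Partition newPartition() {
    Partition p = new Partition();
    p.params.put("totalSize", "1024");
    p.params.put("transient_lastDdlTime", "0");
    return p;
  }

  public static void main(String[] args) {
    Partition a = newPartition();
    System.out.println("shared map failed: " + serializeDuringDdl(a, a.sharedParams()));
    Partition b = newPartition();
    System.out.println("copied map failed: " + serializeDuringDdl(b, b.copiedParams()));
  }
}
```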

The fix: Create a shallow copy of the parameters map in
HdfsPartition.toThrift(). Because making the copy still iterates
over the shared map, toThrift() itself must be protected from
concurrent modifications to the parameters map. Callers of
toThrift() are therefore required to hold the table lock, and
Table.toThrift() now throws an IllegalStateException if it is
called without it. One caller that did not already hold the
lock was adjusted.
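A minimal sketch of the resulting pattern, assuming a simplified, hypothetical `Table` stand-in rather than Impala's real class. The copy is taken only while the lock is held, the method fails fast otherwise, and the caller releases the lock in a `finally` block, much like the `getThriftTable()` test helper in the diff below. The returned snapshot can be serialized after the lock is released, because later DDL mutates only the table's own map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical sketch of the IMPALA-4902 pattern: copy shared state only while
 * holding the owning table's lock, and fail fast if the lock is not held.
 */
public final class LockedThriftSnapshot {
  /** Simplified stand-in for Table; not the real Impala class. */
  static final class Table {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, String> params = new HashMap<>();

    ReentrantLock getLock() { return lock; }

    /** Must be called with getLock() held; returns a shallow copy of the params. */
    Map<String, String> toThriftParams() {
      if (!lock.isHeldByCurrentThread()) {
        throw new IllegalStateException("toThriftParams() called without the table lock");
      }
      return new HashMap<>(params);
    }

    /** Simulated DDL (e.g. ALTER TABLE SET TBLPROPERTIES), done under the lock. */
    void setParam(String key, String value) {
      lock.lock();
      try {
        params.put(key, value);
      } finally {
        lock.unlock();
      }
    }
  }

  /** Caller-side helper: acquire the lock, take the snapshot, always release. */
  static Map<String, String> snapshot(Table t) {
    t.getLock().lock();
    try {
      return t.toThriftParams();
    } finally {
      t.getLock().unlock();
    }
  }
}
```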

Testing:
- I was unable to reproduce the issue locally, but the stacks
  from the JIRAs point directly to the parameters map, and
  the races are pretty obvious from looking at the code.
- Passed a core/hdfs private run.

Change-Id: Ic11277ad5512d2431cd3cc791715917c95395ddf
Reviewed-on: http://gerrit.cloudera.org:8080/6127
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a7163684
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a7163684
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a7163684

Branch: refs/heads/master
Commit: a71636847fe742a9d0eb770516aff34ff16bbca1
Parents: 013456d
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Feb 17 10:00:55 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 24 10:18:38 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   | 90 +++++++++++---------
 .../apache/impala/catalog/HdfsPartition.java    |  7 +-
 .../java/org/apache/impala/catalog/Table.java   | 24 ++++--
 .../impala/service/CatalogOpExecutor.java       | 67 ++++++---------
 .../catalog/CatalogObjectToFromThriftTest.java  | 35 +++++---
 5 files changed, 122 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a7163684/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 8be0aa3..00caf51 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -52,6 +52,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
 import org.apache.impala.hive.executor.UdfExecutor;
 import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
@@ -925,13 +926,14 @@ public class CatalogServiceCatalog extends Catalog {
    * Reloads metadata for table 'tbl'. If 'tbl' is an IncompleteTable, it makes an
    * asynchronous request to the table loading manager to create a proper table instance
    * and load the metadata from Hive Metastore. Otherwise, it updates table metadata
-   * in-place by calling the load() function on the specified table. Returns 'tbl', if it
-   * is a fully loaded table (e.g. HdfsTable, HBaseTable, etc). Otherwise, returns a
-   * newly constructed fully loaded table. Applies proper synchronization to protect the
-   * metadata load from concurrent table modifications and assigns a new catalog version.
+   * in-place by calling the load() function on the specified table. Returns the
+   * TCatalogObject representing 'tbl', if it is a fully loaded table (e.g. HdfsTable,
+   * HBaseTable, etc). Otherwise, returns a newly constructed fully loaded TCatalogObject.
+   * Applies proper synchronization to protect the metadata load from concurrent table
+   * modifications and assigns a new catalog version.
    * Throws a CatalogException if there is an error loading table metadata.
    */
-  public Table reloadTable(Table tbl) throws CatalogException {
+  public TCatalogObject reloadTable(Table tbl) throws CatalogException {
     LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
     TTableName tblName = new TTableName(tbl.getDb().getName().toLowerCase(),
         tbl.getName().toLowerCase());
@@ -951,7 +953,8 @@ public class CatalogServiceCatalog extends Catalog {
       try {
         // The table may have been dropped/modified while the load was in progress, so
         // only apply the update if the existing table hasn't changed.
-        return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
+        Table result = replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
+        return result.toTCatalogObject();
       } finally {
         loadReq.close();
         LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
@@ -978,7 +981,7 @@ public class CatalogServiceCatalog extends Catalog {
       }
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
-      return tbl;
+      return tbl.toTCatalogObject();
     } finally {
       Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
@@ -986,13 +989,12 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Reloads the metadata of a table with name 'tableName'. Returns the table or null if
-   * the table does not exist.
+   * Reloads the metadata of a table with name 'tableName'.
    */
-  public Table reloadTable(TTableName tableName) throws CatalogException {
+  public void reloadTable(TTableName tableName) throws CatalogException {
     Table table = getTable(tableName.getDb_name(), tableName.getTable_name());
-    if (table == null) return null;
-    return reloadTable(table);
+    if (table == null) return;
+    reloadTable(table);
   }
 
   /**
@@ -1047,27 +1049,25 @@ public class CatalogServiceCatalog extends Catalog {
    * Invalidates the table in the catalog cache, potentially adding/removing the table
    * from the cache based on whether it exists in the Hive Metastore.
    * The invalidation logic is:
-   * - If the table exists in the metastore, add it to the catalog as an uninitialized
+   * - If the table exists in the Metastore, add it to the catalog as an uninitialized
    *   IncompleteTable (replacing any existing entry). The table metadata will be
    *   loaded lazily, on the next access. If the parent database for this table does not
    *   yet exist in Impala's cache it will also be added.
-   * - If the table does not exist in the metastore, remove it from the catalog cache.
-   * - If we are unable to determine whether the table exists in the metastore (there was
+   * - If the table does not exist in the Metastore, remove it from the catalog cache.
+   * - If we are unable to determine whether the table exists in the Metastore (there was
    *   an exception thrown making the RPC), invalidate any existing Table by replacing
    *   it with an uninitialized IncompleteTable.
-   *
-   * The parameter updatedObjects is a Pair that contains details on what catalog objects
-   * were modified as a result of the invalidateTable() call. The first item in the Pair
-   * is a Db which will only be set if a new database was added as a result of this call,
-   * otherwise it will be null. The second item in the Pair is the Table that was
-   * modified/added/removed.
-   * Returns a flag that indicates whether the items in updatedObjects were removed
-   * (returns true) or added/modified (return false). Only Tables should ever be removed.
+   * Returns the thrift representation of the added/updated/removed table, or null if
+   * the table was not present in the catalog cache or the Metastore.
+   * Sets tblWasRemoved to true if the table was absent from the Metastore and it was
+   * removed from the catalog cache.
+   * Sets dbWasAdded to true if both a new database and table were added to the catalog
+   * cache.
    */
-  public boolean invalidateTable(TTableName tableName, Pair<Db, Table> updatedObjects) {
-    Preconditions.checkNotNull(updatedObjects);
-    updatedObjects.first = null;
-    updatedObjects.second = null;
+  public TCatalogObject invalidateTable(TTableName tableName,
+      Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded) {
+    tblWasRemoved.setRef(false);
+    dbWasAdded.setRef(false);
     String dbName = tableName.getDb_name();
     String tblName = tableName.getTable_name();
     LOG.info(String.format("Invalidating table metadata: %s.%s", dbName, tblName));
@@ -1092,17 +1092,19 @@ public class CatalogServiceCatalog extends Catalog {
       }
 
       if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
-        updatedObjects.second = removeTable(dbName, tblName);
-        return true;
+        Table result = removeTable(dbName, tblName);
+        if (result == null) return null;
+        tblWasRemoved.setRef(true);
+        return result.toTCatalogObject();
       }
 
       db = getDb(dbName);
       if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
         // The table does not exist in our cache AND it is unknown whether the
-        // table exists in the metastore. Do nothing.
-        return false;
+        // table exists in the Metastore. Do nothing.
+        return null;
       } else if (db == null && tableExistsInMetaStore) {
-        // The table exists in the metastore, but our cache does not contain the parent
+        // The table exists in the Metastore, but our cache does not contain the parent
         // database. A new db will be added to the cache along with the new table. msDb
         // must be valid since tableExistsInMetaStore is true.
         try {
@@ -1111,11 +1113,11 @@ public class CatalogServiceCatalog extends Catalog {
           db = new Db(dbName, this, msDb);
           db.setCatalogVersion(incrementAndGetCatalogVersion());
           addDb(db);
-          updatedObjects.first = db;
+          dbWasAdded.setRef(true);
         } catch (TException e) {
-          // The metastore database cannot be get. Log the error and return.
+          // The Metastore database cannot be get. Log the error and return.
           LOG.error("Error executing getDatabase() metastore call: " + dbName, e);
-          return false;
+          return null;
         }
       }
     }
@@ -1130,8 +1132,12 @@ public class CatalogServiceCatalog extends Catalog {
       tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
           tblName.toLowerCase()));
     }
-    updatedObjects.second = newTable;
-    return false;
+    if (dbWasAdded.getRef()) {
+      // The database should always have a lower catalog version than the table because
+      // it needs to be created before the table can be added.
+      Preconditions.checkState(db.getCatalogVersion() < newTable.getCatalogVersion());
+    }
+    return newTable.toTCatalogObject();
   }
 
   /**
@@ -1290,10 +1296,10 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Reloads metadata for the partition defined by the partition spec
-   * 'partitionSpec' in table 'tbl'. Returns the table object with partition
-   * metadata reloaded
+   * 'partitionSpec' in table 'tbl'. Returns the resulting table's TCatalogObject after
+   * the partition metadata was reloaded.
    */
-  public Table reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
+  public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
       throws CatalogException {
     if (!tryLockTable(tbl)) {
       throw new CatalogException(String.format("Error reloading partition of table %s " +
@@ -1324,7 +1330,7 @@ public class CatalogServiceCatalog extends Catalog {
             hdfsTable.dropPartition(partitionSpec);
             hdfsTable.setCatalogVersion(newCatalogVersion);
           }
-          return hdfsTable;
+          return hdfsTable.toTCatalogObject();
         } catch (Exception e) {
           throw new CatalogException("Error loading metadata for partition: "
               + hdfsTable.getFullName() + " " + partitionName, e);
@@ -1334,7 +1340,7 @@ public class CatalogServiceCatalog extends Catalog {
       hdfsTable.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed partition metadata: %s %s",
           hdfsTable.getFullName(), partitionName));
-      return hdfsTable;
+      return hdfsTable.toTCatalogObject();
     } finally {
       Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a7163684/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index c240613..5db2247 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -762,8 +762,11 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     thriftHdfsPart.setAccess_level(accessLevel_);
     thriftHdfsPart.setIs_marked_cached(isMarkedCached_);
     thriftHdfsPart.setId(getId());
-    thriftHdfsPart.setHms_parameters(
-        includeIncrementalStats ? hmsParameters_ : getFilteredHmsParameters());
+    // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
+    // may try to serialize the returned THdfsPartition after releasing the table's lock,
+    // and another thread doing DDL may modify the map.
+    thriftHdfsPart.setHms_parameters(Maps.newHashMap(
+        includeIncrementalStats ? hmsParameters_ : getFilteredHmsParameters()));
     if (includeFileDesc) {
       // Add block location information
       for (FileDescriptor fd: fileDescriptors_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a7163684/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 01a4e55..61289a5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -17,22 +17,19 @@
 
 package org.apache.impala.catalog;
 
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.log4j.Logger;
-
 import org.apache.impala.analysis.TableName;
-import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TAccessLevel;
@@ -44,6 +41,8 @@ import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.HdfsCachingUtil;
+import org.apache.log4j.Logger;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -69,7 +68,7 @@ public abstract class Table implements CatalogObject {
   protected TTableDescriptor tableDesc_;
   protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
   // Lock protecting this table
-  private final ReentrantLock tableLock_ = new ReentrantLock(true);
+  private final ReentrantLock tableLock_ = new ReentrantLock();
 
   // Number of clustering columns.
   protected int numClusteringCols_;
@@ -297,7 +296,22 @@ public abstract class Table implements CatalogObject {
     }
   }
 
+  /**
+   * Must be called with 'tableLock_' held to protect against concurrent modifications
+   * while producing the TTable result.
+   */
   public TTable toThrift() {
+    // It would be simple to acquire and release the lock in this function.
+    // However, in most cases toThrift() is called after modifying a table for which
+    // the table lock should already be held, and we want the toThrift() to be consistent
+    // with the modification. So this check helps us identify places where the lock
+    // acquisition is probably missing entirely.
+    if (!tableLock_.isHeldByCurrentThread()) {
+      throw new IllegalStateException(
+          "Table.toThrift() called without holding the table lock: " +
+              getFullName() + " " + getClass().getName());
+    }
+
     TTable table = new TTable(db_.getName(), name_);
     table.setAccess_level(accessLevel_);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a7163684/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index c11f990..9d82c07 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -3069,63 +3069,50 @@ public class CatalogOpExecutor {
     resp.getResult().setCatalog_service_id(JniCatalog.getServiceId());
 
     if (req.isSetTable_name()) {
-      // Tracks any CatalogObjects updated/added/removed as a result of
-      // the invalidate metadata or refresh call. For refresh() it is only expected
-      // that a table be modified, but for invalidateTable() the table's parent database
-      // may have also been added if it did not previously exist in the catalog.
-      Pair<Db, Table> modifiedObjects = new Pair<Db, Table>(null, null);
-
-      boolean wasRemoved = false;
+      // Results of an invalidate operation, indicating whether the table was removed
+      // from the Metastore, and whether a new database was added to Impala as a result
+      // of the invalidate operation. Always false for refresh.
+      Reference<Boolean> tblWasRemoved = new Reference<Boolean>(false);
+      Reference<Boolean> dbWasAdded = new Reference<Boolean>(false);
+      // Thrift representation of the result of the invalidate/refresh operation.
+      TCatalogObject updatedThriftTable = null;
       if (req.isIs_refresh()) {
         TableName tblName = TableName.fromThrift(req.getTable_name());
         Table tbl = getExistingTable(tblName.getDb(), tblName.getTbl());
-        if (tbl == null) {
-          modifiedObjects.second = null;
-        } else {
+        if (tbl != null) {
           if (req.isSetPartition_spec()) {
-            modifiedObjects.second = catalog_.reloadPartition(tbl,
-                req.getPartition_spec());
+            updatedThriftTable = catalog_.reloadPartition(tbl, req.getPartition_spec());
           } else {
-            modifiedObjects.second = catalog_.reloadTable(tbl);
+            updatedThriftTable = catalog_.reloadTable(tbl);
           }
         }
       } else {
-        wasRemoved = catalog_.invalidateTable(req.getTable_name(), modifiedObjects);
+        updatedThriftTable = catalog_.invalidateTable(
+            req.getTable_name(), tblWasRemoved, dbWasAdded);
       }
 
-      if (modifiedObjects.first == null) {
-        if (modifiedObjects.second == null) {
-          // Table does not exist in the meta store and Impala catalog, throw error.
-          throw new TableNotFoundException("Table not found: " +
-              req.getTable_name().getDb_name() + "." +
-              req.getTable_name().getTable_name());
-        }
-        TCatalogObject thriftTable = modifiedObjects.second.toTCatalogObject();
+      if (updatedThriftTable == null) {
+        // Table does not exist in the Metastore and Impala catalog, throw error.
+        throw new TableNotFoundException("Table not found: " +
+            req.getTable_name().getDb_name() + "." +
+            req.getTable_name().getTable_name());
+      }
+
+      if (!dbWasAdded.getRef()) {
         // Return the TCatalogObject in the result to indicate this request can be
         // processed as a direct DDL operation.
-        if (wasRemoved) {
-          resp.getResult().setRemoved_catalog_object_DEPRECATED(thriftTable);
+        if (tblWasRemoved.getRef()) {
+          resp.getResult().setRemoved_catalog_object_DEPRECATED(updatedThriftTable);
         } else {
-          resp.getResult().setUpdated_catalog_object_DEPRECATED(thriftTable);
+          resp.getResult().setUpdated_catalog_object_DEPRECATED(updatedThriftTable);
         }
-        resp.getResult().setVersion(thriftTable.getCatalog_version());
       } else {
-        // If there were two catalog objects modified it indicates there was an
-        // "invalidateTable()" call that added a new table AND database to the catalog.
+        // Since multiple catalog objects were modified (db and table), don't treat this
+        // as a direct DDL operation. Set the overall catalog version and the impalad
+        // will wait for a statestore heartbeat that contains the update.
         Preconditions.checkState(!req.isIs_refresh());
-        Preconditions.checkNotNull(modifiedObjects.first);
-        Preconditions.checkNotNull(modifiedObjects.second);
-
-        // The database should always have a lower catalog version than the table because
-        // it needs to be created before the table can be added.
-        Preconditions.checkState(modifiedObjects.first.getCatalogVersion() <
-            modifiedObjects.second.getCatalogVersion());
-
-        // Since multiple catalog objects were modified, don't treat this as a direct DDL
-        // operation. Just set the overall catalog version and the impalad will wait for
-        // a statestore heartbeat that contains the update.
-        resp.getResult().setVersion(modifiedObjects.second.getCatalogVersion());
       }
+      resp.getResult().setVersion(updatedThriftTable.getCatalog_version());
     } else {
       // Invalidate the entire catalog if no table name is provided.
       Preconditions.checkArgument(!req.isIs_refresh());

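The response logic in the CatalogOpExecutor diff above can be summarized as a small decision function. This is a sketch only. `Ref<T>` is a hypothetical stand-in for Impala's `Reference<T>` out-parameter holder, and the `Outcome` enum is invented for illustration. In every non-error case, the real code also sets the response version from the returned TCatalogObject's catalog version.

```java
/**
 * Hypothetical sketch of how the reset-metadata response is chosen after an
 * invalidate/refresh, following the control flow in the CatalogOpExecutor diff.
 * Ref<T> is a stand-in for Impala's Reference<T> holder, not the real class.
 */
public final class ResetMetadataDispatch {
  /** Minimal mutable holder used as an out-parameter. */
  static final class Ref<T> {
    private T value;
    Ref(T initial) { value = initial; }
    T get() { return value; }
    void set(T newValue) { value = newValue; }
  }

  enum Outcome {
    TABLE_NOT_FOUND,      // the real code throws TableNotFoundException
    DIRECT_DDL_REMOVED,   // removed catalog object returned to the impalad
    DIRECT_DDL_UPDATED,   // updated catalog object returned to the impalad
    WAIT_FOR_STATESTORE   // db and table both added; impalad waits for a heartbeat
  }

  /** 'updatedThriftTable' is null when the table exists in neither catalog. */
  static Outcome dispatch(Object updatedThriftTable, Ref<Boolean> tblWasRemoved,
      Ref<Boolean> dbWasAdded) {
    if (updatedThriftTable == null) return Outcome.TABLE_NOT_FOUND;
    if (dbWasAdded.get()) return Outcome.WAIT_FOR_STATESTORE;
    return tblWasRemoved.get() ? Outcome.DIRECT_DDL_REMOVED : Outcome.DIRECT_DDL_UPDATED;
  }
}
```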
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a7163684/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index ebd99ba..7d80ce0 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -21,15 +21,10 @@ import static org.junit.Assert.fail;
 
 import java.util.Map;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.THBaseTable;
@@ -37,6 +32,11 @@ import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableType;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -59,7 +59,7 @@ public class CatalogObjectToFromThriftTest {
                         "functional_seq"};
     for (String dbName: dbNames) {
       Table table = catalog_.getOrLoadTable(dbName, "alltypes");
-      TTable thriftTable = table.toThrift();
+      TTable thriftTable = getThriftTable(table);
       Assert.assertEquals(thriftTable.tbl_name, "alltypes");
       Assert.assertEquals(thriftTable.db_name, dbName);
       Assert.assertTrue(thriftTable.isSetTable_type());
@@ -125,7 +125,7 @@ public class CatalogObjectToFromThriftTest {
   public void TestMismatchedAvroAndTableSchemas() throws CatalogException {
     Table table = catalog_.getOrLoadTable("functional_avro_snap",
         "schema_resolution_test");
-    TTable thriftTable = table.toThrift();
+    TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "schema_resolution_test");
     Assert.assertTrue(thriftTable.isSetTable_type());
     Assert.assertEquals(thriftTable.getColumns().size(), 8);
@@ -145,7 +145,7 @@ public class CatalogObjectToFromThriftTest {
   public void TestHBaseTables() throws CatalogException {
     String dbName = "functional_hbase";
     Table table = catalog_.getOrLoadTable(dbName, "alltypes");
-    TTable thriftTable = table.toThrift();
+    TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "alltypes");
     Assert.assertEquals(thriftTable.db_name, dbName);
     Assert.assertTrue(thriftTable.isSetTable_type());
@@ -174,7 +174,7 @@ public class CatalogObjectToFromThriftTest {
       throws CatalogException {
     String dbName = "functional_hbase";
     Table table = catalog_.getOrLoadTable(dbName, "alltypessmallbinary");
-    TTable thriftTable = table.toThrift();
+    TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "alltypessmallbinary");
     Assert.assertEquals(thriftTable.db_name, dbName);
     Assert.assertTrue(thriftTable.isSetTable_type());
@@ -206,7 +206,7 @@ public class CatalogObjectToFromThriftTest {
   @Test
   public void TestTableLoadingErrors() throws ImpalaException {
     Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl");
-    TTable thriftTable = table.toThrift();
+    TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "hive_index_tbl");
     Assert.assertEquals(thriftTable.db_name, "functional");
 
@@ -237,11 +237,22 @@ public class CatalogObjectToFromThriftTest {
   @Test
   public void TestView() throws CatalogException {
     Table table = catalog_.getOrLoadTable("functional", "view_view");
-    TTable thriftTable = table.toThrift();
+    TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "view_view");
     Assert.assertEquals(thriftTable.db_name, "functional");
     Assert.assertFalse(thriftTable.isSetHdfs_table());
     Assert.assertFalse(thriftTable.isSetHbase_table());
     Assert.assertTrue(thriftTable.isSetMetastore_table());
   }
+
+  private TTable getThriftTable(Table table) {
+    TTable thriftTable = null;
+    table.getLock().lock();
+    try {
+      thriftTable = table.toThrift();
+    } finally {
+      table.getLock().unlock();
+    }
+    return thriftTable;
+  }
 }


[2/4] incubator-impala git commit: IMPALA-2020, 4915, 4936: Add rounding for decimal casts

Posted by jr...@apache.org.
IMPALA-2020, 4915, 4936: Add rounding for decimal casts

This change adds support for DECIMAL_V2 rounding behavior for both
DECIMAL to INT and DOUBLE to DECIMAL casts.  For exact halves, the
implemented behavior is to round away from zero (e.g. 0.5 -> 1 and
-0.5 -> -1).
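The rounding rule can be illustrated in Java, where `RoundingMode.HALF_UP` rounds ties away from zero. This is only an analogue with hypothetical method names. Impala's decimal casts are implemented in its C++ backend, not with BigDecimal. The sketch contrasts the new DECIMAL to INT behavior with truncation toward zero.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Java analogue of the DECIMAL_V2 rounding rule: exact halves round away from
 * zero. Impala implements this in C++; this is only an illustrative sketch.
 */
public final class DecimalCastRounding {
  /** DECIMAL to INT with DECIMAL_V2-style rounding (ties away from zero). */
  static long toIntRounded(BigDecimal value) {
    // HALF_UP rounds ties away from zero: 0.5 -> 1, -0.5 -> -1.
    return value.setScale(0, RoundingMode.HALF_UP).longValueExact();
  }

  /** DECIMAL to INT by truncation toward zero, i.e. without rounding. */
  static long toIntTruncated(BigDecimal value) {
    return value.setScale(0, RoundingMode.DOWN).longValueExact();
  }

  public static void main(String[] args) {
    for (String s : new String[] {"0.5", "-0.5", "0.59999", "-1.4"}) {
      BigDecimal d = new BigDecimal(s);
      System.out.println(s + " -> truncated " + toIntTruncated(d)
          + ", rounded " + toIntRounded(d));
    }
  }
}
```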

This change also fixes two open bugs regarding overflow checking.
The root cause of both turned out to be the same: a missing std::
caused the wrong (integer) abs() function to be used.  Due to
details of IEEE floating point representation, this actually
masked another bug: NaN is often represented as all 1-bits, which
fails the overflow test.  Since the implicit conversion to int lost
precision, we ended up storing large numbers that don't represent
valid decimal values in the target range when the value happened
to be +/- Infinity.  This caused the rendering back to ASCII
characters to go awry, but is otherwise harmless.
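The overflow hazard has a rough Java analogue. The names are hypothetical, and an explicit `(int)` cast stands in for the accidental integer abs(). Java's double-to-int conversion saturates (Infinity becomes Integer.MAX_VALUE, NaN becomes 0), whereas in C++ an out-of-range conversion is undefined behavior. Only the general lesson carries over: check for non-finite values and keep the comparison in floating point. The sketch also ignores values that rounding would push up to the limit.

```java
/**
 * Java analogue of a DOUBLE to DECIMAL(precision, scale) overflow check.
 * Not Impala's C++ code; the (int) cast plays the role of the wrong abs().
 */
public final class DecimalOverflowCheck {
  /**
   * True if 'value' is not finite or has more than (precision - scale)
   * integer digits. Does not account for rounding up to the limit.
   */
  static boolean overflows(double value, int precision, int scale) {
    if (Double.isNaN(value) || Double.isInfinite(value)) return true;
    double limit = Math.pow(10, precision - scale);
    return Math.abs(value) >= limit; // double overload: no loss of range
  }

  /** Buggy variant: the value is narrowed to int before taking the absolute value. */
  static boolean overflowsBuggy(double value, int precision, int scale) {
    double limit = Math.pow(10, precision - scale);
    return Math.abs((int) value) >= limit;
  }
}
```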

Testing: Added expr-test and decimal-test coverage, plus manual
testing.  I tried to update the expr benchmark to get performance
results, but the benchmark is badly bit-rotted: it was throwing JNI
exceptions.  I fixed up the JNI init call, but there is still a lot
of work to do to get it back into a runnable state.  Even with the
hack to get at the RuntimeContext, we end up getting null derefs
because the slot descriptor table is not initialized.

Output comparisons (before | after):
+----------------------+
| cast(0.59999 as int) |
+----------------------+
| 0        |  1        |
+----------------------+

+---------------------------------------------+
| cast(cast(0.5999 as float) as decimal(5,1)) |
+---------------------------------------------+
| 0.5      | 0.6                              |
+---------------------------------------------+

Performance summary.  In all cases I have tried multiple times and
taken the fastest query results.

Old version, head at 815c76f9cbbe6585ebed961da506fc54ce2ef4e3:

select sum(cast(l_extendedprice as bigint)) from tpch10_parquet.lineitem;
+--------------------------------------+
| sum(cast(l_extendedprice as bigint)) |
+--------------------------------------+
| 2293784575265                        |
+--------------------------------------+
Fetched 1 row(s) in 0.53s

With this change, and decimal_v2 off:

+--------------------------------------+
| sum(cast(l_extendedprice as bigint)) |
+--------------------------------------+
| 2293784575265                        |
+--------------------------------------+
Fetched 1 row(s) in 0.52s

Note that these results are noisy, with quite a bit of variance
across invocations.  Still, we do not appear to have regressed.

With DECIMAL_V2 enabled, we lose some performance due to rounding.

DECIMAL_V2 set to 1
+--------------------------------------+
| sum(cast(l_extendedprice as bigint)) |
+--------------------------------------+
| 2293814088985                        |
+--------------------------------------+
Fetched 1 row(s) in 0.63s

So we're about 20% slower (0.63s vs. 0.52s).  The variance is high, so
this is not a precise number, but the trend is consistent.  We have
some work to do to win this back.

Casting from double seems to be roughly at parity:

Old version, head at 815c76f9cbbe6585ebed961da506fc54ce2ef4e3:
+-------------------------------------------------------------+
| sum(cast(cast(l_extendedprice as double) as decimal(14,2))) |
+-------------------------------------------------------------+
| 2293813121802.09                                            |
+-------------------------------------------------------------+
Fetched 1 row(s) in 0.63s
+--------------------------------------------------------------+
| sum(cast(cast(l_extendedprice as double) as decimal(38,10))) |
+--------------------------------------------------------------+
| 2293813156773.3596978911                                     |
+--------------------------------------------------------------+
Fetched 1 row(s) in 0.72s

New version, decimal_v2=0
+-------------------------------------------------------------+
| sum(cast(cast(l_extendedprice as double) as decimal(14,2))) |
+-------------------------------------------------------------+
| 2293813121802.09                                            |
+-------------------------------------------------------------+
Fetched 1 row(s) in 0.64s
+--------------------------------------------------------------+
| sum(cast(cast(l_extendedprice as double) as decimal(38,10))) |
+--------------------------------------------------------------+
| 2293813156773.3596978911                                     |
+--------------------------------------------------------------+
Fetched 1 row(s) in 0.73s

New version, decimal_v2=1
+-------------------------------------------------------------+
| sum(cast(cast(l_extendedprice as double) as decimal(14,2))) |
+-------------------------------------------------------------+
| 2293813156773.36                                            |
+-------------------------------------------------------------+
Fetched 1 row(s) in 0.63s
+--------------------------------------------------------------+
| sum(cast(cast(l_extendedprice as double) as decimal(38,10))) |
+--------------------------------------------------------------+
| 2293813156773.3600000000                                     |
+--------------------------------------------------------------+
Fetched 1 row(s) in 0.73s

Interestingly, you can see the effect of the rounding as well: the
DECIMAL(38,10) result is now precise, whereas the truncation before
left artifacts from the division.
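
A small C++ sketch (not part of this patch) of why truncation biases
such a sum downward: most cent amounts have no exact binary
representation, so scaling a double by 100 can land just below the
intended integer.  SumCents and CentSums are hypothetical names.
Converting 0.00 through 99.99 back to integer cents, rounding
recovers every value, while truncation drops a cent for some of them
(29 / 100.0 * 100 evaluates to 28.999999999999996, for instance):

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

struct CentSums {
  int64_t truncated = 0;
  int64_t rounded = 0;
};

// Converts i / 100.0 for i in [0, n) back to integer cents both ways and sums them.
CentSums SumCents(int n) {
  assert(n >= 0);
  CentSums sums;
  for (int i = 0; i < n; ++i) {
    const double d = i / 100.0;  // nearest double to the intended decimal value
    const double scaled = d * 100.0;
    sums.truncated += static_cast<int64_t>(scaled);  // truncation toward zero
    sums.rounded += static_cast<int64_t>(std::round(scaled));
  }
  return sums;
}
```

SumCents(10000).rounded equals 0 + 1 + ... + 9999 = 49995000, while
the truncated sum comes out smaller.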

Change-Id: I2daf186b4770a022f9cb349d512067a1dd624810
Reviewed-on: http://gerrit.cloudera.org:8080/5951
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d2c540f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d2c540f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d2c540f1

Branch: refs/heads/master
Commit: d2c540f1e3d63eea39311a03f1ee8741042312eb
Parents: 909be1d
Author: Zach Amsden <za...@cloudera.com>
Authored: Thu Feb 9 03:23:07 2017 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 24 07:58:59 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc   | 104 +++++++++++-----
 be/src/exprs/decimal-operators-ir.cc  |  74 +++++++----
 be/src/exprs/expr-test.cc             | 193 ++++++++++++++++++++++++++++-
 be/src/exprs/expr.h                   |  11 --
 be/src/exprs/literal.cc               |   6 +-
 be/src/runtime/decimal-test.cc        |  97 ++++++++++++---
 be/src/runtime/decimal-value.h        |  19 ++-
 be/src/runtime/decimal-value.inline.h |  58 +++++++--
 be/src/udf/udf.h                      |  20 +--
 9 files changed, 472 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index cda183f..c6d075d 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -22,11 +22,16 @@
 #include <thrift/Thrift.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
+#include "runtime/exec-env.h"
+#include "runtime/runtime-state.h"
+
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "util/backend-gflag-util.h"
 #include "util/benchmark.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
+#include "util/jni-util.h"
 #include "rpc/jni-thrift-util.h"
 
 #include "gen-cpp/Types_types.h"
@@ -41,9 +46,12 @@
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-server.h"
+#include "codegen/llvm-codegen.h"
+#include "common/init.h"
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "runtime/mem-tracker.h"
+#include "service/fe-support.h"
 #include "service/impala-server.h"
 
 #include "common/names.h"
@@ -56,19 +64,8 @@ using namespace impala;
 class Planner {
  public:
   Planner() {
-    JNIEnv* jni_env = getJNIEnv();
-    // create instance of java class JniFrontend
-    jclass fe_class = jni_env->FindClass("org/apache/impala/service/JniFrontend");
-    jmethodID fe_ctor = jni_env->GetMethodID(fe_class, "<init>", "(Z)V");
-    EXIT_IF_EXC(jni_env);
-    create_exec_request_id_ =
-        jni_env->GetMethodID(fe_class, "createExecRequest", "([B)[B");
-    EXIT_IF_EXC(jni_env);
-
-    jboolean lazy = true;
-    jobject fe = jni_env->NewObject(fe_class, fe_ctor, lazy);
-    EXIT_IF_EXC(jni_env);
-    ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, fe, &fe_));
+    frontend_.SetCatalogInitialized();
+    exec_env_.InitForFeTests();
   }
 
   Status GeneratePlan(const string& stmt, TExecRequest* result) {
@@ -77,22 +74,19 @@ class Planner {
     query_ctx.client_request.query_options = query_options_;
     query_ctx.__set_session(session_state_);
     ImpalaServer::PrepareQueryContext(&query_ctx);
+    runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_, ""));
+
+    return frontend_.GetExecRequest(query_ctx, result);
+  }
 
-    JNIEnv* jni_env = getJNIEnv();
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(jni_env));
-    jbyteArray request_bytes;
-    RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &query_ctx, &request_bytes));
-    jbyteArray result_bytes = static_cast<jbyteArray>(
-        jni_env->CallObjectMethod(fe_, create_exec_request_id_, request_bytes));
-    RETURN_ERROR_IF_EXC(jni_env);
-    RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, result));
-    return Status::OK();
+  RuntimeState* GetRuntimeState() {
+    return runtime_state_.get();
   }
 
  private:
-  jobject fe_;  // instance of org.apache.impala.service.JniFrontend
-  jmethodID create_exec_request_id_;  // JniFrontend.createExecRequest()
+  Frontend frontend_;
+  ExecEnv exec_env_;
+  scoped_ptr<RuntimeState> runtime_state_;
 
   TQueryOptions query_options_;
   TSessionState session_state_;
@@ -103,7 +97,7 @@ struct TestData {
   int64_t dummy_result;
 };
 
-Planner planner;
+Planner* planner;
 ObjectPool pool;
 MemTracker tracker;
 
@@ -114,7 +108,7 @@ static Status PrepareSelectList(const TExecRequest& request, ExprContext** ctx)
   vector<TExpr> texprs = query_request.plan_exec_info[0].fragments[0].output_exprs;
   DCHECK_EQ(texprs.size(), 1);
   RETURN_IF_ERROR(Expr::CreateExprTree(&pool, texprs[0], ctx));
-  RETURN_IF_ERROR((*ctx)->Prepare(NULL, RowDescriptor(), &tracker));
+  RETURN_IF_ERROR((*ctx)->Prepare(planner->GetRuntimeState(), RowDescriptor(), &tracker));
   return Status::OK();
 }
 
@@ -124,7 +118,7 @@ static TestData* GenerateBenchmarkExprs(const string& query, bool codegen) {
   ss << "select " << query;
   TestData* test_data = new TestData;
   TExecRequest request;
-  ABORT_IF_ERROR(planner.GeneratePlan(ss.str(), &request));
+  ABORT_IF_ERROR(planner->GeneratePlan(ss.str(), &request));
   ABORT_IF_ERROR(PrepareSelectList(request, &test_data->ctx));
   return test_data;
 }
@@ -221,6 +215,50 @@ Benchmark* BenchmarkCast() {
   return suite;
 }
 
+Benchmark* BenchmarkDecimalCast() {
+  Benchmark* suite = new Benchmark("Decimal Casts");
+  BENCHMARK("int_to_decimal4", "cast 12345678 as DECIMAL(9,2)");
+  BENCHMARK("decimal4_to_decimal4", "cast 12345678.5 as DECIMAL(9,2)");
+  BENCHMARK("decimal8_to_decimal4", "cast 12345678.345 as DECIMAL(9,2)");
+  BENCHMARK("decimal16_to_decimal4", "cast 12345678.123456783456789 as DECIMAL(9,2)");
+  BENCHMARK("double_to_decimal4", "cast e() as DECIMAL(9,7)");
+  BENCHMARK("string_to_decimal4", "cast '12345678.123456783456789' as DECIMAL(9,2)");
+  BENCHMARK("int_to_decimal8", "cast 12345678 as DECIMAL(18,2)");
+  BENCHMARK("decimal4_to_decimal8", "cast 12345678.5 as DECIMAL(18,2)");
+  BENCHMARK("decimal8_to_decimal8", "cast 12345678.345 as DECIMAL(18,2)");
+  BENCHMARK("decimal16_to_decimal8", "cast 12345678.123456783456789 as DECIMAL(18,2)");
+  BENCHMARK("double_to_decimal8", "cast e() as DECIMAL(18,7)");
+  BENCHMARK("string_to_decimal8", "cast '12345678.123456783456789' as DECIMAL(18,7)");
+  BENCHMARK("int_to_decimal16", "cast 12345678 as DECIMAL(28,2)");
+  BENCHMARK("decimal4_to_decimal16", "cast 12345678.5 as DECIMAL(28,2)");
+  BENCHMARK("decimal8_to_decimal16", "cast 12345678.345 as DECIMAL(28,2)");
+  BENCHMARK("decimal16_to_decimal16", "cast 12345678.123456783456789 as DECIMAL(28,2)");
+  BENCHMARK("double_to_decimal16", "cast e() as DECIMAL(28,7)");
+  BENCHMARK("string_to_decimal16", "cast '12345678.123456783456789' as DECIMAL(28,7)");
+  BENCHMARK("decimal4_to_tinyint", "cast 78.5 as TINYINT");
+  BENCHMARK("decimal8_to_tinyint", "cast 0.12345678345 as TINYINT");
+  BENCHMARK("decimal16_to_tinyint", "cast 78.12345678123456783456789 as TINYINT");
+  BENCHMARK("decimal4_to_smallint", "cast 78.5 as SMALLINT");
+  BENCHMARK("decimal8_to_smallint", "cast 0.12345678345 as SMALLINT");
+  BENCHMARK("decimal16_to_smallint", "cast 78.12345678123456783456789 as SMALLINT");
+  BENCHMARK("decimal4_to_int", "cast 12345678.5 as INT");
+  BENCHMARK("decimal8_to_int", "cast 12345678.345 as INT");
+  BENCHMARK("decimal16_to_int", "cast 12345678.123456783456789 as INT");
+  BENCHMARK("decimal4_to_bigint", "cast 12345678.5 as BIGINT");
+  BENCHMARK("decimal8_to_bigint", "cast 12345678.345 as BIGINT");
+  BENCHMARK("decimal16_to_bigint", "cast 12345678.123456783456789 as BIGINT");
+  BENCHMARK("decimal4_to_float", "cast 12345678.5 as FLOAT");
+  BENCHMARK("decimal8_to_float", "cast 12345678.345 as FLOAT");
+  BENCHMARK("decimal16_to_float", "cast 12345678.123456783456789 as FLOAT");
+  BENCHMARK("decimal4_to_double", "cast 12345678.5 as DOUBLE");
+  BENCHMARK("decimal8_to_double", "cast 12345678.345 as DOUBLE");
+  BENCHMARK("decimal16_to_double", "cast 12345678.123456783456789 as DOUBLE");
+  BENCHMARK("decimal4_to_string", "cast 12345678.5 as STRING");
+  BENCHMARK("decimal8_to_string", "cast 12345678.345 as STRING");
+  BENCHMARK("decimal16_to_string", "cast 12345678.123456783456789 as STRING");
+  return suite;
+}
+
 // ConditionalFunctions: Function                Rate          Comparison
 // ----------------------------------------------------------------------
 //                       not_null               877.8                  1X
@@ -516,13 +554,20 @@ Benchmark* BenchmarkTimestampFunctions() {
 }
 
 int main(int argc, char** argv) {
-  CpuInfo::Init();
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport(false);
+  impala::LlvmCodeGen::InitializeLlvm();
+
+  // Dynamically construct at runtime as the planner initialization depends on
+  // static objects being initialized in other compilation modules.
+  planner = new Planner();
 
   // Generate all the tests first (this does the planning)
   Benchmark* literals = BenchmarkLiterals();
   Benchmark* arithmetics = BenchmarkArithmetic();
   Benchmark* like = BenchmarkLike();
   Benchmark* cast = BenchmarkCast();
+  Benchmark* decimal_cast = BenchmarkDecimalCast();
   Benchmark* conditional_fns = BenchmarkConditionalFunctions();
   Benchmark* string_fns = BenchmarkStringFunctions();
   Benchmark* url_fns = BenchmarkUrlFunctions();
@@ -534,6 +579,7 @@ int main(int argc, char** argv) {
   cout << arithmetics->Measure() << endl;
   cout << like->Measure() << endl;
   cout << cast->Measure() << endl;
+  cout << decimal_cast->Measure() << endl;
   cout << conditional_fns->Measure() << endl;
   cout << string_fns->Measure() << endl;
   cout << url_fns->Measure() << endl;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/exprs/decimal-operators-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/decimal-operators-ir.cc b/be/src/exprs/decimal-operators-ir.cc
index 4dfc8dc..6ea48f7 100644
--- a/be/src/exprs/decimal-operators-ir.cc
+++ b/be/src/exprs/decimal-operators-ir.cc
@@ -34,11 +34,11 @@
 
 namespace impala {
 
-#define RETURN_IF_OVERFLOW(ctx, overflow) \
+#define RETURN_IF_OVERFLOW(ctx, overflow, return_type) \
   do {\
     if (UNLIKELY(overflow)) {\
       ctx->AddWarning("Expression overflowed, returning NULL");\
-      return DecimalVal::null();\
+      return return_type::null();\
     }\
   } while (false)
 
@@ -49,17 +49,17 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::IntToDecimalVal(
   switch (ColumnType::GetDecimalByteSize(precision)) {
     case 4: {
       Decimal4Value dv = Decimal4Value::FromInt(precision, scale, val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(dv.value());
     }
     case 8: {
       Decimal8Value dv = Decimal8Value::FromInt(precision, scale, val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(dv.value());
     }
     case 16: {
       Decimal16Value dv = Decimal16Value::FromInt(precision, scale, val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(dv.value());
     }
     default:
@@ -72,23 +72,24 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::IntToDecimalVal(
 IR_ALWAYS_INLINE DecimalVal DecimalOperators::FloatToDecimalVal(
     FunctionContext* ctx, int precision, int scale, double val) {
   bool overflow = false;
+  const bool round = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2);
   switch (ColumnType::GetDecimalByteSize(precision)) {
     case 4: {
       Decimal4Value dv =
-          Decimal4Value::FromDouble(precision, scale, val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+          Decimal4Value::FromDouble(precision, scale, val, round, &overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(dv.value());
     }
     case 8: {
       Decimal8Value dv =
-          Decimal8Value::FromDouble(precision, scale, val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+          Decimal8Value::FromDouble(precision, scale, val, round, &overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(dv.value());
     }
     case 16: {
       Decimal16Value dv =
-          Decimal16Value::FromDouble(precision, scale, val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+          Decimal16Value::FromDouble(precision, scale, val, round, &overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(dv.value());
     }
     default:
@@ -112,21 +113,21 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
     case 4: {
       Decimal4Value scaled_val = val.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 8: {
       Decimal8Value val8 = ToDecimal8(val, &overflow);
       Decimal8Value scaled_val = val8.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 16: {
       Decimal16Value val16 = ToDecimal16(val, &overflow);
       Decimal16Value scaled_val = val16.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     default:
@@ -143,20 +144,20 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
       Decimal8Value scaled_val = val.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
       Decimal4Value val4 = ToDecimal4(scaled_val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val4.value());
     }
     case 8: {
       Decimal8Value scaled_val = val.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 16: {
       Decimal16Value val16 = ToDecimal16(val, &overflow);
       Decimal16Value scaled_val = val16.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     default:
@@ -173,20 +174,20 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
       Decimal16Value scaled_val = val.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
       Decimal4Value val4 = ToDecimal4(scaled_val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val4.value());
     }
     case 8: {
       Decimal16Value scaled_val = val.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
       Decimal8Value val8 = ToDecimal8(scaled_val, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val8.value());
     }
     case 16: {
       Decimal16Value scaled_val = val.ScaleTo(
           val_scale, output_scale, output_precision, &overflow);
-      RETURN_IF_OVERFLOW(ctx, overflow);
+      RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     default:
@@ -292,18 +293,39 @@ static inline Decimal16Value GetDecimal16Value(
       FunctionContext* ctx, const DecimalVal& val) { \
     if (val.is_null) return to_type::null(); \
     int scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SCALE, 0); \
+    bool overflow = false; \
+    /* TODO: IMPALA-4929: remove DECIMAL V1 code */ \
+    const bool round = ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); \
     switch (ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0)) { \
       case 4: { \
         Decimal4Value dv(val.val4); \
-        return to_type(dv.whole_part(scale)); \
+        if (round) { \
+          auto val = dv.ToInt<to_type>(scale, &overflow); \
+          RETURN_IF_OVERFLOW(ctx, overflow, to_type); \
+          return to_type(val); \
+        } else { \
+          return to_type(dv.whole_part(scale)); \
+        } \
       } \
       case 8: { \
         Decimal8Value dv(val.val8); \
-        return to_type(dv.whole_part(scale)); \
+        if (round) { \
+          auto val = dv.ToInt<to_type>(scale, &overflow); \
+          RETURN_IF_OVERFLOW(ctx, overflow, to_type); \
+          return to_type(val); \
+        } else { \
+          return to_type(dv.whole_part(scale)); \
+        } \
       } \
       case 16: { \
         Decimal16Value dv(val.val16); \
-        return to_type(dv.whole_part(scale)); \
+        if (round) { \
+          auto val = dv.ToInt<to_type>(scale, &overflow); \
+          RETURN_IF_OVERFLOW(ctx, overflow, to_type); \
+          return to_type(val); \
+        } else { \
+          return to_type(dv.whole_part(scale)); \
+        } \
       } \
       default:\
         DCHECK(false); \
@@ -477,7 +499,7 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::RoundDecimal(FunctionContext* ctx,
   // overflow if output_precision >= val_precision. Otherwise, result can overflow.
   bool overflow = output_precision < val_precision &&
       abs(result.val16) >= DecimalUtil::GetScaleMultiplier<int128_t>(output_precision);
-  RETURN_IF_OVERFLOW(ctx, overflow);
+  RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
   return result;
 }
 
@@ -679,7 +701,7 @@ BooleanVal DecimalOperators::CastToBooleanVal(
         Decimal16Value y_val = GetDecimal16Value(y, y_size, &overflow); \
         Decimal16Value result = x_val.OP_FN<int128_t>(x_scale, y_val, y_scale, \
             return_precision, return_scale, &overflow); \
-        RETURN_IF_OVERFLOW(ctx, overflow); \
+        RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal); \
         return DecimalVal(result.value()); \
       } \
       default: \
@@ -726,7 +748,7 @@ BooleanVal DecimalOperators::CastToBooleanVal(
         Decimal16Value y_val = GetDecimal16Value(y, y_size, &overflow); \
         Decimal16Value result = x_val.OP_FN<int128_t>(x_scale, y_val, y_scale, \
             return_precision, return_scale, &is_nan, &overflow); \
-        RETURN_IF_OVERFLOW(ctx, overflow); \
+        RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal); \
         if (is_nan) return DecimalVal::null(); \
         return DecimalVal(result.value()); \
       } \
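
The RETURN_IF_OVERFLOW change above is what lets the DECIMAL to INT
cast functions return to_type::null() on overflow instead of
DecimalVal::null().  A minimal, self-contained sketch of the pattern;
SketchIntVal, SketchDecimalVal, SketchContext and
SKETCH_RETURN_IF_OVERFLOW are hypothetical stand-ins, not Impala's
IntVal, DecimalVal, FunctionContext or the real macro:

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>
#include <vector>

// Hypothetical stand-ins for the UDF value types; each has a static null() factory.
struct SketchIntVal {
  bool is_null = false;
  int32_t val = 0;
  SketchIntVal() = default;
  explicit SketchIntVal(int32_t v) : val(v) {}
  static SketchIntVal null() { SketchIntVal r; r.is_null = true; return r; }
};

struct SketchDecimalVal {
  bool is_null = false;
  int64_t val8 = 0;
  SketchDecimalVal() = default;
  explicit SketchDecimalVal(int64_t v) : val8(v) {}
  static SketchDecimalVal null() { SketchDecimalVal r; r.is_null = true; return r; }
};

// Hypothetical stand-in for a function context that records warnings.
struct SketchContext {
  std::vector<std::string> warnings;
  void AddWarning(const char* msg) { warnings.emplace_back(msg); }
};

// Same shape as the patched macro: the caller names the type whose null() is
// returned, and do { } while (false) makes the expansion a single statement.
#define SKETCH_RETURN_IF_OVERFLOW(ctx, overflow, return_type) \
  do { \
    if (overflow) { \
      (ctx)->AddWarning("Expression overflowed, returning NULL"); \
      return return_type::null(); \
    } \
  } while (false)

// Narrowing to INT must return an INT null, not a decimal null.
SketchIntVal NarrowToInt(SketchContext* ctx, int64_t v) {
  const bool overflow = v < std::numeric_limits<int32_t>::min() ||
                        v > std::numeric_limits<int32_t>::max();
  SKETCH_RETURN_IF_OVERFLOW(ctx, overflow, SketchIntVal);
  return SketchIntVal(static_cast<int32_t>(v));
}

// A DECIMAL(9, s) result must keep its unscaled value below 10^9 in magnitude.
SketchDecimalVal CheckPrecision9(SketchContext* ctx, int64_t unscaled) {
  const bool overflow = unscaled <= -1000000000LL || unscaled >= 1000000000LL;
  SKETCH_RETURN_IF_OVERFLOW(ctx, overflow, SketchDecimalVal);
  return SketchDecimalVal(unscaled);
}
```

Passing the return type as a macro argument keeps a single warning
path for every result type instead of one macro per type.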

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 442b64e..37e816d 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -451,6 +451,7 @@ class ExprTest : public testing::Test {
       default:
         EXPECT_TRUE(false) << expected_type << " " << expected_type.GetByteSize();
     }
+    EXPECT_EQ(result, StringParser::PARSE_SUCCESS);
   }
 
   template <class T> void TestValue(const string& expr, const ColumnType& expr_type,
@@ -1402,8 +1403,12 @@ DecimalTestCase decimal_cases[] = {
   { "cast(99999999999999999999999999999999999999 as decimal(38,0)) % "
     "cast(99999999999999999999999999999999999999 as decimal(38,0))",
     {{ false, 0, 38, 0 }}},
-  { "cast(998 as decimal(38,0)) % cast(0.999 as decimal(38,38))", {{ false, 0, 38, 38 }}},
-  { "cast(0.998 as decimal(38,38)) % cast(999 as decimal(38,0))", {{ false, 0, 38, 38 }}},
+  { "cast(998 as decimal(38,0)) % cast(0.999 as decimal(38,38))",
+    {{ true, 0, 38, 38 },   // IMPALA-4964 - this should not overflow
+     { true, 0, 38, 38 }}},
+  { "cast(0.998 as decimal(38,38)) % cast(999 as decimal(38,0))",
+    {{ true, 0, 38, 38 },   // IMPALA-4964 - this should not overflow
+     { true, 0, 38, 38 }}},
   { "cast(0.00000000000000000000000000000000000001 as decimal(38,38)) % "
     "cast(0.0000000000000000000000000000000000001 as decimal(38,38))",
     {{ false, 1, 38, 38 }}},
@@ -1488,7 +1493,188 @@ DecimalTestCase decimal_cases[] = {
   { "avg(d) from (values((cast(0.10000000000000000000000000000000000000 as DECIMAL(38,38)) "
     "as d))) as t",
     {{false, static_cast<int128_t>(10000000) *
-      10000000000ll * 10000000000ll * 10000000000ll, 38, 38}}}
+      10000000000ll * 10000000000ll * 10000000000ll, 38, 38}}},
+  // Test CAST DECIMAL -> INT
+  { "cast(cast(0.5999999 AS tinyint) AS decimal(10,6))",
+    {{ false, 0, 10, 6 },
+     { false, 1000000, 10, 6 }}},
+  { "cast(cast(99999999.4999999 AS int) AS decimal(10,2))",
+    {{ false, 9999999900, 10, 2 }}},
+  { "cast(cast(99999999.5999999 AS int) AS decimal(10,2))",
+    {{ false, 9999999900, 10, 2 },
+     { true, 0, 10, 2 }}},
+  { "cast(cast(10000.5999999 as int) as decimal(30,6))",
+    {{ false, 10000000000, 30, 6 },
+     { false, 10001000000, 30, 6 }}},
+  { "cast(cast(10000.5 AS int) AS decimal(6,1))",
+    {{ false, 100000, 6, 1 },
+     { false, 100010, 6, 1 }}},
+  { "cast(cast(-10000.5 AS int) AS decimal(6,1))",
+    {{ false, -100000, 6, 1 },
+     { false, -100010, 6, 1 }}},
+  { "cast(cast(9999.5 AS int) AS decimal(4,0))",
+    {{ false, 9999, 4, 0 },
+     { true, 0, 4, 0 }}},
+  { "cast(cast(-9999.5 AS int) AS decimal(4,0))",
+    {{ false, -9999, 4, 0 },
+     { true, 0, 4, 0 }}},
+  { "cast(cast(127.4999 AS tinyint) AS decimal(30,0))",
+    {{ false, 127, 30, 0 }}},
+  { "cast(cast(127.5 AS tinyint) AS decimal(30,0))",
+    {{ false, 127, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(128.0 AS tinyint) AS decimal(30,0))",
+    {{ false, -128, 30, 0 }, // BUG: JIRA: IMPALA-865
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-128.4999 AS tinyint) AS decimal(30,0))",
+    {{ false, -128, 30, 0 }}},
+  { "cast(cast(-128.5 AS tinyint) AS decimal(30,0))",
+    {{ false, -128, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-129.0 AS tinyint) AS decimal(30,0))",
+    {{ false, 127, 30, 0 }, // BUG: JIRA: IMPALA-865
+     { true, 0, 30, 0 }}},
+  { "cast(cast(32767.4999 AS smallint) AS decimal(30,0))",
+    {{ false, 32767, 30, 0 }}},
+  { "cast(cast(32767.5 AS smallint) AS decimal(30,0))",
+    {{ false, 32767, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(32768.0 AS smallint) AS decimal(30,0))",
+    {{ false, -32768, 30, 0 }, // BUG: JIRA: IMPALA-865
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-32768.4999 AS smallint) AS decimal(30,0))",
+    {{ false, -32768, 30, 0 }}},
+  { "cast(cast(-32768.5 AS smallint) AS decimal(30,0))",
+    {{ false, -32768, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-32769.0 AS smallint) AS decimal(30,0))",
+    {{ false, 32767, 30, 0 }, // BUG: JIRA: IMPALA-865
+     { true, 0, 30, 0 }}},
+  { "cast(cast(2147483647.4999 AS int) AS decimal(30,0))",
+    {{ false, 2147483647, 30, 0 }}},
+  { "cast(cast(2147483647.5 AS int) AS decimal(30,0))",
+    {{ false, 2147483647, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(2147483648.0 AS int) AS decimal(30,0))",
+    {{ false, -2147483648, 30, 0 }, // BUG: JIRA: IMPALA-865
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-2147483648.4999 AS int) AS decimal(30,0))",
+    {{ false, -2147483648, 30, 0 }}},
+  { "cast(cast(-2147483648.5 AS int) AS decimal(30,0))",
+    {{ false, -2147483648, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-2147483649.0 AS int) AS decimal(30,0))",
+    {{ false, 2147483647, 30, 0 }, // BUG: JIRA: IMPALA-865
+     { true, 0, 30, 0 }}},
+  { "cast(cast(9223372036854775807.4999 AS bigint) AS decimal(30,0))",
+    {{ false, 9223372036854775807, 30, 0 }}},
+  { "cast(cast(9223372036854775807.5 AS bigint) AS decimal(30,0))",
+    {{ false, 9223372036854775807, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(9223372036854775808.0 AS bigint) AS decimal(30,0))",
+    {{ false, -9223372036854775807 - 1, 30, 0 }, // BUG; also GCC workaround with -1
+     // error: integer constant is so large that it is unsigned
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-9223372036854775808.4999 AS bigint) AS decimal(30,0))",
+    {{ false, -9223372036854775807 - 1, 30, 0 }}},
+  { "cast(cast(-9223372036854775808.5 AS bigint) AS decimal(30,0))",
+    {{ false, -9223372036854775807 - 1, 30, 0 },
+     { true, 0, 30, 0 }}},
+  { "cast(cast(-9223372036854775809.0 AS bigint) AS decimal(30,0))",
+    {{ false, 9223372036854775807, 30, 0 }, // BUG: JIRA: IMPALA-865
+     { true, 0, 30, 0 }}},
+  // Test CAST FLOAT -> DECIMAL
+  { "cast(cast(power(10, 3) - power(10, -1) as float) as decimal(4,1))",
+    {{ false, 9999, 4, 1 }}},
+  { "cast(cast(power(10, 3) - power(10, -2) as float) as decimal(5,1))",
+    {{ false, 9999, 5, 1 },
+     { false, 10000, 5, 1 }}},
+  { "cast(cast(power(10, 3) - power(10, -2) as float) as decimal(4,1))",
+    {{ false, 9999, 4, 1 },
+     { true, 0, 4, 1 }}},
+  { "cast(cast(-power(10, 3) + power(10, -1) as float) as decimal(4,1))",
+    {{ false, -9999, 4, 1 }}},
+  { "cast(cast(-power(10, 3) + power(10, -2) as float) as decimal(5,1))",
+    {{ false, -9999, 5, 1 },
+     { false, -10000, 5, 1 }}},
+  { "cast(cast(-power(10, 3) + power(10, -2) as float) as decimal(4,1))",
+    {{ false, -9999, 4, 1 },
+     { true, 0, 4, 1 }}},
+  { "cast(cast(power(10, 3) - 0.45 as double) as decimal(4,1))",
+    {{ false, 9995, 4, 1 },
+     { false, 9996, 4, 1 }}},
+  { "cast(cast(power(10, 3) - 0.45 as double) as decimal(5,2))",
+    {{ false, 99955, 5, 2 }}},
+  { "cast(cast(power(10, 3) - 0.45 as double) as decimal(5,0))",
+    {{ false, 999, 5, 0 },
+     { false, 1000, 5, 0 }}},
+  { "cast(cast(power(10, 3) - 0.45 as double) as decimal(3,0))",
+    {{ false, 999, 3, 0 },
+     { true, 0, 3, 0 }}},
+  { "cast(cast(-power(10, 3) + 0.45 as double) as decimal(4,1))",
+    {{ false, -9995, 4, 1 },
+     { false, -9996, 4, 1 }}},
+  { "cast(cast(-power(10, 3) + 0.45 as double) as decimal(5,2))",
+    {{ false, -99955, 5, 2 }}},
+  { "cast(cast(-power(10, 3) + 0.45 as double) as decimal(5,0))",
+    {{ false, -999, 5, 0 },
+     { false, -1000, 5, 0 }}},
+  { "cast(cast(-power(10, 3) + 0.45 as double) as decimal(3,0))",
+    {{ false, -999, 3, 0 },
+     { true, 0, 3, 0 }}},
+  { "cast(cast(power(10, 3) - 0.5 as double) as decimal(4,1))",
+    {{ false, 9995, 4, 1 }}},
+  { "cast(cast(power(10, 3) - 0.5 as double) as decimal(5,2))",
+    {{ false, 99950, 5, 2 }}},
+  { "cast(cast(power(10, 3) - 0.5 as double) as decimal(5,0))",
+    {{ false, 999, 5, 0 },
+     { false, 1000, 5, 0 }}},
+  { "cast(cast(power(10, 3) - 0.5 as double) as decimal(3,0))",
+    {{ false, 999, 3, 0 },
+     { true, 0, 3, 0 }}},
+  { "cast(cast(-power(10, 3) + 0.5 as double) as decimal(4,1))",
+    {{ false, -9995, 4, 1 }}},
+  { "cast(cast(-power(10, 3) + 0.5 as double) as decimal(5,2))",
+    {{ false, -99950, 5, 2 }}},
+  { "cast(cast(-power(10, 3) + 0.5 as double) as decimal(5,0))",
+    {{ false, -999, 5, 0 },
+     { false, -1000, 5, 0 }}},
+  { "cast(cast(-power(10, 3) + 0.5 as double) as decimal(3,0))",
+    {{ false, -999, 3, 0 },
+     { true, 0, 3, 0 }}},
+  { "cast(cast(power(10, 3) - 0.55 as double) as decimal(4,1))",
+    {{ false, 9994, 4, 1 },
+     { false, 9995, 4, 1 }}},
+  { "cast(cast(power(10, 3) - 0.55 as double) as decimal(5,2))",
+    {{ false, 99945, 5, 2 }}},
+  { "cast(cast(power(10, 3) - 0.55 as double) as decimal(5,0))",
+    {{ false, 999, 5, 0 }}},
+  { "cast(cast(power(10, 3) - 0.55 as double) as decimal(3,0))",
+    {{ false, 999, 3, 0 }}},
+  { "cast(cast(-power(10, 3) + 0.55 as double) as decimal(4,1))",
+    {{ false, -9994, 4, 1 },
+     { false, -9995, 4, 1 }}},
+  { "cast(cast(-power(10, 3) + 0.55 as double) as decimal(5,2))",
+    {{ false, -99945, 5, 2 }}},
+  { "cast(cast(-power(10, 3) + 0.55 as double) as decimal(5,0))",
+    {{ false, -999, 5, 0 }}},
+  { "cast(cast(-power(10, 3) + 0.55 as double) as decimal(3,0))",
+    {{ false, -999, 3, 0 }}},
+  { "cast(power(2, 1023) * 100 as decimal(38,0))",
+    {{ true, 0, 38, 0 }}},
+  { "cast(power(2, 1023) * 100 as decimal(18,0))",
+    {{ true, 0, 18, 0 }}},
+  { "cast(power(2, 1023) * 100 as decimal(9,0))",
+    {{ true, 0, 9, 0 }}},
+  { "cast(0/0 as decimal(38,0))",
+    {{ true, 0, 38, 0 }}},
+  { "cast(0/0 as decimal(18,0))",
+    {{ true, 0, 18, 0 }}},
+  { "cast(0/0 as decimal(9,0))",
+    {{ true, 0, 9, 0 }}},
+  // 39 5's - legal double but will overflow in decimal
+  { "cast(555555555555555555555555555555555555555 as decimal(38,0))",
+    {{ true, 0, 38, 0 }}},
 };
 
 TEST_F(ExprTest, DecimalArithmeticExprs) {
@@ -1503,6 +1689,7 @@ TEST_F(ExprTest, DecimalArithmeticExprs) {
         TestDecimalResultType(c.expr, type);
         TestIsNull(c.expr, type);
       } else {
+        TestIsNotNull(c.expr, type);
         switch (type.GetByteSize()) {
           case 4:
             TestDecimalValue(c.expr, Decimal4Value(r.scaled_val), type);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/exprs/expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index 7b1b68a..8f14d6d 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -209,17 +209,6 @@ class Expr {
   /// Convenience function for closing multiple expr trees.
   static void Close(const std::vector<ExprContext*>& ctxs, RuntimeState* state);
 
-  /// Create a new literal expr of 'type' with initial 'data'.
-  /// data should match the ColumnType (i.e. type == TYPE_INT, data is a int*)
-  /// The new Expr will be allocated from the pool.
-  static Expr* CreateLiteral(ObjectPool* pool, const ColumnType& type, void* data);
-
-  /// Create a new literal expr of 'type' by parsing the string.
-  /// NULL will be returned if the string and type are not compatible.
-  /// The new Expr will be allocated from the pool.
-  static Expr* CreateLiteral(ObjectPool* pool, const ColumnType& type,
-      const std::string&);
-
   /// Computes a memory efficient layout for storing the results of evaluating
   /// 'exprs'. The results are assumed to be void* slot types (vs AnyVal types). Varlen
   /// data is not included (e.g. there will be space for a StringValue, but not the data

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/exprs/literal.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/literal.cc b/be/src/exprs/literal.cc
index 32f8250..4ae8a74 100644
--- a/be/src/exprs/literal.cc
+++ b/be/src/exprs/literal.cc
@@ -182,13 +182,13 @@ Literal::Literal(ColumnType type, double v)
     bool overflow = false;
     switch (type.GetByteSize()) {
       case 4:
-        value_.decimal4_val = Decimal4Value::FromDouble(type, v, &overflow);
+        value_.decimal4_val = Decimal4Value::FromDouble(type, v, true, &overflow);
         break;
       case 8:
-        value_.decimal8_val = Decimal8Value::FromDouble(type, v, &overflow);
+        value_.decimal8_val = Decimal8Value::FromDouble(type, v, true, &overflow);
         break;
       case 16:
-        value_.decimal16_val = Decimal16Value::FromDouble(type, v, &overflow);
+        value_.decimal16_val = Decimal16Value::FromDouble(type, v, true, &overflow);
         break;
     }
     DCHECK(!overflow);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/runtime/decimal-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/decimal-test.cc b/be/src/runtime/decimal-test.cc
index f88c8f5..6bcd34d 100644
--- a/be/src/runtime/decimal-test.cc
+++ b/be/src/runtime/decimal-test.cc
@@ -101,90 +101,151 @@ TEST(DoubleToDecimal, Basic) {
   Decimal16Value d16;
 
   bool overflow = false;
-  d4 = Decimal4Value::FromDouble(t1, 1.1, &overflow);
+  d4 = Decimal4Value::FromDouble(t1, 1.9, true, &overflow);
+  EXPECT_FALSE(overflow);
+  EXPECT_EQ(d4.value(), 2);
+  VerifyToString(d4, t1, "2");
+
+  d4 = Decimal4Value::FromDouble(t1, 1.9, false, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d4.value(), 1);
   VerifyToString(d4, t1, "1");
 
-  d4 = Decimal4Value::FromDouble(t4, 1, &overflow);
+  d4 = Decimal4Value::FromDouble(t4, 1, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d4.value(), 1);
   VerifyToString(d4, t4, "1");
 
-  d4 = Decimal4Value::FromDouble(t4, 0, &overflow);
+  d4 = Decimal4Value::FromDouble(t4, 0, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d4.value(), 0);
   VerifyToString(d4, t4, "0");
 
-  d4 = Decimal4Value::FromDouble(t4, -1, &overflow);
+  d4 = Decimal4Value::FromDouble(t4, 9.9, false, &overflow);
+  EXPECT_FALSE(overflow);
+  EXPECT_EQ(d4.value(), 9);
+  VerifyToString(d4, t4, "9");
+
+  d4 = Decimal4Value::FromDouble(t4, 9.9, true, &overflow);
+  EXPECT_TRUE(overflow);
+  overflow = false;
+
+  d4 = Decimal4Value::FromDouble(t4, -1, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d4.value(), -1);
   VerifyToString(d4, t4, "-1");
 
-  d4 = Decimal4Value::FromDouble(t5, 0.1, &overflow);
+  d4 = Decimal4Value::FromDouble(t4, -9.9, false, &overflow);
+  EXPECT_FALSE(overflow);
+  EXPECT_EQ(d4.value(), -9);
+  VerifyToString(d4, t4, "-9");
+
+  d4 = Decimal4Value::FromDouble(t4, -9.9, true, &overflow);
+  EXPECT_TRUE(overflow);
+  overflow = false;
+
+  d4 = Decimal4Value::FromDouble(t5, 0.1, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d4.value(), 1);
   VerifyToString(d4, t5, "0.1");
 
-  d4 = Decimal4Value::FromDouble(t5, 0.0, &overflow);
+  d4 = Decimal4Value::FromDouble(t5, 0.0, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d4.value(), 0);
   VerifyToString(d4, t5, "0.0");
 
-  d4 = Decimal4Value::FromDouble(t5, -0.1, &overflow);
+  d4 = Decimal4Value::FromDouble(t5, -0.1, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d4.value(), -1);
   VerifyToString(d4, t5, "-0.1");
 
   overflow = false;
-  d8 = Decimal8Value::FromDouble(t2, -100.1, &overflow);
+  d8 = Decimal8Value::FromDouble(t2, -100.1, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d8.value(), -10010000);
   VerifyToString(d8, t2, "-100.10000");
 
   overflow = false;
-  d16 = Decimal16Value::FromDouble(t3, -.1, &overflow);
+  d16 = Decimal16Value::FromDouble(t3, -0.1, true, &overflow);
   EXPECT_FALSE(overflow);
   EXPECT_EQ(d16.value(), -1000000000);
   VerifyToString(d16, t3, "-0.1000000000");
 
   // Test overflow
   overflow = false;
-  Decimal4Value::FromDouble(t1, 999999999.123, &overflow);
+  Decimal4Value::FromDouble(t1, 999999999.123, false, &overflow);
   EXPECT_FALSE(overflow);
 
   overflow = false;
-  Decimal4Value::FromDouble(t1, 1234567890.1, &overflow);
+  Decimal4Value::FromDouble(t1, 1234567890.1, false, &overflow);
   EXPECT_TRUE(overflow);
 
   overflow = false;
-  Decimal8Value::FromDouble(t1, -1234567890.123, &overflow);
+  Decimal8Value::FromDouble(t1, -1234567890.123, false, &overflow);
   EXPECT_TRUE(overflow);
 
   overflow = false;
-  Decimal8Value::FromDouble(t2, 99999.1234567, &overflow);
+  Decimal8Value::FromDouble(t2, 99999.1234567, false, &overflow);
   EXPECT_FALSE(overflow);
 
   overflow = false;
-  Decimal8Value::FromDouble(t2, 100000.1, &overflow);
+  Decimal8Value::FromDouble(t2, 100000.1, false, &overflow);
   EXPECT_TRUE(overflow);
 
   overflow = false;
-  Decimal8Value::FromDouble(t2, -123456.123, &overflow);
+  Decimal8Value::FromDouble(t2, -123456.123, false, &overflow);
   EXPECT_TRUE(overflow);
 
   overflow = false;
-  d16 = Decimal16Value::FromDouble(t3, 0.1234, &overflow);
+  d16 = Decimal16Value::FromDouble(t3, 0.1234, true, &overflow);
   EXPECT_FALSE(overflow);
   VerifyToString(d16, t3, "0.1234000000");
 
   overflow = false;
-  Decimal16Value::FromDouble(t3, 1.1, &overflow);
+  Decimal16Value::FromDouble(t3, 1.1, true, &overflow);
   EXPECT_TRUE(overflow);
 
   overflow = false;
-  Decimal16Value::FromDouble(t3, -1.1, &overflow);
+  Decimal16Value::FromDouble(t3, -1.1, true, &overflow);
+  EXPECT_TRUE(overflow);
+
+  overflow = false;
+  Decimal16Value::FromDouble(t3, 0.99999999999, false, &overflow);
+  EXPECT_FALSE(overflow);
+
+  overflow = false;
+  Decimal16Value::FromDouble(t3, 0.99999999999, true, &overflow);
   EXPECT_TRUE(overflow);
+
+  overflow = false;
+  Decimal16Value::FromDouble(t3, -0.99999999999, false, &overflow);
+  EXPECT_FALSE(overflow);
+
+  overflow = false;
+  Decimal16Value::FromDouble(t3, -0.99999999999, true, &overflow);
+  EXPECT_TRUE(overflow);
+  overflow = false;
+
+  // Test half rounding behavior
+  d4 = Decimal4Value::FromDouble(t4, 0.499999999, true, &overflow);
+  EXPECT_FALSE(overflow);
+  EXPECT_EQ(d4.value(), 0);
+  VerifyToString(d4, t4, "0");
+
+  d4 = Decimal4Value::FromDouble(t4, 0.5, true, &overflow);
+  EXPECT_FALSE(overflow);
+  EXPECT_EQ(d4.value(), 1);
+  VerifyToString(d4, t4, "1");
+
+  d4 = Decimal4Value::FromDouble(t4, -0.499999999, true, &overflow);
+  EXPECT_FALSE(overflow);
+  EXPECT_EQ(d4.value(), 0);
+  VerifyToString(d4, t4, "0");
+
+  d4 = Decimal4Value::FromDouble(t4, -0.5, true, &overflow);
+  EXPECT_FALSE(overflow);
+  EXPECT_EQ(d4.value(), -1);
+  VerifyToString(d4, t4, "-1");
 }
 
 TEST(StringToDecimal, Basic) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/runtime/decimal-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/decimal-value.h b/be/src/runtime/decimal-value.h
index 86fc71b..23c8c6f 100644
--- a/be/src/runtime/decimal-value.h
+++ b/be/src/runtime/decimal-value.h
@@ -49,13 +49,15 @@ class DecimalValue {
     return *this;
   }
 
-  /// Returns the closest Decimal to 'd' of type 't', truncating digits that
-  /// cannot be represented.
-  static inline DecimalValue FromDouble(const ColumnType& t, double d, bool* overflow) {
-    return FromDouble(t.precision, t.scale, d, overflow);
+  /// Returns the closest Decimal to 'd' of type 't', rounding to the nearest integer
+  /// if 'round' is true, truncating the decimal places otherwise
+  static inline DecimalValue FromDouble(const ColumnType& t, double d, bool round,
+      bool* overflow) {
+    return FromDouble(t.precision, t.scale, d, round, overflow);
   }
 
-  static inline DecimalValue FromDouble(int precision, int scale, double d, bool* overflow);
+  static inline DecimalValue FromDouble(int precision, int scale, double d,
+      bool round, bool* overflow);
 
   /// Assigns *result as a decimal.
   static inline DecimalValue FromInt(const ColumnType& t, int64_t d, bool* overflow) {
@@ -226,6 +228,13 @@ class DecimalValue {
 
   inline const T fractional_part(int scale) const;
 
+  /// Returns the value as an integer, setting overflow to true on overflow,
+  /// and leaving unchanged otherwise.  Rounds to the nearest integer, defined
+  /// as half / round away from zero.  Template parameter RESULT_T should be a
+  /// UDF Val type which defines an integer underlying type as underlying_type_t
+  template <typename RESULT_T>
+  inline typename RESULT_T::underlying_type_t ToInt(int scale, bool* overflow) const;
+
   /// Returns an approximate double for this decimal.
   inline double ToDouble(const ColumnType& type) const {
     return ToDouble(type.scale);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/runtime/decimal-value.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/decimal-value.inline.h b/be/src/runtime/decimal-value.inline.h
index e7e89aa..40b1119 100644
--- a/be/src/runtime/decimal-value.inline.h
+++ b/be/src/runtime/decimal-value.inline.h
@@ -20,7 +20,9 @@
 
 #include "runtime/decimal-value.h"
 
+#include <cmath>
 #include <iomanip>
+#include <limits>
 #include <ostream>
 #include <sstream>
 
@@ -32,17 +34,29 @@ namespace impala {
 
 template<typename T>
 inline DecimalValue<T> DecimalValue<T>::FromDouble(int precision, int scale, double d,
-    bool* overflow) {
-  // Check overflow.
-  T max_value = DecimalUtil::GetScaleMultiplier<T>(precision - scale);
-  if (abs(d) >= max_value) {
+    bool round, bool* overflow) {
+
+  // Multiply the double by the scale.
+  // Unfortunately, this conversion is not exact, and there is a loss of precision.
+  // Despite having enough bits in the mantissa to represent all the leading bits in
+  // powers of 10 up to around 10**38, the conversion done is still inexact.  Writing
+  // literals directly as 1.0e23 produces exactly the same number.  The error starts
+  // around 1.0e23 and can take either positive or negative values.  This means the
+  // multiplication can cause an unwanted decimal overflow.
+  d *= DecimalUtil::GetScaleMultiplier<double>(scale);
+
+  // Decimal V2 behavior
+  // TODO: IMPALA-4924: remove DECIMAL V1 code
+  if (round) d = std::round(d);
+
+  const T max_value = DecimalUtil::GetScaleMultiplier<T>(precision);
+  DCHECK(max_value > 0);  // no DCHECK_GT because of int128_t
+  if (UNLIKELY(std::isnan(d)) || UNLIKELY(std::fabs(d) >= max_value)) {
     *overflow = true;
     return DecimalValue();
   }
 
-  // Multiply the double by the scale.
-  d *= DecimalUtil::GetScaleMultiplier<double>(scale);
-  // Truncate and just take the integer part.
+  // Return the rounded or truncated integer part.
   return DecimalValue(static_cast<T>(d));
 }
 
@@ -77,6 +91,36 @@ inline const T DecimalValue<T>::fractional_part(int scale) const {
   return abs(value()) % DecimalUtil::GetScaleMultiplier<T>(scale);
 }
 
+// Note: this expects RESULT_T to be a UDF AnyVal subclass which defines
+// RESULT_T::underlying_type_t to be the representative type
+template<typename T>
+template<typename RESULT_T>
+inline typename RESULT_T::underlying_type_t DecimalValue<T>::ToInt(int scale,
+    bool* overflow) const {
+  const T divisor = DecimalUtil::GetScaleMultiplier<T>(scale);
+  const T v = value();
+  T result;
+  if (divisor == 1) {
+    result = v;
+  } else {
+    result = v / divisor;
+    const T remainder = v % divisor;
+    // Divisor is always a multiple of 2, so no loss of precision when shifting down
+    DCHECK(divisor % 2 == 0);  // No DCHECK_EQ as this is possibly an int128_t
+    // N.B. also - no std::abs for int128_t
+    if (abs(remainder) >= divisor >> 1) {
+      // Round away from zero.
+      // Note this trick works with both signed and unsigned types
+      const int shift = std::numeric_limits<T>::digits;
+      result += 1 | (result >> shift);
+    }
+  }
+  *overflow |=
+      result > std::numeric_limits<typename RESULT_T::underlying_type_t>::max() ||
+      result < std::numeric_limits<typename RESULT_T::underlying_type_t>::min();
+  return result;
+}
+
 template<typename T>
 inline DecimalValue<T> DecimalValue<T>::ScaleTo(int src_scale, int dst_scale,
     int dst_precision, bool* overflow) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2c540f1/be/src/udf/udf.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index d4d6602..732f767 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -417,9 +417,10 @@ struct BooleanVal : public AnyVal {
 };
 
 struct TinyIntVal : public AnyVal {
-  int8_t val;
+  using underlying_type_t = int8_t;
+  underlying_type_t val;
 
-  TinyIntVal(int8_t val = 0) : val(val) { }
+  TinyIntVal(underlying_type_t val = 0) : val(val) { }
 
   static TinyIntVal null() {
     TinyIntVal result;
@@ -436,9 +437,10 @@ struct TinyIntVal : public AnyVal {
 };
 
 struct SmallIntVal : public AnyVal {
-  int16_t val;
+  using underlying_type_t = int16_t;
+  underlying_type_t val;
 
-  SmallIntVal(int16_t val = 0) : val(val) { }
+  SmallIntVal(underlying_type_t val = 0) : val(val) { }
 
   static SmallIntVal null() {
     SmallIntVal result;
@@ -455,9 +457,10 @@ struct SmallIntVal : public AnyVal {
 };
 
 struct IntVal : public AnyVal {
-  int32_t val;
+  using underlying_type_t = int32_t;
+  underlying_type_t val;
 
-  IntVal(int32_t val = 0) : val(val) { }
+  IntVal(underlying_type_t val = 0) : val(val) { }
 
   static IntVal null() {
     IntVal result;
@@ -474,9 +477,10 @@ struct IntVal : public AnyVal {
 };
 
 struct BigIntVal : public AnyVal {
-  int64_t val;
+  using underlying_type_t = int64_t;
+  underlying_type_t val;
 
-  BigIntVal(int64_t val = 0) : val(val) { }
+  BigIntVal(underlying_type_t val = 0) : val(val) { }
 
   static BigIntVal null() {
     BigIntVal result;


[3/4] incubator-impala git commit: IMPALA-4955: Fix integer overflow in hdfs table size accounting

Posted by jr...@apache.org.
IMPALA-4955: Fix integer overflow in hdfs table size accounting

We incorrectly used an int instead of a long for the variable that
tracks the partition data size, so it overflowed once a partition's
total file size exceeded the int range.

Testing: Couldn't reproduce it locally, but the perf build that
was hitting this on the TPC-DS scale 1000 dataset is green with this
fix.

Change-Id: I8ee568a72ac038464cfb3e4c225f130770dd8d0f
Reviewed-on: http://gerrit.cloudera.org:8080/6133
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/013456d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/013456d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/013456d3

Branch: refs/heads/master
Commit: 013456d3b462ec2af4e026538848fcad79e98531
Parents: d2c540f
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Thu Feb 23 15:04:11 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 24 09:26:25 2017 +0000

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/catalog/HdfsTable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/013456d3/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 6096ba9..fa6a1b7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -815,7 +815,7 @@ public class HdfsTable extends Table {
       // Iterate through the current snapshot of the partition directory listing to
       // figure out files that were newly added/modified.
       List<FileDescriptor> newFileDescs = Lists.newArrayList();
-      int newPartSizeBytes = 0;
+      long newPartSizeBytes = 0;
       for (FileStatus fileStatus : fs.listStatus(partDir)) {
         if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
         String fileName = fileStatus.getPath().getName().toString();