You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2022/08/19 14:43:38 UTC

[impala] branch master updated (9518ee290 -> 51126259b)

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 9518ee290 IMPALA-10800: Tidy up the be/src/exec directory
     new 7ca11dfc7 IMPALA-9482: Support for BINARY columns
     new 51126259b IMPALA-11436: Change search bind authentication parameters

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/file-metadata-utils.cc                 |   4 +-
 be/src/exec/hbase/hbase-scan-node.cc               |  21 +--
 be/src/exec/hbase/hbase-scan-node.h                |   3 +
 be/src/exec/hbase/hbase-table-writer.cc            |  13 +-
 be/src/exec/hdfs-scanner-ir.cc                     |   8 +-
 be/src/exec/hdfs-scanner.cc                        |  15 ++-
 be/src/exec/hdfs-text-table-writer.cc              |  16 ++-
 be/src/exec/orc/orc-metadata-utils.cc              |   2 +
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |   4 +-
 be/src/exec/parquet/parquet-metadata-utils.cc      |   3 +-
 be/src/exec/parquet/parquet-metadata-utils.h       |   2 +-
 be/src/exec/rcfile/hdfs-rcfile-scanner.cc          |   6 +-
 be/src/exec/text-converter.cc                      |  14 +-
 be/src/exec/text-converter.h                       |  15 ++-
 be/src/exec/text-converter.inline.h                |  20 ++-
 be/src/exec/text/hdfs-text-scanner.cc              |   8 +-
 be/src/exprs/expr-test.cc                          | 115 ++++++++++++----
 be/src/exprs/utility-functions-ir.cc               |   5 +
 be/src/exprs/utility-functions.h                   |   4 +
 be/src/runtime/descriptors.cc                      |   3 +-
 be/src/runtime/descriptors.h                       |  13 ++
 be/src/runtime/types.cc                            | 116 ++++------------
 be/src/runtime/types.h                             |  30 ++++-
 be/src/service/hs2-util.cc                         | 136 +++++++++++++++++--
 be/src/service/hs2-util.h                          |   3 +
 be/src/service/impala-beeswax-server.cc            |   5 +-
 be/src/service/impala-hs2-server.cc                |   4 +-
 be/src/service/query-result-set.cc                 |   9 ++
 be/src/testutil/test-udfs.cc                       |   5 +-
 be/src/util/coding-util.cc                         |   2 +-
 be/src/util/coding-util.h                          |   1 +
 be/src/util/ldap-search-bind.cc                    |  27 ++--
 bin/rat_exclude_files.txt                          |   1 +
 common/function-registry/impala_functions.py       |  10 ++
 docs/topics/impala_ldap.xml                        |  18 ++-
 .../java/org/apache/impala/analysis/Analyzer.java  |   4 +
 .../java/org/apache/impala/analysis/CastExpr.java  |  13 ++
 .../apache/impala/analysis/ComputeStatsStmt.java   |  24 ++--
 .../org/apache/impala/analysis/InPredicate.java    |   3 +-
 .../org/apache/impala/analysis/LikePredicate.java  |   8 +-
 .../org/apache/impala/analysis/LiteralExpr.java    |   2 +
 .../java/org/apache/impala/analysis/SlotRef.java   |   4 +
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  18 +++
 .../org/apache/impala/catalog/ColumnStats.java     |  17 ++-
 .../java/org/apache/impala/catalog/Function.java   |   1 +
 .../org/apache/impala/catalog/PrimitiveType.java   |   4 +-
 .../org/apache/impala/catalog/ScalarFunction.java  |   1 +
 .../java/org/apache/impala/catalog/ScalarType.java |  11 +-
 .../main/java/org/apache/impala/catalog/Type.java  |  20 ++-
 .../hive/executor/HiveGenericJavaFunction.java     |   2 +
 .../hive/executor/HiveLegacyJavaFunction.java      |   6 +-
 .../impala/hive/executor/JavaUdfDataType.java      |  17 +++
 .../apache/impala/util/AvroSchemaConverter.java    |   1 +
 .../org/apache/impala/util/AvroSchemaParser.java   |   6 +-
 .../org/apache/impala/util/AvroSchemaUtils.java    |   3 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |   7 +-
 .../apache/impala/analysis/AnalyzeExprsTest.java   |  30 ++++-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  21 ++-
 .../impala/analysis/AnalyzeSubqueriesTest.java     |   4 +
 .../org/apache/impala/analysis/AnalyzerTest.java   |  58 ++++----
 .../org/apache/impala/analysis/AuditingTest.java   |   8 +-
 .../impala/analysis/ExprRewriteRulesTest.java      |   2 +
 .../apache/impala/analysis/LiteralExprTest.java    |   1 +
 .../java/org/apache/impala/analysis/ToSqlTest.java |   4 +
 .../org/apache/impala/catalog/CatalogTest.java     |  26 ++++
 .../impala/catalog/PartialCatalogInfoTest.java     |  45 ++++---
 .../impala/catalog/local/LocalCatalogTest.java     |  42 ++++++
 .../org/apache/impala/common/FrontendTestBase.java |   2 +
 .../LdapSearchBindImpalaShellTest.java             |   8 +-
 .../impala/hive/executor/TestGenericUdf.java       |  24 +++-
 .../java/org/apache/impala/service/JdbcTest.java   |  14 ++
 .../java/org/apache/impala/TestGenericUdf.java     |  24 +++-
 testdata/UnsupportedTypes/data.csv                 |   5 -
 testdata/bin/compute-table-stats.sh                |   2 +-
 testdata/bin/generate-schema-statements.py         |   1 +
 testdata/data/binary_tbl/000000_0.txt              |   8 ++
 .../functional/functional_schema_template.sql      |  98 ++++++++++----
 .../datasets/functional/schema_constraints.csv     |  10 +-
 .../queries/QueryTest/binary-type.test             | 149 +++++++++++++++++++++
 .../queries/QueryTest/generic-java-udf.test        |   7 +
 .../queries/QueryTest/hbase-inserts.test           |  37 +++--
 .../queries/QueryTest/java-udf.test                |  14 ++
 .../queries/QueryTest/load-generic-java-udfs.test  |   4 +
 .../queries/QueryTest/load-java-udfs.test          |   4 +
 .../functional-query/queries/QueryTest/misc.test   |  22 +--
 .../QueryTest/nested-types-scanner-basic.test      |  34 +++++
 .../functional-query/queries/QueryTest/udf.test    |  17 ++-
 tests/common/test_result_verifier.py               |   2 +-
 tests/custom_cluster/test_permanent_udfs.py        |   8 +-
 tests/hs2/test_fetch.py                            |  78 +++++------
 tests/hs2/test_hs2.py                              |   2 +-
 tests/query_test/test_scanners.py                  |  16 +++
 tests/query_test/test_udfs.py                      |  12 +-
 93 files changed, 1267 insertions(+), 409 deletions(-)
 delete mode 100644 testdata/UnsupportedTypes/data.csv
 create mode 100644 testdata/data/binary_tbl/000000_0.txt
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/binary-type.test


[impala] 02/02: IMPALA-11436: Change search bind authentication parameters

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 51126259bea2491316decef63d584f713d02a9a6
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Fri Aug 5 15:07:23 2022 +0200

    IMPALA-11436: Change search bind authentication parameters
    
    Impala's search bind authentication intends to mimic Spring's behaviour.
    However, the login username and user dn paremeters were swapped for
    group searches compared to Spring. This change intends to align these
    parameters.
    
    For user search, Spring uses {0} to replace the login username.
    Meanwhile, during group search {0} is used to replace the login user dn
    and {1} is used to replace the login username.
    
    Testing:
     - Ran LdapSearchBindImpalaShellTest frontend tests
    
    Change-Id: I9808566a348f7c6200b0571fbc05e67f720f2075
    Reviewed-on: http://gerrit.cloudera.org:8080/18819
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/ldap-search-bind.cc                    | 27 +++++++++++-----------
 docs/topics/impala_ldap.xml                        | 18 ++++++++++-----
 .../LdapSearchBindImpalaShellTest.java             |  8 +++----
 3 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/be/src/util/ldap-search-bind.cc b/be/src/util/ldap-search-bind.cc
index 9c47965d1..d3c6d9c1d 100644
--- a/be/src/util/ldap-search-bind.cc
+++ b/be/src/util/ldap-search-bind.cc
@@ -42,11 +42,12 @@ using std::string;
 namespace impala {
 
 // Permitted patterns in the user/group filter
-const string USER_NAME_PATTERN = "{0}";
-const string USER_DN_PATTERN = "{1}";
+const string USER_SEARCH_LOGIN_NAME_PATTERN = "{0}";
+const string GROUP_SEARCH_LOGIN_NAME_PATTERN = "{1}";
+const string GROUP_SEARCH_USER_DN_PATTERN = "{0}";
 // Default ldap filters
 const string DEFAULT_USER_FILTER = "(&(objectClass=user)(sAMAccountName={0}))";
-const string DEFAULT_GROUP_FILTER = "(&(objectClass=group)(member={1})";
+const string DEFAULT_GROUP_FILTER = "(&(objectClass=group)(member={0})";
 
 Status LdapSearchBind::ValidateFlags() {
   RETURN_IF_ERROR(ImpalaLdap::ValidateFlags());
@@ -92,10 +93,10 @@ bool LdapSearchBind::LdapCheckPass(const char* user, const char* pass, unsigned
   if (!success) return false;
   VLOG(2) << "LDAP bind successful";
 
-  // Escape special characters and replace the USER_NAME_PATTERN in the filter.
+  // Escape special characters and replace the USER_SEARCH_LOGIN_NAME_PATTERN.
   string filter = string(user_filter_);
   string escaped_user = EscapeFilterProperty(user);
-  replace_all(filter, USER_NAME_PATTERN, escaped_user);
+  replace_all(filter, USER_SEARCH_LOGIN_NAME_PATTERN, escaped_user);
 
   // Execute the LDAP search and try to retrieve the user dn
   VLOG(1) << "Trying LDAP user search for: " << user;
@@ -134,15 +135,15 @@ bool LdapSearchBind::LdapCheckFilters(string username) {
   if (!success) return false;
   VLOG(2) << "LDAP bind successful";
 
-  // Substitute the USER_NAME_PATTERN and USER_DN_PATTERN patterns for the group search.
-  // USER_DN_PATTERN requires to determine the user dn and therefore an additional LDAP
-  // search.
+  // Substitute the USER_SEARCH_LOGIN_NAME_PATTERN and GROUP_SEARCH_USER_DN_PATTERN
+  // patterns for the group search. This needs an additional LDAP search to determine
+  // the GROUP_SEARCH_USER_DN_PATTERN.
   string group_filter = group_filter_;
   string escaped_username = EscapeFilterProperty(username);
-  replace_all(group_filter, USER_NAME_PATTERN, escaped_username);
-  if (group_filter.find(USER_DN_PATTERN) != string::npos) {
+  replace_all(group_filter, GROUP_SEARCH_LOGIN_NAME_PATTERN, escaped_username);
+  if (group_filter.find(GROUP_SEARCH_USER_DN_PATTERN) != string::npos) {
     string user_filter = user_filter_;
-    replace_all(user_filter, USER_NAME_PATTERN, escaped_username);
+    replace_all(user_filter, USER_SEARCH_LOGIN_NAME_PATTERN, escaped_username);
     vector<string> user_dns =
         LdapSearchObject(ld, FLAGS_ldap_user_search_basedn.c_str(), user_filter.c_str());
     if (user_dns.size() != 1) {
@@ -153,9 +154,9 @@ bool LdapSearchBind::LdapCheckFilters(string username) {
       ldap_unbind_ext(ld, nullptr, nullptr);
       return false;
     }
-    // Escape the characters in the the DN then replace the USER_DN_PATTERN.
+    // Escape the characters in the the DN then replace the GROUP_SEARCH_DN_PATTERN.
     string escaped_user_dn = EscapeFilterProperty(user_dns[0]);
-    replace_all(group_filter, USER_DN_PATTERN, escaped_user_dn);
+    replace_all(group_filter, GROUP_SEARCH_USER_DN_PATTERN, escaped_user_dn);
   }
 
   // Execute LDAP search for the group
diff --git a/docs/topics/impala_ldap.xml b/docs/topics/impala_ldap.xml
index e4d651a7e..6bac3614f 100644
--- a/docs/topics/impala_ldap.xml
+++ b/docs/topics/impala_ldap.xml
@@ -323,7 +323,7 @@ under the License.
 
           <dd>
             <p>
-              A comma separated list of user names. If specified, users must be
+              A comma separated list of usernames. If specified, users must be
               on this list for authentication to succeed
             </p>
           </dd>
@@ -459,7 +459,7 @@ under the License.
           <p>
             LDAP filter that will be used during LDAP search, it can contain
             <codeph>{0}</codeph> pattern which will be replaced with the
-            user name. The default value is <codeph>
+            username. The default value is <codeph>
             (&amp;(objectClass=user)(sAMAccountName={0}))</codeph>.
           </p>
         </dd>
@@ -475,11 +475,17 @@ under the License.
         <dd>
           <p>
             LDAP filter that will be used during LDAP group search, it can
-            contain <codeph>{0}</codeph> pattern which will be replaced with
-            the user name and/or <codeph>{1}</codeph> which will be replaced
+            contain <codeph>{1}</codeph> pattern which will be replaced with
+            the username and/or <codeph>{0}</codeph> which will be replaced
             with the user DN. The default value is <codeph>
-            (&amp;(objectClass=group)(member={1})</codeph>.
+            (&amp;(objectClass=group)(member={0})</codeph>.
           </p>
+          <note>
+            The behaviour of this flag has been changed between Impala 4.1 and Impala 4.2
+            to comply with Spring LDAP. Previously <codeph>{0}</codeph> was for username
+            and <codeph>{1}</codeph> for user dn, these paramters were swapped, now
+            <codeph>{0}</codeph> is for user dn and <codeph>{1}</codeph> is for username.
+          </note>
         </dd>
 
       </dlentry>
@@ -617,7 +623,7 @@ under the License.
               Sets the user. Per Active Directory, the user is the short username, not the full
               LDAP distinguished name. If your LDAP settings include a search base, use the
               <codeph>--ldap_bind_pattern</codeph> on the <cmdname>impalad</cmdname> daemon to
-              translate the short user name from <cmdname>impala-shell</cmdname> automatically
+              translate the short username from <cmdname>impala-shell</cmdname> automatically
               to the fully qualified name.
             </p>
           </dd>
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
index b92a8f9bf..38d74540c 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
@@ -71,7 +71,7 @@ public class LdapSearchBindImpalaShellTest extends LdapImpalaShellTest {
     setUp(String.format("--ldap_user_search_basedn=dc=myorg,dc=com "
             + "--ldap_group_search_basedn=ou=Groups,dc=myorg,dc=com "
             + "--ldap_user_filter=(&(objectClass=person)(cn={0})(!(cn=%s))) "
-            + "--ldap_group_filter=(uniqueMember={1})",
+            + "--ldap_group_filter=(uniqueMember={0})",
         TEST_USER_2));
     testLdapFiltersImpl();
   }
@@ -89,7 +89,7 @@ public class LdapSearchBindImpalaShellTest extends LdapImpalaShellTest {
     setUp(String.format("--ldap_user_search_basedn=dc=myorg,dc=com "
             + "--ldap_group_search_basedn=ou=Groups,dc=myorg,dc=com "
             + "--ldap_user_filter=(&(objectClass=person)(cn={0})(!(cn=%s))) "
-            + "--ldap_group_filter=(&(cn=%s)(uniqueMember={1}))",
+            + "--ldap_group_filter=(&(cn=%s)(uniqueMember={0}))",
         TEST_USER_2, TEST_USER_GROUP));
     testLdapFiltersImpl();
   }
@@ -107,7 +107,7 @@ public class LdapSearchBindImpalaShellTest extends LdapImpalaShellTest {
     setUp(String.format("--ldap_user_search_basedn=dc=myorg,dc=com "
             + "--ldap_group_search_basedn=ou=Groups,dc=myorg,dc=com "
             + "--ldap_user_filter=(&(objectClass=person)(cn={0})(!(cn=Test2Ldap))) "
-            + "--ldap_group_filter=(&(cn=group1)(uniqueMember={1})) "
+            + "--ldap_group_filter=(&(cn=group1)(uniqueMember={0})) "
             + "--authorized_proxy_user_config=%s=* ",
         TEST_USER_4));
     testLdapFiltersWithProxyImpl();
@@ -145,7 +145,7 @@ public class LdapSearchBindImpalaShellTest extends LdapImpalaShellTest {
     setUp("--ldap_user_search_basedn=dc=myorg,dc=com "
         + "--ldap_group_search_basedn=ou=Groups,dc=myorg,dc=com "
         + "--ldap_user_filter=(cn={0}) "
-        + "--ldap_group_filter=(uniqueMember={1}) ");
+        + "--ldap_group_filter=(uniqueMember={0}) ");
     String query = "select logged_in_user()";
 
     // Authentications should succeed with user who has escaped character in its DN


[impala] 01/02: IMPALA-9482: Support for BINARY columns

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7ca11dfc7fc26646f2e7194be74ba9739ea842bf
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Fri Jun 5 00:48:33 2020 +0200

    IMPALA-9482: Support for BINARY columns
    
    This patch adds support for BINARY columns for all table formats with
    the exception of Kudu.
    
    In Hive the main difference between STRING and BINARY is that STRING is
    assumed to be UTF8 encoded, while BINARY can be any byte array.
    Some other differences in Hive:
    - BINARY can be only cast from/to STRING
    - Only a small subset of built-in STRING functions support BINARY.
    - In several file formats (e.g. text) BINARY is base64 encoded.
    - No NDV is calculated during COMPUTE STATISTICS.
    
    As Impala doesn't treat STRINGs as UTF8, BINARY and STRING become nearly
    identical, especially from the backend's perspective. For this reason,
    BINARY is implemented a bit differently compared to other types:
    while the frontend treats STRING and BINARY as two separate types, most
    of the backend uses PrimitiveType::TYPE_STRING for BINARY too, e.g.
    in SlotDesc. Only the following parts of backend need to differentiate
    between STRING and BINARY:
    - table scanners
    - table writers
    - HS2/Beeswax service
    These parts have access to column metadata, which allows to add special
    handling for BINARY.
    
    Only a very few builtins are allowed for BINARY at the moment:
    - length
    - min/max/count
    - coalesce and similar "selector" functions
    Other STRING functions can be only used by casting to STRING first.
    Adding support for more of these functions is very easy, as simply
    the BINARY type has to be "connected" to the already existing STRING
    function's signature. Functions where the result depends on utf8_mode
    need to ensure that with BINARY it always works as if utf8_mode=0 (for
    example length() is mapped to bytes() as length count utf8 chars if
    utf8_mode=1).
    
    All kinds of UDFs (native, Hive legacy, Hive generic) support BINARY,
    though in case of legacy Hive UDFs it is only supported if the argument
    and return types are set explicitely to ensure backward compatibility.
    See IMPALA-11340 for details.
    
    The original plan was to behave as close to Hive as possible, but I
    realized that Hive has more relaxed casting rules than Impala, which
    led to STRING<->BINARY casts being necessary in more cases in Impala.
    This was needed to disallow passing a BINARY to functions that expect
    a STRING argument. An example for the difference is that in
    INSERT ... VALUES () string literals need to be explicitly cast to
    BINARY, while this is not needed in Hive.
    
    Testing:
    - Added functional.binary_tbl for all file formats (except Kudu)
      to test scanning.
    - Removed functional.unsupported_types and related tests, as now
      Impala supports all (non-complex) types that Hive does.
    - Added FE/EE tests mainly based on the ones added to the DATE type
    
    Change-Id: I36861a9ca6c2047b0d76862507c86f7f153bc582
    Reviewed-on: http://gerrit.cloudera.org:8080/16066
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/file-metadata-utils.cc                 |   4 +-
 be/src/exec/hbase/hbase-scan-node.cc               |  21 +--
 be/src/exec/hbase/hbase-scan-node.h                |   3 +
 be/src/exec/hbase/hbase-table-writer.cc            |  13 +-
 be/src/exec/hdfs-scanner-ir.cc                     |   8 +-
 be/src/exec/hdfs-scanner.cc                        |  15 ++-
 be/src/exec/hdfs-text-table-writer.cc              |  16 ++-
 be/src/exec/orc/orc-metadata-utils.cc              |   2 +
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |   4 +-
 be/src/exec/parquet/parquet-metadata-utils.cc      |   3 +-
 be/src/exec/parquet/parquet-metadata-utils.h       |   2 +-
 be/src/exec/rcfile/hdfs-rcfile-scanner.cc          |   6 +-
 be/src/exec/text-converter.cc                      |  14 +-
 be/src/exec/text-converter.h                       |  15 ++-
 be/src/exec/text-converter.inline.h                |  20 ++-
 be/src/exec/text/hdfs-text-scanner.cc              |   8 +-
 be/src/exprs/expr-test.cc                          | 115 ++++++++++++----
 be/src/exprs/utility-functions-ir.cc               |   5 +
 be/src/exprs/utility-functions.h                   |   4 +
 be/src/runtime/descriptors.cc                      |   3 +-
 be/src/runtime/descriptors.h                       |  13 ++
 be/src/runtime/types.cc                            | 116 ++++------------
 be/src/runtime/types.h                             |  30 ++++-
 be/src/service/hs2-util.cc                         | 136 +++++++++++++++++--
 be/src/service/hs2-util.h                          |   3 +
 be/src/service/impala-beeswax-server.cc            |   5 +-
 be/src/service/impala-hs2-server.cc                |   4 +-
 be/src/service/query-result-set.cc                 |   9 ++
 be/src/testutil/test-udfs.cc                       |   5 +-
 be/src/util/coding-util.cc                         |   2 +-
 be/src/util/coding-util.h                          |   1 +
 bin/rat_exclude_files.txt                          |   1 +
 common/function-registry/impala_functions.py       |  10 ++
 .../java/org/apache/impala/analysis/Analyzer.java  |   4 +
 .../java/org/apache/impala/analysis/CastExpr.java  |  13 ++
 .../apache/impala/analysis/ComputeStatsStmt.java   |  24 ++--
 .../org/apache/impala/analysis/InPredicate.java    |   3 +-
 .../org/apache/impala/analysis/LikePredicate.java  |   8 +-
 .../org/apache/impala/analysis/LiteralExpr.java    |   2 +
 .../java/org/apache/impala/analysis/SlotRef.java   |   4 +
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  18 +++
 .../org/apache/impala/catalog/ColumnStats.java     |  17 ++-
 .../java/org/apache/impala/catalog/Function.java   |   1 +
 .../org/apache/impala/catalog/PrimitiveType.java   |   4 +-
 .../org/apache/impala/catalog/ScalarFunction.java  |   1 +
 .../java/org/apache/impala/catalog/ScalarType.java |  11 +-
 .../main/java/org/apache/impala/catalog/Type.java  |  20 ++-
 .../hive/executor/HiveGenericJavaFunction.java     |   2 +
 .../hive/executor/HiveLegacyJavaFunction.java      |   6 +-
 .../impala/hive/executor/JavaUdfDataType.java      |  17 +++
 .../apache/impala/util/AvroSchemaConverter.java    |   1 +
 .../org/apache/impala/util/AvroSchemaParser.java   |   6 +-
 .../org/apache/impala/util/AvroSchemaUtils.java    |   3 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |   7 +-
 .../apache/impala/analysis/AnalyzeExprsTest.java   |  30 ++++-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  21 ++-
 .../impala/analysis/AnalyzeSubqueriesTest.java     |   4 +
 .../org/apache/impala/analysis/AnalyzerTest.java   |  58 ++++----
 .../org/apache/impala/analysis/AuditingTest.java   |   8 +-
 .../impala/analysis/ExprRewriteRulesTest.java      |   2 +
 .../apache/impala/analysis/LiteralExprTest.java    |   1 +
 .../java/org/apache/impala/analysis/ToSqlTest.java |   4 +
 .../org/apache/impala/catalog/CatalogTest.java     |  26 ++++
 .../impala/catalog/PartialCatalogInfoTest.java     |  45 ++++---
 .../impala/catalog/local/LocalCatalogTest.java     |  42 ++++++
 .../org/apache/impala/common/FrontendTestBase.java |   2 +
 .../impala/hive/executor/TestGenericUdf.java       |  24 +++-
 .../java/org/apache/impala/service/JdbcTest.java   |  14 ++
 .../java/org/apache/impala/TestGenericUdf.java     |  24 +++-
 testdata/UnsupportedTypes/data.csv                 |   5 -
 testdata/bin/compute-table-stats.sh                |   2 +-
 testdata/bin/generate-schema-statements.py         |   1 +
 testdata/data/binary_tbl/000000_0.txt              |   8 ++
 .../functional/functional_schema_template.sql      |  98 ++++++++++----
 .../datasets/functional/schema_constraints.csv     |  10 +-
 .../queries/QueryTest/binary-type.test             | 149 +++++++++++++++++++++
 .../queries/QueryTest/generic-java-udf.test        |   7 +
 .../queries/QueryTest/hbase-inserts.test           |  37 +++--
 .../queries/QueryTest/java-udf.test                |  14 ++
 .../queries/QueryTest/load-generic-java-udfs.test  |   4 +
 .../queries/QueryTest/load-java-udfs.test          |   4 +
 .../functional-query/queries/QueryTest/misc.test   |  22 +--
 .../QueryTest/nested-types-scanner-basic.test      |  34 +++++
 .../functional-query/queries/QueryTest/udf.test    |  17 ++-
 tests/common/test_result_verifier.py               |   2 +-
 tests/custom_cluster/test_permanent_udfs.py        |   8 +-
 tests/hs2/test_fetch.py                            |  78 +++++------
 tests/hs2/test_hs2.py                              |   2 +-
 tests/query_test/test_scanners.py                  |  16 +++
 tests/query_test/test_udfs.py                      |  12 +-
 90 files changed, 1237 insertions(+), 386 deletions(-)

diff --git a/be/src/exec/file-metadata-utils.cc b/be/src/exec/file-metadata-utils.cc
index 52891ab77..aeb2ecc57 100644
--- a/be/src/exec/file-metadata-utils.cc
+++ b/be/src/exec/file-metadata-utils.cc
@@ -101,7 +101,9 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tu
         continue;
       }
       if (field_id != transform->source_id()) continue;
-      if (!text_converter.WriteSlot(slot_desc, *template_tuple,
+      const AuxColumnType& aux_type =
+          scan_node_->hdfs_table()->GetColumnDesc(slot_desc).auxType();
+      if (!text_converter.WriteSlot(slot_desc, &aux_type, *template_tuple,
                                     transform->transform_value()->c_str(),
                                     transform->transform_value()->size(),
                                     true, false,
diff --git a/be/src/exec/hbase/hbase-scan-node.cc b/be/src/exec/hbase/hbase-scan-node.cc
index 986968507..55d0ab584 100644
--- a/be/src/exec/hbase/hbase-scan-node.cc
+++ b/be/src/exec/hbase/hbase-scan-node.cc
@@ -45,11 +45,12 @@ HBaseScanNode::HBaseScanNode(ObjectPool* pool, const ScanPlanNode& pnode,
     : ScanNode(pool, pnode, descs),
       table_name_(pnode.tnode_->hbase_scan_node.table_name),
       tuple_id_(pnode.tnode_->hbase_scan_node.tuple_id),
-      tuple_desc_(NULL),
+      tuple_desc_(nullptr),
+      hbase_table_(nullptr),
       tuple_idx_(0),
       filters_(pnode.tnode_->hbase_scan_node.filters),
-      hbase_scanner_(NULL),
-      row_key_slot_(NULL),
+      hbase_scanner_(nullptr),
+      row_key_slot_(nullptr),
       row_key_binary_encoded_(false),
       text_converter_(new TextConverter('\\', "", false)),
       suggested_max_caching_(0) {
@@ -78,9 +79,8 @@ Status HBaseScanNode::Prepare(RuntimeState* state) {
   // Here, we re-order the slots from the query by family/qualifier, exploiting the
   // known sort order of the columns retrieved from HBase, to avoid family/qualifier
   // comparisons.
-  const HBaseTableDescriptor* hbase_table =
-      static_cast<const HBaseTableDescriptor*>(tuple_desc_->table_desc());
-  const vector<HBaseTableDescriptor::HBaseColumnDescriptor>& cols = hbase_table->cols();
+  hbase_table_ = static_cast<const HBaseTableDescriptor*>(tuple_desc_->table_desc());
+  const vector<HBaseTableDescriptor::HBaseColumnDescriptor>& cols = hbase_table_->cols();
   const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
   sorted_non_key_slots_.reserve(slots.size());
   for (int i = 0; i < slots.size(); ++i) {
@@ -126,7 +126,7 @@ Status HBaseScanNode::Prepare(RuntimeState* state) {
       sr.set_stop_key(key_range.stopkey());
     }
   }
-  runtime_profile_->AddInfoString("Table Name", hbase_table->fully_qualified_name());
+  runtime_profile_->AddInfoString("Table Name", hbase_table_->fully_qualified_name());
   return Status::OK();
 }
 
@@ -145,9 +145,10 @@ Status HBaseScanNode::Open(RuntimeState* state) {
 
 void HBaseScanNode::WriteTextSlot(
     const string& family, const string& qualifier,
-    void* value, int value_length, SlotDescriptor* slot,
+    void* value, int value_length, SlotDescriptor* slot_desc,
     RuntimeState* state, MemPool* pool, Tuple* tuple, bool* error_in_row) {
-  if (!text_converter_->WriteSlot(slot, tuple,
+  const AuxColumnType& aux_type = hbase_table_->GetColumnDesc(slot_desc).auxType();
+  if (!text_converter_->WriteSlot(slot_desc, &aux_type, tuple,
       reinterpret_cast<char*>(value), value_length, true, false, pool)) {
     *error_in_row = true;
     if (state->LogHasSpace()) {
@@ -155,7 +156,7 @@ void HBaseScanNode::WriteTextSlot(
       ss << "Error converting column " << family
           << ":" << qualifier << ": "
           << "'" << string(reinterpret_cast<char*>(value), value_length) << "' TO "
-          << slot->type();
+          << slot_desc->type();
       state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
     }
   }
diff --git a/be/src/exec/hbase/hbase-scan-node.h b/be/src/exec/hbase/hbase-scan-node.h
index 88f6b9c74..bbb404c0c 100644
--- a/be/src/exec/hbase/hbase-scan-node.h
+++ b/be/src/exec/hbase/hbase-scan-node.h
@@ -79,6 +79,9 @@ class HBaseScanNode : public ScanNode {
   /// Descriptor of tuples read from HBase table.
   const TupleDescriptor* tuple_desc_;
 
+  /// Descriptor of the HBase table. Set during Prepare().
+  const HBaseTableDescriptor* hbase_table_;
+
   /// Tuple index in tuple row.
   int tuple_idx_;
 
diff --git a/be/src/exec/hbase/hbase-table-writer.cc b/be/src/exec/hbase/hbase-table-writer.cc
index 71f0f0fb0..0f1641c94 100644
--- a/be/src/exec/hbase/hbase-table-writer.cc
+++ b/be/src/exec/hbase/hbase-table-writer.cc
@@ -30,6 +30,7 @@
 #include "runtime/tuple.h"
 #include "runtime/tuple-row.h"
 #include "util/bit-util.h"
+#include "util/coding-util.h"
 #include "util/jni-util.h"
 
 #include "common/names.h"
@@ -129,6 +130,7 @@ Status HBaseTableWriter::AppendRows(RowBatch* batch) {
   // For every TupleRow in the row batch create a put, assign the row key,
   // and add all of the values generated from the expressions.
   string string_value; // text encoded value
+  string base64_encoded_value; // needed for BINARY columns
   char binary_value[8]; // binary encoded value; at most 8 bytes
   const void* data; // pointer to the column value in bytes
   int data_len; // length of the column value in bytes
@@ -152,8 +154,15 @@ Status HBaseTableWriter::AppendRows(RowBatch* batch) {
             // Text encoded
             string_value.clear();
             output_expr_evals_[j]->PrintValue(value, &string_value);
-            data = string_value.data();
-            data_len = string_value.length();
+            const ColumnDescriptor& col_desc = table_desc_->col_descs()[j];
+            if (col_desc.auxType().IsBinaryStringSubtype()) {
+              Base64Encode(string_value , &base64_encoded_value);
+              data = base64_encoded_value.data();
+              data_len = base64_encoded_value.length();
+            } else {
+              data = string_value.data();
+              data_len = string_value.length();
+            }
           } else {
             // Binary encoded
             // Only bool, tinyint, smallint, int, bigint, float and double can be binary
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 41cd8fb8d..0fe3e05c2 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -124,9 +124,11 @@ bool HdfsScanner::TextConverterWriteSlotInterpretedIR(HdfsScanner* hdfs_scanner,
     need_escape = true;
   }
 
-  SlotDescriptor* desc = hdfs_scanner->scan_node_->materialized_slots()[slot_idx];
-  return hdfs_scanner->text_converter_->WriteSlot(desc, tuple, data, len, copy_string,
-      need_escape, pool);
+  SlotDescriptor* slot_desc = hdfs_scanner->scan_node_->materialized_slots()[slot_idx];
+  const AuxColumnType& auxType =
+      hdfs_scanner->scan_node_->hdfs_table_->GetColumnDesc(slot_desc).auxType();
+  return hdfs_scanner->text_converter_->WriteSlot(slot_desc, &auxType, tuple, data, len,
+       copy_string, need_escape, pool);
 }
 
 // Define the string parsing functions for llvm.  Stamp out the templated functions
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 501203909..1b7e8f47c 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -286,8 +286,10 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
       need_escape = true;
     }
 
-    SlotDescriptor* desc = scan_node_->materialized_slots()[i];
-    bool error = !text_converter_->WriteSlot(desc, tuple,
+    const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[i];
+    const AuxColumnType& aux_type =
+        scan_node_->hdfs_table()->GetColumnDesc(slot_desc).auxType();
+    bool error = !text_converter_->WriteSlot(slot_desc, &aux_type, tuple,
         fields[i].start, len, false, need_escape, pool);
     error_fields[i] = error;
     *error_in_row |= error;
@@ -370,9 +372,12 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
     // If the type is CHAR, WriteSlot for this slot cannot be codegen'd. To keep codegen
     // for other things, we call the interpreted code for this slot from the codegen'd
     // code instead of failing codegen. See IMPALA-9747.
-    if (TextConverter::SupportsCodegenWriteSlot(slot_desc->type())) {
-      RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, &fn,
-          node->hdfs_table_->null_column_value().data(),
+    const AuxColumnType& aux_type =
+        node->hdfs_table_->GetColumnDesc(slot_desc).auxType();
+    if (TextConverter::SupportsCodegenWriteSlot(
+        slot_desc->type(), aux_type)) {
+      RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc,
+          &aux_type, &fn, node->hdfs_table_->null_column_value().data(),
           node->hdfs_table_->null_column_value().size(), true,
           state->query_options().strict_mode));
       if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn);
diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc
index 84fce8668..aabbf1921 100644
--- a/be/src/exec/hdfs-text-table-writer.cc
+++ b/be/src/exec/hdfs-text-table-writer.cc
@@ -25,6 +25,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.inline.h"
+#include "util/coding-util.h"
 #include "util/hdfs-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -75,8 +76,8 @@ Status HdfsTextTableWriter::AppendRows(
   COUNTER_ADD(parent_->rows_inserted_counter(), limit);
 
   bool all_rows = row_group_indices.empty();
-  int num_non_partition_cols =
-      table_desc_->num_cols() - table_desc_->num_clustering_cols();
+  int num_partition_cols = table_desc_->num_clustering_cols();
+  int num_non_partition_cols = table_desc_->num_cols() - num_partition_cols;
   DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols) << parent_->DebugString();
 
   {
@@ -99,7 +100,16 @@ Status HdfsTextTableWriter::AppendRows(
             StringValue sv(val_ptr, StringValue::UnpaddedCharLength(val_ptr, type.len));
             PrintEscaped(&sv);
           } else if (type.IsVarLenStringType()) {
-            PrintEscaped(reinterpret_cast<const StringValue*>(value));
+            const ColumnDescriptor& col_desc =
+                table_desc_->col_descs()[num_partition_cols + j];
+            const StringValue* string_value = reinterpret_cast<const StringValue*>(value);
+            if (col_desc.auxType().IsBinaryStringSubtype()) {
+              // TODO: try to find a more efficient imlementation
+              Base64Encode(
+                  string_value->ptr , string_value->len, &rowbatch_stringstream_);
+            } else {
+              PrintEscaped(string_value);
+            }
           } else {
             output_expr_evals_[j]->PrintValue(value, &rowbatch_stringstream_);
           }
diff --git a/be/src/exec/orc/orc-metadata-utils.cc b/be/src/exec/orc/orc-metadata-utils.cc
index 95e37f939..c8d4c78e9 100644
--- a/be/src/exec/orc/orc-metadata-utils.cc
+++ b/be/src/exec/orc/orc-metadata-utils.cc
@@ -472,6 +472,8 @@ Status OrcSchemaResolver::ValidatePrimitiveType(const ColumnType& type,
     case orc::TypeKind::STRING:
     case orc::TypeKind::VARCHAR:
     case orc::TypeKind::CHAR:
+    case orc::TypeKind::BINARY:
+      // orc::TypeKind::BINARY is handled as TYPE_STRING, TYPE_BINARY is not used.
       if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR
           || type.type == TYPE_CHAR) {
         return Status::OK();
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index f9becdef6..80c0f1a7f 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -1469,8 +1469,8 @@ Status HdfsParquetTableWriter::CreateSchema() {
     DCHECK_EQ(col_desc.name(), columns_[i]->column_name());
     const int field_id = col_desc.field_id();
     if (field_id != -1) col_schema.__set_field_id(field_id);
-    ParquetMetadataUtils::FillSchemaElement(col_type, string_utf8_,
-                                            timestamp_type_, &col_schema);
+    ParquetMetadataUtils::FillSchemaElement(col_type, string_utf8_, timestamp_type_,
+                                            col_desc.auxType(), &col_schema);
   }
 
   return Status::OK();
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index b0c2c4573..6b8c15edf 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -415,6 +415,7 @@ parquet::Type::type ParquetMetadataUtils::ConvertInternalToParquetType(
 
 void ParquetMetadataUtils::FillSchemaElement(const ColumnType& col_type,
     bool string_utf8, TParquetTimestampType::type timestamp_type,
+    const AuxColumnType& aux_type,
     parquet::SchemaElement* col_schema) {
   col_schema->__set_type(ConvertInternalToParquetType(col_type.type, timestamp_type));
   col_schema->__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
@@ -430,7 +431,7 @@ void ParquetMetadataUtils::FillSchemaElement(const ColumnType& col_type,
     case TYPE_STRING:
       // By default STRING has no logical type, see IMPALA-5982.
       // VARCHAR and CHAR are always set to UTF8.
-      if (string_utf8) {
+      if (string_utf8 && !aux_type.IsBinaryStringSubtype()) {
         SetUtf8ConvertedAndLogicalType(col_schema);
       }
       break;
diff --git a/be/src/exec/parquet/parquet-metadata-utils.h b/be/src/exec/parquet/parquet-metadata-utils.h
index 74ab9e730..2981f7cef 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.h
+++ b/be/src/exec/parquet/parquet-metadata-utils.h
@@ -66,7 +66,7 @@ class ParquetMetadataUtils {
   /// and this function's arguments.
   static void FillSchemaElement(const ColumnType& col_type,
       bool string_utf8, TParquetTimestampType::type timestamp_type,
-      parquet::SchemaElement* col_schema);
+      const AuxColumnType& aux_type, parquet::SchemaElement* col_schema);
 };
 
 struct ParquetFileVersion {
diff --git a/be/src/exec/rcfile/hdfs-rcfile-scanner.cc b/be/src/exec/rcfile/hdfs-rcfile-scanner.cc
index 2776a1db1..8264faf2b 100644
--- a/be/src/exec/rcfile/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/rcfile/hdfs-rcfile-scanner.cc
@@ -590,8 +590,10 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
           return Status(ss.str());
         }
 
-        if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
-            false, false, row_batch->tuple_data_pool())) {
+        const AuxColumnType& aux_type =
+            scan_node_->hdfs_table()->GetColumnDesc(slot_desc).auxType();
+        if (!text_converter_->WriteSlot(slot_desc, &aux_type, tuple, col_start,
+             field_len, false, false, row_batch->tuple_data_pool())) {
           ReportColumnParseError(slot_desc, col_start, field_len);
           error_in_row = true;
         }
diff --git a/be/src/exec/text-converter.cc b/be/src/exec/text-converter.cc
index 876841234..044ea5b2e 100644
--- a/be/src/exec/text-converter.cc
+++ b/be/src/exec/text-converter.cc
@@ -107,10 +107,11 @@ void TextConverter::UnescapeString(const char* src, char* dest, int* len,
 // TODO: convert this function to use cross-compilation + constant substitution in whole
 // or part. It is currently too complex and doesn't implement the full functionality.
 Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
-    TupleDescriptor* tuple_desc, SlotDescriptor* slot_desc, llvm::Function** fn,
-    const char* null_col_val, int len, bool check_null, bool strict_mode) {
+    TupleDescriptor* tuple_desc, SlotDescriptor* slot_desc,
+    const AuxColumnType* aux_type, llvm::Function** fn, const char* null_col_val,
+    int len, bool check_null, bool strict_mode) {
   DCHECK(fn != nullptr);
-  DCHECK(SupportsCodegenWriteSlot(slot_desc->type()));
+  DCHECK(SupportsCodegenWriteSlot(slot_desc->type(), *aux_type));
 
   // Codegen is_null_string
   bool is_default_null = (len == 2 && null_col_val[0] == '\\' && null_col_val[1] == 'N');
@@ -349,6 +350,11 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
   return Status::OK();
 }
 
-bool TextConverter::SupportsCodegenWriteSlot(const ColumnType& col_type) {
+bool TextConverter::SupportsCodegenWriteSlot(const ColumnType& col_type,
+    const AuxColumnType& auxType) {
+  if (col_type.type == TYPE_STRING) {
+    // TODO: implement codegen for binary (IMPALA-11475)
+    return auxType.string_subtype != AuxColumnType::StringSubtype::BINARY;
+  }
   return col_type.type != TYPE_CHAR;
 }
diff --git a/be/src/exec/text-converter.h b/be/src/exec/text-converter.h
index 77c97994f..0f0d3b981 100644
--- a/be/src/exec/text-converter.h
+++ b/be/src/exec/text-converter.h
@@ -53,6 +53,7 @@ class TextConverter {
 
   /// Converts slot data, of length 'len',  into type of slot_desc,
   /// and writes the result into the tuples's slot.
+  /// auxType is used to differentiate between BINARY and STRING types.
   /// copy_string indicates whether we need to make a separate copy of the string data:
   /// For regular unescaped strings, we point to the original data in the file_buf_.
   /// For regular escaped strings, we copy its unescaped string into a separate buffer
@@ -61,8 +62,9 @@ class TextConverter {
   /// 'pool' is unused.
   /// Unsuccessful conversions are turned into NULLs.
   /// Returns true if the value was written successfully.
-  bool WriteSlot(const SlotDescriptor* slot_desc, Tuple* tuple,
-      const char* data, int len, bool copy_string, bool need_escape, MemPool* pool);
+  bool WriteSlot(const SlotDescriptor* slot_desc, const AuxColumnType* auxType,
+      Tuple* tuple, const char* data, int len, bool copy_string, bool need_escape,
+      MemPool* pool);
 
   /// Removes escape characters from len characters of the null-terminated string src,
   /// and copies the unescaped string into dest, changing *len to the unescaped length.
@@ -79,6 +81,7 @@ class TextConverter {
   /// bool WriteSlot(Tuple* tuple, const char* data, int len);
   /// The codegen function returns true if the slot could be written and false
   /// otherwise.
+  /// auxType is used to differentiate between BINARY and STRING types.
   /// If check_null is set, then the codegen'd function sets the target slot to NULL
   /// if its input string matches null_vol_val.
   /// The codegenerated function does not support escape characters and should not
@@ -86,11 +89,13 @@ class TextConverter {
   /// strict_mode: If set, numerical overflow/underflow are considered to be parse
   /// errors.
   static Status CodegenWriteSlot(LlvmCodeGen* codegen,
-      TupleDescriptor* tuple_desc, SlotDescriptor* slot_desc, llvm::Function** fn,
-      const char* null_col_val, int len, bool check_null, bool strict_mode = false);
+      TupleDescriptor* tuple_desc, SlotDescriptor* slot_desc,
+      const AuxColumnType* auxType, llvm::Function** fn, const char* null_col_val,
+      int len, bool check_null, bool strict_mode = false);
 
   /// Returns whether codegen is supported for the given type.
-  static bool SupportsCodegenWriteSlot(const ColumnType& col_type);
+  static bool SupportsCodegenWriteSlot(const ColumnType& col_type,
+      const AuxColumnType& auxType);
  private:
   char escape_char_;
   /// Special string to indicate NULL column values.
diff --git a/be/src/exec/text-converter.inline.h b/be/src/exec/text-converter.inline.h
index e3bee9707..ba27fefa2 100644
--- a/be/src/exec/text-converter.inline.h
+++ b/be/src/exec/text-converter.inline.h
@@ -26,6 +26,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/descriptors.h"
 #include "runtime/tuple.h"
+#include "util/coding-util.h"
 #include "util/string-parser.h"
 #include "runtime/string-value.h"
 #include "runtime/date-value.h"
@@ -38,7 +39,8 @@ namespace impala {
 
 /// Note: this function has a codegen'd version.  Changing this function requires
 /// corresponding changes to CodegenWriteSlot().
-inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tuple,
+inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc,
+    const AuxColumnType* auxType, Tuple* tuple,
     const char* data, int len, bool copy_string, bool need_escape, MemPool* pool) {
   if ((len == 0 && !slot_desc->type().IsStringType()) || data == NULL) {
     tuple->SetNull(slot_desc->null_indicator_offset());
@@ -66,6 +68,15 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup
           !(len != 0 && (copy_string || need_escape));
       if (type.type == TYPE_CHAR) reuse_data &= (buffer_len <= len);
 
+      bool base64_decode = false;
+      if (auxType->IsBinaryStringSubtype()) {
+        base64_decode = true;
+        reuse_data = false;
+        int64_t out_len;
+        if (!Base64DecodeBufLen(data, len, &out_len)) return false;
+        buffer_len = out_len;
+      }
+
       StringValue str;
       str.len = std::min(buffer_len, len);
       if (reuse_data) {
@@ -83,7 +94,12 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup
             reinterpret_cast<char*>(pool->TryAllocateUnaligned(buffer_len)) :
             reinterpret_cast<char*>(slot);
         if (UNLIKELY(str.ptr == NULL)) return false;
-        if (need_escape) {
+        if (base64_decode) {
+          int64_t out_len;
+          if(!Base64Decode(data, len, buffer_len, str.ptr, &out_len)) return false;
+          DCHECK_LE(out_len, buffer_len);
+          str.len = out_len;
+        } else if (need_escape) {
           // Use a temporary variable on the stack to avoid accessing an unaligned
           // pointer.
           int str_len = str.len;
diff --git a/be/src/exec/text/hdfs-text-scanner.cc b/be/src/exec/text/hdfs-text-scanner.cc
index a592ad7c9..f36b21c67 100644
--- a/be/src/exec/text/hdfs-text-scanner.cc
+++ b/be/src/exec/text/hdfs-text-scanner.cc
@@ -949,10 +949,12 @@ void HdfsTextScanner::WritePartialTuple(FieldLocation* fields, int num_fields) {
       need_escape = true;
     }
 
-    const SlotDescriptor* desc = scan_node_->materialized_slots()[slot_idx_];
-    if (!text_converter_->WriteSlot(desc, partial_tuple_,
+    const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[slot_idx_];
+    const AuxColumnType& aux_type =
+        scan_node_->hdfs_table()->GetColumnDesc(slot_desc).auxType();
+    if (!text_converter_->WriteSlot(slot_desc, &aux_type, partial_tuple_,
         fields[i].start, len, true, need_escape, boundary_pool_.get())) {
-      ReportColumnParseError(desc, fields[i].start, len);
+      ReportColumnParseError(slot_desc, fields[i].start, len);
       error_in_row_ = true;
     }
     ++slot_idx_;
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 688f8f74e..aa34332d3 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -358,7 +358,11 @@ class ExprTest : public testing::TestWithParam<std::tuple<bool, bool>> {
       return "";
     }
     EXPECT_TRUE(status.ok()) << "stmt: " << stmt << "\nerror: " << status.GetDetail();
-    EXPECT_EQ(TypeToOdbcString(expr_type.type), result_types[0].type) << expr;
+    string odbcType = TypeToOdbcString(expr_type.ToThrift());
+    // ColumnType cannot be BINARY, so STRING is used instead.
+    string expectedType =
+        result_types[0].type == "binary" ? "string" : result_types[0].type;
+    EXPECT_EQ(odbcType, expectedType) << expr;
     return result_row;
   }
 
@@ -770,28 +774,32 @@ class ExprTest : public testing::TestWithParam<std::tuple<bool, bool>> {
     TestValue("0/0 < 1/0", TYPE_BOOLEAN, false);
   }
 
-  void TestStringComparisons() {
-    TestValue<bool>("'abc' = 'abc'", TYPE_BOOLEAN, true);
-    TestValue<bool>("'abc' = 'abcd'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abc' != 'abcd'", TYPE_BOOLEAN, true);
-    TestValue<bool>("'abc' != 'abc'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abc' < 'abcd'", TYPE_BOOLEAN, true);
-    TestValue<bool>("'abcd' < 'abc'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abcd' < 'abcd'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abc' > 'abcd'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abcd' > 'abc'", TYPE_BOOLEAN, true);
-    TestValue<bool>("'abcd' > 'abcd'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abc' <= 'abcd'", TYPE_BOOLEAN, true);
-    TestValue<bool>("'abcd' <= 'abc'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abcd' <= 'abcd'", TYPE_BOOLEAN, true);
-    TestValue<bool>("'abc' >= 'abcd'", TYPE_BOOLEAN, false);
-    TestValue<bool>("'abcd' >= 'abc'", TYPE_BOOLEAN, true);
-    TestValue<bool>("'abcd' >= 'abcd'", TYPE_BOOLEAN, true);
+  void TestStringComparisons(string string_type) {
+    string abc = "cast('abc' as " + string_type + ")";
+    string abcd = "cast('abcd' as " + string_type + ")";
+    string empty = "cast('' as " + string_type + ")";
+
+    TestValue<bool>(abc + " = " + abc, TYPE_BOOLEAN, true);
+    TestValue<bool>(abc + " = " + abcd, TYPE_BOOLEAN, false);
+    TestValue<bool>(abc + " != " + abcd, TYPE_BOOLEAN, true);
+    TestValue<bool>(abc + " != " + abc, TYPE_BOOLEAN, false);
+    TestValue<bool>(abc + " < " + abcd, TYPE_BOOLEAN, true);
+    TestValue<bool>(abcd + " < " + abc, TYPE_BOOLEAN, false);
+    TestValue<bool>(abcd + " < " + abcd, TYPE_BOOLEAN, false);
+    TestValue<bool>(abc + " > " + abcd, TYPE_BOOLEAN, false);
+    TestValue<bool>(abcd + " > " + abc, TYPE_BOOLEAN, true);
+    TestValue<bool>(abcd + " > " + abcd, TYPE_BOOLEAN, false);
+    TestValue<bool>(abc + " <= " + abcd, TYPE_BOOLEAN, true);
+    TestValue<bool>(abcd + " <= " + abc, TYPE_BOOLEAN, false);
+    TestValue<bool>(abcd + " <= " + abcd, TYPE_BOOLEAN, true);
+    TestValue<bool>(abc + " >= " + abcd, TYPE_BOOLEAN, false);
+    TestValue<bool>(abcd + " >= " + abc, TYPE_BOOLEAN, true);
+    TestValue<bool>(abcd + " >= " + abcd, TYPE_BOOLEAN, true);
 
     // Test some empty strings
-    TestValue<bool>("'abcd' >= ''", TYPE_BOOLEAN, true);
-    TestValue<bool>("'' > ''", TYPE_BOOLEAN, false);
-    TestValue<bool>("'' = ''", TYPE_BOOLEAN, true);
+    TestValue<bool>(abcd + " >= " + empty, TYPE_BOOLEAN, true);
+    TestValue<bool>(empty + " > " + empty, TYPE_BOOLEAN, false);
+    TestValue<bool>(empty + " = " + empty, TYPE_BOOLEAN, true);
   }
 
   void TestDecimalComparisons() {
@@ -1205,6 +1213,8 @@ class ExprTest : public testing::TestWithParam<std::tuple<bool, bool>> {
     return pool_.Add(
         UdfTestHarness::CreateTestContext(return_type, arg_types, state, pool));
   }
+
+  void TestBytes();
 };
 
 template<>
@@ -3164,7 +3174,10 @@ TEST_P(ExprTest, BinaryPredicates) {
   TestFixedPointComparisons<int64_t>(false);
   TestFloatingPointComparisons<float>(true);
   TestFloatingPointComparisons<double>(false);
-  TestStringComparisons();
+  TestStringComparisons("STRING");
+  TestStringComparisons("BINARY");
+  TestStringComparisons("VARCHAR(4)");
+  TestStringComparisons("CHAR(4)");
   TestDecimalComparisons();
   TestNullComparisons();
   TestDistinctFrom();
@@ -4725,6 +4738,26 @@ TEST_P(ExprTest, StringFunctions) {
   TestIsNull("concat('a', 'b', NULL)", TYPE_STRING);
   TestStringValue("concat('', '', '')", "");
 
+  // Concat should work with BINARY the same way as with STRING
+  TestStringValue("concat(cast('a' as binary))", "a");
+  TestStringValue("concat(cast('a' as binary), cast('b' as binary))", "ab");
+  TestStringValue(
+      "concat(cast('a' as binary), cast('b' as binary), cast('cde' as binary))",
+      "abcde");
+  TestStringValue(
+      "concat(cast('a' as binary) , cast('b' as binary), "
+      "cast('cde' as binary), cast('fg' as binary))",
+      "abcdefg");
+  TestStringValue(
+       "concat(cast('a' as binary), cast('b' as binary), cast('cde' as binary), "
+       "cast('' as binary), cast('fg' as binary), cast('' as binary))",
+       "abcdefg");
+  TestIsNull("concat(cast(NULL as binary))", TYPE_STRING);
+  TestIsNull("concat(cast('a' as binary), NULL, cast('b' as binary))", TYPE_STRING);
+  TestIsNull("concat(cast('a' as binary), cast('b' as binary), NULL)", TYPE_STRING);
+  TestStringValue(
+      "concat(cast('' as binary), cast('' as binary), cast('' as binary))", "");
+
   TestStringValue("concat_ws(',', 'a')", "a");
   TestStringValue("concat_ws(',', 'a', 'b')", "a,b");
   TestStringValue("concat_ws(',', 'a', 'b', 'cde')", "a,b,cde");
@@ -5498,6 +5531,8 @@ TEST_P(ExprTest, MurmurHashFunction) {
   // changes behavior.
   EXPECT_EQ(-3190198453633110066, expected);
   TestValue("murmur_hash('hello world')", TYPE_BIGINT, expected);
+  // BINARY should return the same hash as STRING
+  TestValue("murmur_hash(cast('hello world' as binary))", TYPE_BIGINT, expected);
   s = string("");
   expected = HashUtil::MurmurHash2_64(s.data(), s.size(), HashUtil::MURMUR_DEFAULT_SEED);
   TestValue("murmur_hash('')", TYPE_BIGINT, expected);
@@ -7247,9 +7282,9 @@ TEST_P(ExprTest, TimestampFunctions) {
     Status status = executor_->Exec(stmt, &result_types);
     EXPECT_TRUE(status.ok()) << "stmt: " << stmt << "\nerror: " << status.GetDetail();
     DCHECK(result_types.size() == 2);
-    EXPECT_EQ(TypeToOdbcString(TYPE_TIMESTAMP), result_types[0].type)
+    EXPECT_EQ(result_types[0].type, "timestamp")
         << "invalid type returned by now()";
-    EXPECT_EQ(TypeToOdbcString(TYPE_TIMESTAMP), result_types[1].type)
+    EXPECT_EQ(result_types[1].type, "timestamp")
         << "invalid type returned by utc_timestamp()";
     string result_row;
     status = executor_->FetchResult(&result_row);
@@ -8405,6 +8440,9 @@ TEST_P(ExprTest, ConditionalFunctions) {
   TestValue("if(FALSE, cast(5.5 as double), cast(8.8 as double))", TYPE_DOUBLE, 8.8);
   TestStringValue("if(TRUE, 'abc', 'defgh')", "abc");
   TestStringValue("if(FALSE, 'abc', 'defgh')", "defgh");
+  TestStringValue("if(TRUE, cast('a' as binary), cast('b' as binary))", "a");
+  TestStringValue("if(FALSE, cast('a' as binary), cast('b' as binary))", "b");
+
   TimestampValue then_val = TimestampValue::FromUnixTime(1293872461, UTCPTR);
   TimestampValue else_val = TimestampValue::FromUnixTime(929387245, UTCPTR);
   TestTimestampValue("if(TRUE, cast('2011-01-01 09:01:01' as timestamp), "
@@ -8427,6 +8465,9 @@ TEST_P(ExprTest, ConditionalFunctions) {
   TestValue("nvl2(NULL, cast(5.5 as double), cast(8.8 as double))", TYPE_DOUBLE, 8.8);
   TestStringValue("nvl2('some string', 'abc', 'defgh')", "abc");
   TestStringValue("nvl2(NULL, 'abc', 'defgh')", "defgh");
+  TestStringValue(
+      "nvl2(cast('' as binary), cast('a' as binary), cast('b' as binary))", "a");
+  TestStringValue("nvl2(NULL, cast('a' as binary), cast('b' as binary))", "b");
   TimestampValue first_val = TimestampValue::FromUnixTime(1293872461, UTCPTR);
   TimestampValue second_val = TimestampValue::FromUnixTime(929387245, UTCPTR);
   TestTimestampValue("nvl2(FALSE, cast('2011-01-01 09:01:01' as timestamp), "
@@ -8456,6 +8497,10 @@ TEST_P(ExprTest, ConditionalFunctions) {
   TestStringValue("nullif('abc', 'def')", "abc");
   TestIsNull("nullif(NULL, 'abc')", TYPE_STRING);
   TestStringValue("nullif('abc', NULL)", "abc");
+  TestIsNull("nullif(cast('a' as binary), cast('a' as binary))", TYPE_STRING);
+  TestStringValue("nullif(cast('a' as binary), cast('b' as binary))", "a");
+  TestIsNull("nullif(NULL, cast('a' as binary))", TYPE_STRING);
+  TestStringValue("nullif(cast('a' as binary), NULL)", "a");
   TestIsNull("nullif(cast('2011-01-01 09:01:01' as timestamp), "
       "cast('2011-01-01 09:01:01' as timestamp))", TYPE_TIMESTAMP);
   TimestampValue testlhs = TimestampValue::FromUnixTime(1293872461, UTCPTR);
@@ -8492,6 +8537,7 @@ TEST_P(ExprTest, ConditionalFunctions) {
     TestValue(f + "(NULL, cast(10.0 as float))", TYPE_FLOAT, 10.0f);
     TestValue(f + "(NULL, cast(10.0 as double))", TYPE_DOUBLE, 10.0);
     TestStringValue(f + "(NULL, 'abc')", "abc");
+    TestStringValue(f + "(NULL, cast('abc' as binary))", "abc");
     TestTimestampValue(f + "(NULL, " + default_timestamp_str_ + ")",
         default_timestamp_val_);
     TestDateValue(f + "(NULL, " + default_date_str_ + ")", default_date_val_);
@@ -8525,6 +8571,10 @@ TEST_P(ExprTest, ConditionalFunctions) {
   TestStringValue("coalesce(NULL, 'abc', NULL)", "abc");
   TestStringValue("coalesce('defgh', NULL, 'abc', NULL)", "defgh");
   TestStringValue("coalesce(NULL, NULL, NULL, 'abc', NULL, NULL)", "abc");
+  TestStringValue("coalesce(cast('a' as binary))", "a");
+  TestStringValue("coalesce(NULL, cast('a' as binary), NULL)", "a");
+  TestStringValue("coalesce(cast('a' as binary), NULL, cast('b' as binary), NULL)", "a");
+  TestStringValue("coalesce(NULL, NULL, NULL, cast('a' as binary), NULL, NULL)", "a");
   TimestampValue ats = TimestampValue::FromUnixTime(1293872461, UTCPTR);
   TimestampValue bts = TimestampValue::FromUnixTime(929387245, UTCPTR);
   TestTimestampValue("coalesce(cast('2011-01-01 09:01:01' as timestamp))", ats);
@@ -10724,14 +10774,29 @@ TEST_P(ExprTest, Utf8MaskTest) {
   executor_->PopExecOption();
 }
 
-TEST_P(ExprTest, BytesTest) {
+void ExprTest::TestBytes() {
   // Verifies Bytes(exp) counts number of bytes.
   TestIsNull("Bytes(NULL)", TYPE_INT);
   TestValue("Bytes('你好')", TYPE_INT, 6);
   TestValue("Bytes('你好hello')", TYPE_INT, 11);
   TestValue("Bytes('你好 hello 你好')", TYPE_INT, 19);
   TestValue("Bytes('hello')", TYPE_INT, 5);
+  // BINARY uses "bytes" behind "length"
+  TestIsNull("Length(CAST(NULL AS BINARY))", TYPE_INT);
+  TestValue("Length(CAST('你好' AS BINARY))", TYPE_INT, 6);
+  TestValue("Length(CAST('你好hello' AS BINARY))", TYPE_INT, 11);
+  TestValue("Length(CAST('你好 hello 你好' AS BINARY))", TYPE_INT, 19);
+  TestValue("Length(CAST('hello' AS BINARY))", TYPE_INT, 5);
 }
+
+TEST_P(ExprTest, BytesTest) {
+  // Bytes should behave the same regardless of utf8_mode.
+  TestBytes();
+  executor_->PushExecOption("utf8_mode=true");
+  TestBytes();
+  executor_->PopExecOption();
+}
+
 TEST_P(ExprTest, Utf8Test) {
   // Verifies utf8_length() counts length by UTF-8 characters instead of bytes.
   // '你' and '好' are both encoded into 3 bytes.
diff --git a/be/src/exprs/utility-functions-ir.cc b/be/src/exprs/utility-functions-ir.cc
index 38ca5c983..ce2925350 100644
--- a/be/src/exprs/utility-functions-ir.cc
+++ b/be/src/exprs/utility-functions-ir.cc
@@ -212,6 +212,11 @@ StringVal UtilityFunctions::TypeOf(FunctionContext* ctx, const T& /*input_val*/)
   }
 }
 
+StringVal UtilityFunctions::TypeOfBinary(
+    FunctionContext* ctx, const StringVal& /*input_val*/) {
+  return AnyValUtil::FromString(ctx, "BINARY");
+}
+
 template StringVal UtilityFunctions::TypeOf(
     FunctionContext* ctx, const BooleanVal& input_val);
 template StringVal UtilityFunctions::TypeOf(
diff --git a/be/src/exprs/utility-functions.h b/be/src/exprs/utility-functions.h
index 210da6163..ac380e7fe 100644
--- a/be/src/exprs/utility-functions.h
+++ b/be/src/exprs/utility-functions.h
@@ -114,6 +114,10 @@ class UtilityFunctions {
   /// string.
   static StringVal Md5(FunctionContext* ctx, const StringVal& input_str);
 
+  /// Implementation of the typeOf() for BINARY type, which is generally not
+  /// differentiated from STRING by the backend.
+  static StringVal TypeOfBinary(FunctionContext* ctx, const StringVal& input_val);
+
  private:
   static StringVal GenUuid(FunctionContext* ctx);
 };
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 2afc59e5d..2b3ffc92f 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -165,7 +165,8 @@ inline bool SlotDescriptor::IsChildOfStruct() const {
 
 ColumnDescriptor::ColumnDescriptor(const TColumnDescriptor& tdesc)
   : name_(tdesc.name),
-    type_(ColumnType::FromThrift(tdesc.type)) {
+    type_(ColumnType::FromThrift(tdesc.type)),
+    aux_type_(tdesc.type) {
   if (tdesc.__isset.icebergFieldId) {
     field_id_ = tdesc.icebergFieldId;
     // Get key and value field_id for Iceberg table column with Map type.
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 007cdb850..be50ccfaa 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -213,17 +213,23 @@ class ColumnDescriptor {
   ColumnDescriptor(const TColumnDescriptor& tdesc);
   const std::string& name() const { return name_; }
   const ColumnType& type() const { return type_; }
+
   int field_id() const { return field_id_; }
   int field_map_key_id() const { return field_map_key_id_; }
   int field_map_value_id() const { return field_map_value_id_; }
+
+  const AuxColumnType& auxType() const { return aux_type_; }
   std::string DebugString() const;
 
  private:
   std::string name_;
   ColumnType type_;
+
   int field_id_ = -1;
   int field_map_key_id_ = -1;
   int field_map_value_id_ = -1;
+
+  AuxColumnType aux_type_;
 };
 
 /// Base class for table descriptors.
@@ -243,6 +249,13 @@ class TableDescriptor {
         !slot_desc->IsVirtual();
   }
 
+  /// Get ColumnDesc based on SchemaPath.
+  const ColumnDescriptor& GetColumnDesc(const SlotDescriptor* slot_desc) const {
+    DCHECK_EQ(slot_desc->col_path().size(), 1); // Not supported for nested types.
+    DCHECK_LT(slot_desc->col_path()[0], col_descs_.size());
+    return col_descs_[slot_desc->col_path()[0]];
+  }
+
   const std::string& name() const { return name_; }
   const std::string& database() const { return database_; }
   int id() const { return id_; }
diff --git a/be/src/runtime/types.cc b/be/src/runtime/types.cc
index 2879e51a8..5ae510ff3 100644
--- a/be/src/runtime/types.cc
+++ b/be/src/runtime/types.cc
@@ -20,7 +20,6 @@
 #include <ostream>
 #include <sstream>
 
-#include "gen-cpp/TCLIService_constants.h"
 #include "codegen/llvm-codegen.h"
 
 #include "common/names.h"
@@ -38,6 +37,12 @@ const int ColumnType::MAX_DECIMAL8_PRECISION;
 
 const char* ColumnType::LLVM_CLASS_NAME = "struct.impala::ColumnType";
 
+AuxColumnType::AuxColumnType(const TColumnType& col_type) {
+  if (col_type.types.size() != 1 || !col_type.types[0].__isset.scalar_type) return;
+  TPrimitiveType::type primitive_type = col_type.types[0].scalar_type.type;
+  if (primitive_type == TPrimitiveType::BINARY) string_subtype = StringSubtype::BINARY;
+}
+
 ColumnType::ColumnType(const std::vector<TTypeNode>& types, int* idx)
   : len(-1), precision(-1), scale(-1) {
   DCHECK_GE(*idx, 0);
@@ -106,7 +111,8 @@ PrimitiveType ThriftToType(TPrimitiveType::type ttype) {
     case TPrimitiveType::TIMESTAMP: return TYPE_TIMESTAMP;
     case TPrimitiveType::STRING: return TYPE_STRING;
     case TPrimitiveType::VARCHAR: return TYPE_VARCHAR;
-    case TPrimitiveType::BINARY: return TYPE_BINARY;
+    // BINARY is generally handled the same way as STRING by the backend.
+    case TPrimitiveType::BINARY: return TYPE_STRING;
     case TPrimitiveType::DECIMAL: return TYPE_DECIMAL;
     case TPrimitiveType::CHAR: return TYPE_CHAR;
     case TPrimitiveType::FIXED_UDA_INTERMEDIATE: return TYPE_FIXED_UDA_INTERMEDIATE;
@@ -130,7 +136,9 @@ TPrimitiveType::type ToThrift(PrimitiveType ptype) {
     case TYPE_TIMESTAMP: return TPrimitiveType::TIMESTAMP;
     case TYPE_STRING: return TPrimitiveType::STRING;
     case TYPE_VARCHAR: return TPrimitiveType::VARCHAR;
-    case TYPE_BINARY: return TPrimitiveType::BINARY;
+    case TYPE_BINARY:
+      DCHECK(false) << "STRING should be used instead of BINARY in the backend.";
+      return TPrimitiveType::INVALID_TYPE;
     case TYPE_DECIMAL: return TPrimitiveType::DECIMAL;
     case TYPE_CHAR: return TPrimitiveType::CHAR;
     case TYPE_FIXED_UDA_INTERMEDIATE: return TPrimitiveType::FIXED_UDA_INTERMEDIATE;
@@ -169,9 +177,15 @@ string TypeToString(PrimitiveType t) {
   return "";
 }
 
-string TypeToOdbcString(PrimitiveType t) {
+string TypeToOdbcString(const TColumnType& type) {
+  DCHECK_EQ(1, type.types.size());
+  DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
+  DCHECK(type.types[0].__isset.scalar_type);
+  TPrimitiveType::type col_type = type.types[0].scalar_type.type;
+  PrimitiveType primitive_type = ThriftToType(col_type);
+  AuxColumnType aux_type(type);
   // ODBC driver requires types in lower case
-  switch (t) {
+  switch (primitive_type) {
     case INVALID_TYPE: return "invalid";
     case TYPE_NULL: return "null";
     case TYPE_BOOLEAN: return "boolean";
@@ -184,14 +198,20 @@ string TypeToOdbcString(PrimitiveType t) {
     case TYPE_DATE: return "date";
     case TYPE_DATETIME: return "datetime";
     case TYPE_TIMESTAMP: return "timestamp";
-    case TYPE_STRING: return "string";
+    case TYPE_STRING:
+      if(aux_type.IsBinaryStringSubtype()) {
+        return "binary";
+      } else {
+        return "string";
+      }
     case TYPE_VARCHAR: return "string";
-    case TYPE_BINARY: return "binary";
+
     case TYPE_DECIMAL: return "decimal";
     case TYPE_CHAR: return "char";
     case TYPE_STRUCT: return "struct";
     case TYPE_ARRAY: return "array";
     case TYPE_MAP: return "map";
+    case TYPE_BINARY:
     case TYPE_FIXED_UDA_INTERMEDIATE:
       // This type is not exposed to clients and should not be returned.
       DCHECK(false);
@@ -240,88 +260,6 @@ void ColumnType::ToThrift(TColumnType* thrift_type) const {
   }
 }
 
-TTypeEntry ColumnType::ToHs2Type() const {
-  TPrimitiveTypeEntry type_entry;
-  switch (type) {
-    // Map NULL_TYPE to BOOLEAN, otherwise Hive's JDBC driver won't
-    // work for queries like "SELECT NULL" (IMPALA-914).
-    case TYPE_NULL:
-      type_entry.__set_type(TTypeId::BOOLEAN_TYPE);
-      break;
-    case TYPE_BOOLEAN:
-      type_entry.__set_type(TTypeId::BOOLEAN_TYPE);
-      break;
-    case TYPE_TINYINT:
-      type_entry.__set_type(TTypeId::TINYINT_TYPE);
-      break;
-    case TYPE_SMALLINT:
-      type_entry.__set_type(TTypeId::SMALLINT_TYPE);
-      break;
-    case TYPE_INT:
-      type_entry.__set_type(TTypeId::INT_TYPE);
-      break;
-    case TYPE_BIGINT:
-      type_entry.__set_type(TTypeId::BIGINT_TYPE);
-      break;
-    case TYPE_FLOAT:
-      type_entry.__set_type(TTypeId::FLOAT_TYPE);
-      break;
-    case TYPE_DOUBLE:
-      type_entry.__set_type(TTypeId::DOUBLE_TYPE);
-      break;
-    case TYPE_DATE:
-      type_entry.__set_type(TTypeId::DATE_TYPE);
-      break;
-    case TYPE_TIMESTAMP:
-      type_entry.__set_type(TTypeId::TIMESTAMP_TYPE);
-      break;
-    case TYPE_STRING:
-      type_entry.__set_type(TTypeId::STRING_TYPE);
-      break;
-    case TYPE_BINARY:
-      type_entry.__set_type(TTypeId::BINARY_TYPE);
-      break;
-    case TYPE_DECIMAL: {
-      TTypeQualifierValue tprecision;
-      tprecision.__set_i32Value(precision);
-      TTypeQualifierValue tscale;
-      tscale.__set_i32Value(scale);
-
-      TTypeQualifiers type_quals;
-      type_quals.qualifiers[g_TCLIService_constants.PRECISION] = tprecision;
-      type_quals.qualifiers[g_TCLIService_constants.SCALE] = tscale;
-      type_entry.__set_typeQualifiers(type_quals);
-      type_entry.__set_type(TTypeId::DECIMAL_TYPE);
-      break;
-    }
-    case TYPE_CHAR:
-    case TYPE_VARCHAR: {
-      TTypeQualifierValue tmax_len;
-      tmax_len.__set_i32Value(len);
-
-      TTypeQualifiers type_quals;
-      type_quals.qualifiers[g_TCLIService_constants.CHARACTER_MAXIMUM_LENGTH] = tmax_len;
-      type_entry.__set_typeQualifiers(type_quals);
-      type_entry.__set_type(
-          (type == TYPE_CHAR) ? TTypeId::CHAR_TYPE : TTypeId::VARCHAR_TYPE);
-      break;
-    }
-    case TYPE_STRUCT:
-    case TYPE_ARRAY:
-      type_entry.__set_type(TTypeId::STRING_TYPE);
-      break;
-    default:
-      // HiveServer2 does not have a type for invalid, date, datetime or
-      // fixed_uda_intermediate.
-      DCHECK(false) << "bad TypeToTValueType() type: " << DebugString();
-      type_entry.__set_type(TTypeId::STRING_TYPE);
-  };
-
-  TTypeEntry result;
-  result.__set_primitiveEntry(type_entry);
-  return result;
-}
-
 string ColumnType::DebugString() const {
   stringstream ss;
   switch (type) {
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index db90212e8..a8c68663e 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -23,7 +23,6 @@
 
 #include "common/logging.h"
 #include "gen-cpp/Types_types.h"  // for TPrimitiveType
-#include "gen-cpp/TCLIService_types.h"  // for HiveServer2 Type
 
 namespace llvm {
   class ConstantStruct;
@@ -48,7 +47,7 @@ enum PrimitiveType {
   TYPE_STRING,
   TYPE_DATE,
   TYPE_DATETIME,    // Not implemented
-  TYPE_BINARY,      // Not implemented
+  TYPE_BINARY,      // Not used, see AuxColumnType::StringSubtype
   TYPE_DECIMAL,
   TYPE_CHAR,
   TYPE_VARCHAR,
@@ -62,7 +61,30 @@ enum PrimitiveType {
 PrimitiveType ThriftToType(TPrimitiveType::type ttype);
 TPrimitiveType::type ToThrift(PrimitiveType ptype);
 std::string TypeToString(PrimitiveType t);
-std::string TypeToOdbcString(PrimitiveType t);
+
+// Contains information about a type that generally doesn't affect how it should be
+// handled by backend, but can affect encoding / decoding.
+struct AuxColumnType {
+  // Differentiates between STRING and BINARY.
+  // As STRING is just a byte array in Impala (no UTF-8 encoding), the two types are
+  // practically the same in the backend - only some file format readers/writers
+  // differentiate between the two. Instead of using PrimitiveType::TYPE_BINARY BINARY
+  // uses TYPE_STRING to ensure that everything that works for STRING also works for
+  // BINARY.
+  enum class StringSubtype {
+    STRING,
+    BINARY
+  };
+  StringSubtype string_subtype = StringSubtype::STRING;
+
+  AuxColumnType(const TColumnType& thrift_type);
+
+  inline bool IsBinaryStringSubtype() const {
+    return string_subtype == StringSubtype::BINARY;
+  }
+};
+
+std::string TypeToOdbcString(const TColumnType& type);
 
 // Describes a type. Includes the enum, children types, and any type-specific metadata
 // (e.g. precision and scale for decimals).
@@ -101,6 +123,7 @@ struct ColumnType {
     : type(type), len(-1), precision(-1), scale(-1) {
     DCHECK_NE(type, TYPE_CHAR);
     DCHECK_NE(type, TYPE_VARCHAR);
+    DCHECK_NE(type, TYPE_BINARY);
     DCHECK_NE(type, TYPE_DECIMAL);
     DCHECK_NE(type, TYPE_STRUCT);
     DCHECK_NE(type, TYPE_ARRAY);
@@ -246,7 +269,6 @@ struct ColumnType {
   /// optimizer can pull out fields of the returned ConstantStruct for constant folding.
   llvm::ConstantStruct* ToIR(LlvmCodeGen* codegen) const;
 
-  apache::hive::service::cli::thrift::TTypeEntry ToHs2Type() const;
   std::string DebugString() const;
 
   /// Used to create a possibly nested type from the flattened Thrift representation.
diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc
index 83c9e085b..ce9178e9a 100644
--- a/be/src/service/hs2-util.cc
+++ b/be/src/service/hs2-util.cc
@@ -25,6 +25,7 @@
 #include "common/logging.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "gen-cpp/TCLIService_constants.h"
 #include "runtime/date-value.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/raw-value.inline.h"
@@ -119,10 +120,12 @@ void impala::TColumnValueToHS2TColumn(const TColumnValue& col_val,
     case TPrimitiveType::CHAR:
     case TPrimitiveType::VARCHAR:
     case TPrimitiveType::DECIMAL:
+    case TPrimitiveType::BINARY:
       is_null = !col_val.__isset.string_val;
       column->stringVal.values.push_back(col_val.string_val);
       nulls = &column->stringVal.nulls;
       break;
+
     default:
       DCHECK(false) << "Unhandled type: "
                     << TypeToString(ThriftToType(type.types[0].scalar_type.type));
@@ -275,23 +278,44 @@ static void DateExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
   }
 }
 
-// Implementation for STRING and VARCHAR.
-static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
-    int start_idx, int num_rows, uint32_t output_row_idx,
-    apache::hive::service::cli::thrift::TColumn* column) {
-  ReserveSpace(num_rows, output_row_idx, &column->stringVal);
+// Common logic for BINARY, STRING and VARCHAR.
+static void StringExprValuesToHS2TColumnHelper(ScalarExprEvaluator* expr_eval,
+    RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx,
+    vector<string>&  values, string& nulls) {
   FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
     StringVal val = expr_eval->GetStringVal(it.Get());
     if (val.is_null) {
-      column->stringVal.values.emplace_back();
+      values.emplace_back();
     } else {
-      column->stringVal.values.emplace_back(reinterpret_cast<char*>(val.ptr), val.len);
+      values.emplace_back(reinterpret_cast<char*>(val.ptr), val.len);
     }
-    SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls);
+    SetNullBit(output_row_idx, val.is_null, &nulls);
     ++output_row_idx;
   }
 }
 
+// Implementation for STRING and VARCHAR.
+static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(num_rows, output_row_idx, &column->stringVal);
+  StringExprValuesToHS2TColumnHelper(
+      expr_eval, batch, start_idx, num_rows, output_row_idx,
+      column->stringVal.values, column->stringVal.nulls);
+}
+
+// Implementation for BINARY. Same as for STRING with the exception of using a different
+// Thrift field.
+static void BinaryExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch,
+    int start_idx, int num_rows, uint32_t output_row_idx,
+    apache::hive::service::cli::thrift::TColumn* column) {
+  ReserveSpace(num_rows, output_row_idx, &column->binaryVal);
+  StringExprValuesToHS2TColumnHelper(
+      expr_eval, batch, start_idx, num_rows, output_row_idx,
+      column->binaryVal.values, column->binaryVal.nulls);
+}
+
+
 // Implementation for CHAR.
 static void CharExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
     const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
@@ -486,6 +510,10 @@ void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
       StringExprValuesToHS2TColumn(
           expr_eval, batch, start_idx, num_rows, output_row_idx, column);
       return;
+    case TPrimitiveType::BINARY:
+      BinaryExprValuesToHS2TColumn(
+          expr_eval, batch, start_idx, num_rows, output_row_idx, column);
+      return;
     case TPrimitiveType::CHAR:
       CharExprValuesToHS2TColumn(
           expr_eval, type, batch, start_idx, num_rows, output_row_idx, column);
@@ -546,6 +574,7 @@ void impala::TColumnValueToHS2TColumnValue(const TColumnValue& col_val,
     case TPrimitiveType::DATE:
     case TPrimitiveType::VARCHAR:
     case TPrimitiveType::CHAR:
+    case TPrimitiveType::BINARY:
       // HiveServer2 requires timestamp to be presented as string. Note that the .thrift
       // spec says it should be a BIGINT; AFAICT Hive ignores that and produces a string.
       hs2_col_val->__isset.stringVal = true;
@@ -614,6 +643,8 @@ void impala::ExprValueToHS2TColumnValue(const void* value, const TColumnType& ty
       break;
     case TPrimitiveType::STRING:
     case TPrimitiveType::VARCHAR:
+    // Unlike TColumn, TColumnValue does not differentiate between STRING and BINARY.
+    case TPrimitiveType::BINARY:
       hs2_col_val->__isset.stringVal = true;
       hs2_col_val->stringVal.__isset.value = not_null;
       if (not_null) {
@@ -834,3 +865,92 @@ bool impala::isOneFieldSet(const impala::TColumnValue& value) {
           value.__isset.decimal_val ||
           value.__isset.date_val);
 }
+
+thrift::TTypeEntry impala::ColumnToHs2Type(
+    const TColumnType& columnType) {
+  const ColumnType& type = ColumnType::FromThrift(columnType);
+  AuxColumnType aux_type(columnType);
+  thrift::TPrimitiveTypeEntry type_entry;
+  switch (type.type) {
+    // Map NULL_TYPE to BOOLEAN, otherwise Hive's JDBC driver won't
+    // work for queries like "SELECT NULL" (IMPALA-914).
+    case TYPE_NULL:
+      type_entry.__set_type(thrift::TTypeId::BOOLEAN_TYPE);
+      break;
+    case TYPE_BOOLEAN:
+      type_entry.__set_type(thrift::TTypeId::BOOLEAN_TYPE);
+      break;
+    case TYPE_TINYINT:
+      type_entry.__set_type(thrift::TTypeId::TINYINT_TYPE);
+      break;
+    case TYPE_SMALLINT:
+      type_entry.__set_type(thrift::TTypeId::SMALLINT_TYPE);
+      break;
+    case TYPE_INT:
+      type_entry.__set_type(thrift::TTypeId::INT_TYPE);
+      break;
+    case TYPE_BIGINT:
+      type_entry.__set_type(thrift::TTypeId::BIGINT_TYPE);
+      break;
+    case TYPE_FLOAT:
+      type_entry.__set_type(thrift::TTypeId::FLOAT_TYPE);
+      break;
+    case TYPE_DOUBLE:
+      type_entry.__set_type(thrift::TTypeId::DOUBLE_TYPE);
+      break;
+    case TYPE_DATE:
+      type_entry.__set_type(thrift::TTypeId::DATE_TYPE);
+      break;
+    case TYPE_TIMESTAMP:
+      type_entry.__set_type(thrift::TTypeId::TIMESTAMP_TYPE);
+      break;
+    case TYPE_STRING:
+      if (aux_type.string_subtype == AuxColumnType::StringSubtype::BINARY) {
+        type_entry.__set_type(thrift::TTypeId::BINARY_TYPE);
+      } else {
+        type_entry.__set_type(thrift::TTypeId::STRING_TYPE);
+      }
+      break;
+    case TYPE_DECIMAL: {
+      thrift::TTypeQualifierValue tprecision;
+      tprecision.__set_i32Value(type.precision);
+      thrift::TTypeQualifierValue tscale;
+      tscale.__set_i32Value(type.scale);
+
+      thrift::TTypeQualifiers type_quals;
+      type_quals.qualifiers[thrift::g_TCLIService_constants.PRECISION] = tprecision;
+      type_quals.qualifiers[thrift::g_TCLIService_constants.SCALE] = tscale;
+      type_entry.__set_typeQualifiers(type_quals);
+      type_entry.__set_type(thrift::TTypeId::DECIMAL_TYPE);
+      break;
+    }
+    case TYPE_CHAR:
+    case TYPE_VARCHAR: {
+      thrift::TTypeQualifierValue tmax_len;
+      tmax_len.__set_i32Value(type.len);
+
+      thrift::TTypeQualifiers type_quals;
+      type_quals.qualifiers[thrift::g_TCLIService_constants.CHARACTER_MAXIMUM_LENGTH]
+          = tmax_len;
+      type_entry.__set_typeQualifiers(type_quals);
+      type_entry.__set_type((type.type == TYPE_CHAR)
+          ? thrift::TTypeId::CHAR_TYPE : thrift::TTypeId::VARCHAR_TYPE);
+      break;
+    }
+    case TYPE_STRUCT:
+    case TYPE_ARRAY:
+      type_entry.__set_type(thrift::TTypeId::STRING_TYPE);
+      break;
+    case TYPE_BINARY:
+    default:
+      // HiveServer2 does not have a type for invalid, datetime or
+      // fixed_uda_intermediate. Binary should be stored as TYPE_STRING, not
+      // TYPE_BINARY in the backend.
+      DCHECK(false) << "bad TypeToTValueType() type: " << type.DebugString();
+      type_entry.__set_type(thrift::TTypeId::STRING_TYPE);
+  };
+
+  thrift::TTypeEntry result;
+  result.__set_primitiveEntry(type_entry);
+  return result;
+}
diff --git a/be/src/service/hs2-util.h b/be/src/service/hs2-util.h
index 083059cc3..b6a97b09a 100644
--- a/be/src/service/hs2-util.h
+++ b/be/src/service/hs2-util.h
@@ -68,4 +68,7 @@ std::string PrintTColumnValue(const impala::TColumnValue& colval);
 
 /// Return true if one field in value is set. Return false otherwise.
 bool isOneFieldSet(const impala::TColumnValue& value);
+
+apache::hive::service::cli::thrift::TTypeEntry ColumnToHs2Type(
+    const TColumnType& columnType);
 }
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 21df93f2e..bd4a3b4ae 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -205,12 +205,11 @@ string ImpalaServer::ColumnTypeToBeeswaxTypeString(const TColumnType& type) {
   if (type.types.size() == 1) {
     DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
     DCHECK(type.types[0].__isset.scalar_type);
-    TPrimitiveType::type col_type = type.types[0].scalar_type.type;
-    return TypeToOdbcString(ThriftToType(col_type));
+    return TypeToOdbcString(type);
   } else if (type.types[0].type == TTypeNodeType::ARRAY) {
     DCHECK_GT(type.types.size(), 1);
     // TODO (IMPALA-11041): consider returning the real type
-    return TypeToOdbcString(PrimitiveType::TYPE_STRING);
+    return "string";
   } else if (type.types[0].type == TTypeNodeType::STRUCT) {
     DCHECK_GT(type.types.size(), 1);
     RaiseBeeswaxException("Returning struct types is not supported through the "
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 7e87e1cd1..7f9abc34c 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -897,8 +897,8 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
             result_set_md->columns[i].columnName);
         return_val.schema.columns[i].position = i;
         return_val.schema.columns[i].typeDesc.types.resize(1);
-        ColumnType t = ColumnType::FromThrift(result_set_md->columns[i].columnType);
-        return_val.schema.columns[i].typeDesc.types[0] = t.ToHs2Type();
+        return_val.schema.columns[i].typeDesc.types[0] =
+            ColumnToHs2Type(result_set_md->columns[i].columnType);
       }
     }
   }
diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
index 9d0300954..19f2b73d7 100644
--- a/be/src/service/query-result-set.cc
+++ b/be/src/service/query-result-set.cc
@@ -414,6 +414,12 @@ int HS2ColumnarResultSet::AddRows(
             from->stringVal.values.begin() + start_idx,
             from->stringVal.values.begin() + start_idx + rows_added);
         break;
+      case TPrimitiveType::BINARY:
+        StitchNulls(num_rows_, rows_added, start_idx, from->binaryVal.nulls,
+            &(to->binaryVal.nulls));
+        to->binaryVal.values.insert(to->binaryVal.values.end(),
+            from->binaryVal.values.begin() + start_idx,
+            from->binaryVal.values.begin() + start_idx + rows_added);
       default:
         DCHECK(false) << "Unsupported type: "
                       << TypeToString(ThriftToType(
@@ -477,6 +483,9 @@ void HS2ColumnarResultSet::InitColumns() {
         case TPrimitiveType::STRING:
           col_output.__isset.stringVal = true;
           break;
+        case TPrimitiveType::BINARY:
+          col_output.__isset.binaryVal = true;
+          break;
         default:
           DCHECK(false) << "Unhandled column type: "
               << TypeToString(ThriftToType(input_type));
diff --git a/be/src/testutil/test-udfs.cc b/be/src/testutil/test-udfs.cc
index 4e5fd4103..cb670f704 100644
--- a/be/src/testutil/test-udfs.cc
+++ b/be/src/testutil/test-udfs.cc
@@ -64,10 +64,11 @@ IntVal AllTypes(
     FunctionContext* context, const StringVal& string, const BooleanVal& boolean,
     const TinyIntVal& tiny_int, const SmallIntVal& small_int, const IntVal& int_val,
     const BigIntVal& big_int, const FloatVal& float_val, const DoubleVal& double_val,
-    const DecimalVal& decimal, const DateVal& date_val) {
+    const DecimalVal& decimal, const DateVal& date_val, const StringVal& binary_val) {
   int result = string.len + boolean.val + tiny_int.val + small_int.val + int_val.val
                + big_int.val + static_cast<int64_t>(float_val.val)
-               + static_cast<int64_t>(double_val.val) + decimal.val4 + date_val.val;
+               + static_cast<int64_t>(double_val.val) + decimal.val4 + date_val.val
+               + binary_val.len;
   return IntVal(result);
 }
 
diff --git a/be/src/util/coding-util.cc b/be/src/util/coding-util.cc
index 6ebf0d5f7..b093b1e7b 100644
--- a/be/src/util/coding-util.cc
+++ b/be/src/util/coding-util.cc
@@ -134,7 +134,7 @@ bool Base64Encode(const char* in, int64_t in_len, int64_t out_max, char* out,
   return true;
 }
 
-static inline void Base64Encode(const char* in, int64_t in_len, stringstream* out) {
+void Base64Encode(const char* in, int64_t in_len, stringstream* out) {
   if (in_len == 0) {
     (*out) << "";
     return;
diff --git a/be/src/util/coding-util.h b/be/src/util/coding-util.h
index bb2b0395b..8716d0245 100644
--- a/be/src/util/coding-util.h
+++ b/be/src/util/coding-util.h
@@ -57,6 +57,7 @@ void Base64Encode(const std::vector<uint8_t>& in, std::string* out);
 void Base64Encode(const std::vector<uint8_t>& in, std::stringstream* out);
 void Base64Encode(const std::string& in, std::string* out);
 void Base64Encode(const std::string& in, std::stringstream* out);
+void Base64Encode(const char* in, int64_t in_len, std::stringstream* out);
 
 /// Calculate the maximum output buffer size needed for Base64Decode. Returns false if
 /// in_len is invalid.
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 7df364eed..7aa2c3f66 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -137,6 +137,7 @@ testdata/cluster/node_templates/cdh5/etc/hadoop/conf/*.xml.tmpl
 testdata/cluster/node_templates/common/etc/kudu/*.conf.tmpl
 testdata/cluster/node_templates/common/etc/hadoop/conf/*.xml.tmpl
 testdata/cluster/ranger/setup/*.json.template
+testdata/data/binary_tbl/000000_0.txt
 testdata/data/chars-formats.txt
 testdata/data/chars-tiny.txt
 testdata/data/parent_table.txt
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index d6956aa60..1326e100f 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -515,6 +515,7 @@ visible_functions = [
   [['rpad'], 'STRING', ['STRING', 'BIGINT', 'STRING'], 'impala::StringFunctions::Rpad'],
   [['bytes'], 'INT', ['STRING'], 'impala::StringFunctions::Bytes'],
   [['length'], 'INT', ['STRING'], 'impala::StringFunctions::Length'],
+  [['length'], 'INT', ['BINARY'], 'impala::StringFunctions::Bytes'],
   [['length'], 'INT', ['CHAR'], 'impala::StringFunctions::CharLength'],
   [['char_length'], 'INT', ['STRING'], 'impala::StringFunctions::Length'],
   [['character_length'], 'INT', ['STRING'], 'impala::StringFunctions::Length'],
@@ -584,6 +585,7 @@ visible_functions = [
    '_ZN6impala15StringFunctions23RegexpMatchCountPrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE',
    '_ZN6impala15StringFunctions11RegexpCloseEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE'],
   [['concat'], 'STRING', ['STRING', '...'], 'impala::StringFunctions::Concat'],
+  [['concat'], 'BINARY', ['BINARY', '...'], 'impala::StringFunctions::Concat'],
   [['concat_ws'], 'STRING', ['STRING', 'STRING', '...'],
    'impala::StringFunctions::ConcatWs'],
   [['find_in_set'], 'INT', ['STRING', 'STRING'], 'impala::StringFunctions::FindInSet'],
@@ -635,6 +637,7 @@ visible_functions = [
   [['if'], 'FLOAT', ['BOOLEAN', 'FLOAT', 'FLOAT'], ''],
   [['if'], 'DOUBLE', ['BOOLEAN', 'DOUBLE', 'DOUBLE'], ''],
   [['if'], 'STRING', ['BOOLEAN', 'STRING', 'STRING'], ''],
+  [['if'], 'BINARY', ['BOOLEAN', 'BINARY', 'BINARY'], ''],
   [['if'], 'TIMESTAMP', ['BOOLEAN', 'TIMESTAMP', 'TIMESTAMP'], ''],
   [['if'], 'DECIMAL', ['BOOLEAN', 'DECIMAL', 'DECIMAL'], ''],
   [['if'], 'DATE', ['BOOLEAN', 'DATE', 'DATE'], ''],
@@ -663,6 +666,7 @@ visible_functions = [
   [['isnull', 'ifnull', 'nvl'], 'FLOAT', ['FLOAT', 'FLOAT'], ''],
   [['isnull', 'ifnull', 'nvl'], 'DOUBLE', ['DOUBLE', 'DOUBLE'], ''],
   [['isnull', 'ifnull', 'nvl'], 'STRING', ['STRING', 'STRING'], ''],
+  [['isnull', 'ifnull', 'nvl'], 'BINARY', ['BINARY', 'BINARY'], ''],
   [['isnull', 'ifnull', 'nvl'], 'TIMESTAMP', ['TIMESTAMP', 'TIMESTAMP'], ''],
   [['isnull', 'ifnull', 'nvl'], 'DECIMAL', ['DECIMAL', 'DECIMAL'], ''],
   [['isnull', 'ifnull', 'nvl'], 'DATE', ['DATE', 'DATE'], ''],
@@ -675,6 +679,7 @@ visible_functions = [
   [['coalesce'], 'FLOAT', ['FLOAT', '...'], ''],
   [['coalesce'], 'DOUBLE', ['DOUBLE', '...'], ''],
   [['coalesce'], 'STRING', ['STRING', '...'], ''],
+  [['coalesce'], 'BINARY', ['BINARY', '...'], ''],
   [['coalesce'], 'TIMESTAMP', ['TIMESTAMP', '...'], ''],
   [['coalesce'], 'DECIMAL', ['DECIMAL', '...'], ''],
   [['coalesce'], 'DATE', ['DATE', '...'], ''],
@@ -711,6 +716,7 @@ visible_functions = [
   [['typeOf'], 'STRING', ['TIMESTAMP'], '_ZN6impala16UtilityFunctions6TypeOfIN10impala_udf12TimestampValEEENS2_9StringValEPNS2_15FunctionContextERKT_'],
   [['typeOf'], 'STRING', ['DECIMAL'], '_ZN6impala16UtilityFunctions6TypeOfIN10impala_udf10DecimalValEEENS2_9StringValEPNS2_15FunctionContextERKT_'],
   [['typeOf'], 'STRING', ['DATE'], '_ZN6impala16UtilityFunctions6TypeOfIN10impala_udf7DateValEEENS2_9StringValEPNS2_15FunctionContextERKT_'],
+  [['typeOf'], 'STRING', ['BINARY'], '_ZN6impala16UtilityFunctions12TypeOfBinaryEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
   [['fnv_hash'], 'BIGINT', ['TINYINT'],
    '_ZN6impala16UtilityFunctions7FnvHashIN10impala_udf10TinyIntValEEENS2_9BigIntValEPNS2_15FunctionContextERKT_'],
   [['fnv_hash'], 'BIGINT', ['SMALLINT'],
@@ -747,6 +753,8 @@ visible_functions = [
    '_ZN6impala16UtilityFunctions10MurmurHashIN10impala_udf7DateValEEENS2_9BigIntValEPNS2_15FunctionContextERKT_'],
   [['murmur_hash'], 'BIGINT', ['STRING'],
    '_ZN6impala16UtilityFunctions16MurmurHashStringEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
+  [['murmur_hash'], 'BIGINT', ['BINARY'],
+   '_ZN6impala16UtilityFunctions16MurmurHashStringEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
   [['murmur_hash'], 'BIGINT', ['TIMESTAMP'],
    '_ZN6impala16UtilityFunctions19MurmurHashTimestampEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'],
   [['murmur_hash'], 'BIGINT', ['DECIMAL'],
@@ -1089,6 +1097,7 @@ invisible_functions = [
   [['distinctfrom'], 'BOOLEAN', ['FLOAT', 'FLOAT'], 'impala::Operators::DistinctFrom_FloatVal_FloatVal'],
   [['distinctfrom'], 'BOOLEAN', ['DOUBLE', 'DOUBLE'], 'impala::Operators::DistinctFrom_DoubleVal_DoubleVal'],
   [['distinctfrom'], 'BOOLEAN', ['STRING', 'STRING'], 'impala::Operators::DistinctFrom_StringVal_StringVal'],
+  [['distinctfrom'], 'BOOLEAN', ['BINARY', 'BINARY'], 'impala::Operators::DistinctFrom_StringVal_StringVal'],
   [['distinctfrom'], 'BOOLEAN', ['TIMESTAMP', 'TIMESTAMP'], 'impala::Operators::DistinctFrom_TimestampVal_TimestampVal'],
   [['distinctfrom'], 'BOOLEAN', ['CHAR', 'CHAR'], 'impala::Operators::DistinctFrom_Char_Char'],
   [['distinctfrom'], 'BOOLEAN', ['DECIMAL', 'DECIMAL'], 'impala::DecimalOperators::DistinctFrom_DecimalVal_DecimalVal'],
@@ -1102,6 +1111,7 @@ invisible_functions = [
   [['notdistinct'], 'BOOLEAN', ['FLOAT', 'FLOAT'], 'impala::Operators::NotDistinct_FloatVal_FloatVal'],
   [['notdistinct'], 'BOOLEAN', ['DOUBLE', 'DOUBLE'], 'impala::Operators::NotDistinct_DoubleVal_DoubleVal'],
   [['notdistinct'], 'BOOLEAN', ['STRING', 'STRING'], 'impala::Operators::NotDistinct_StringVal_StringVal'],
+  [['notdistinct'], 'BOOLEAN', ['BINARY', 'BINARY'], 'impala::Operators::NotDistinct_StringVal_StringVal'],
   [['notdistinct'], 'BOOLEAN', ['TIMESTAMP', 'TIMESTAMP'], 'impala::Operators::NotDistinct_TimestampVal_TimestampVal'],
   [['notdistinct'], 'BOOLEAN', ['CHAR', 'CHAR'], 'impala::Operators::NotDistinct_Char_Char'],
   [['notdistinct'], 'BOOLEAN', ['DECIMAL', 'DECIMAL'], 'impala::DecimalOperators::NotDistinct_DecimalVal_DecimalVal'],
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index b2742e269..a3d50ebfb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1693,6 +1693,10 @@ public class Analyzer {
       throw new AnalysisException(
           "MAP type inside collection types is not supported.");
     }
+    if (resolvedPathToItem.destType().isBinary()) {
+      throw new AnalysisException(
+          "Binary type inside collection types is not supported (IMPALA-11491).");
+    }
     registerSlotRef(resolvedPathToItem, false);
     return desc;
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
index 771560d82..81b7e8b36 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
@@ -184,6 +184,11 @@ public class CastExpr extends Expr {
         }
         // Disable no-op casts
         if (fromType.equals(toType) && !fromType.isDecimal()) continue;
+        // No built-in function needed for BINARY <-> STRING conversion, while there is
+        // no conversion from / to any other type.
+        if (fromType.isBinary() || toType.isBinary()) {
+          continue;
+        }
         String beClass = toType.isDecimal() || fromType.isDecimal() ?
             "DecimalOperators" : "CastFunctions";
         String beSymbol = "impala::" + beClass + "::CastTo" + Function.getUdfType(toType);
@@ -350,12 +355,20 @@ public class CastExpr extends Expr {
 
     Type childType = children_.get(0).type_;
     Preconditions.checkState(!childType.isNull());
+
     // IMPALA-4550: We always need to set noOp_ to the correct value, since we could
     // be performing a subsequent analysis run and its correct value might have changed.
     // This can happen if the child node gets substituted and its type changes.
     noOp_ = childType.equals(type_);
     if (noOp_) return;
 
+    // BINARY can be only converted from / to STRING and the conversion is NOOP.
+    if ((childType.isBinary() && type_.getPrimitiveType() == PrimitiveType.STRING)
+        || (type_.isBinary() && childType.getPrimitiveType() == PrimitiveType.STRING)) {
+      noOp_ = true;
+      return;
+    }
+
     FunctionName fnName = new FunctionName(BuiltinsDb.NAME, getFnName(type_));
     Type[] args = { childType };
     Function searchDesc = new Function(fnName, args, Type.INVALID, false);
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 4fd542370..124315860 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -260,17 +260,23 @@ public class ComputeStatsStmt extends StatementBase {
       }
       if (ignoreColumn(c)) continue;
 
-      // NDV approximation function. Add explicit alias for later identification when
-      // updating the Metastore.
       String colRefSql = ToSqlUtils.getIdentSql(c.getName());
-      if (isIncremental_) {
-        columnStatsSelectList.add("NDV_NO_FINALIZE(" + colRefSql + ") AS " + colRefSql);
-      } else if (isSampling()) {
-        columnStatsSelectList.add(String.format("SAMPLED_NDV(%s, %.10f) AS %s",
-            colRefSql, effectiveSamplePerc_, colRefSql));
+
+      if (c.getType().isBinary()) {
+        // NDV is not calculated for BINARY columns (similarly to Hive).
+        columnStatsSelectList.add("NULL AS " + colRefSql);
       } else {
-        // Regular (non-incremental) compute stats without sampling.
-        columnStatsSelectList.add("NDV(" + colRefSql + ") AS " + colRefSql);
+        // NDV approximation function. Add explicit alias for later identification when
+        // updating the Metastore.
+        if (isIncremental_) {
+          columnStatsSelectList.add("NDV_NO_FINALIZE(" + colRefSql + ") AS " + colRefSql);
+        } else if (isSampling()) {
+          columnStatsSelectList.add(String.format("SAMPLED_NDV(%s, %.10f) AS %s",
+              colRefSql, effectiveSamplePerc_, colRefSql));
+        } else {
+          // Regular (non-incremental) compute stats without sampling.
+          columnStatsSelectList.add("NDV(" + colRefSql + ") AS " + colRefSql);
+        }
       }
 
       // Count the number of NULL values.
diff --git a/fe/src/main/java/org/apache/impala/analysis/InPredicate.java b/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
index 5b26533f9..bd0d8a7d0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
@@ -56,7 +56,7 @@ public class InPredicate extends Predicate {
       if (t.getPrimitiveType() == PrimitiveType.CHAR) continue;
 
       String typeString = t.getPrimitiveType().toString().toLowerCase();
-      if (t.isScalarType(PrimitiveType.VARCHAR)) typeString = "string";
+      if (t.isVarchar() || t.isBinary()) typeString = "string";
 
       db.addBuiltin(ScalarFunction.createBuiltin(IN_ITERATE,
           Lists.newArrayList(t, t), true, Type.BOOLEAN,
@@ -74,7 +74,6 @@ public class InPredicate extends Predicate {
       db.addBuiltin(ScalarFunction.createBuiltin(NOT_IN_SET_LOOKUP,
           Lists.newArrayList(t, t), true, Type.BOOLEAN,
           "impala::InPredicate::NotInSetLookup", prepareFn, closeFn, false));
-
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
index 5223f4de3..cdaf22ed7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
@@ -113,14 +113,18 @@ public class LikePredicate extends Predicate {
     msg.node_type = TExprNodeType.FUNCTION_CALL;
   }
 
+  private static boolean isLikeableType(Type type) {
+    return (type.isStringType() && !type.isBinary()) || type.isNull();
+  }
+
   @Override
   protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
     super.analyzeImpl(analyzer);
-    if (!getChild(0).getType().isStringType() && !getChild(0).getType().isNull()) {
+    if (!isLikeableType(getChild(0).getType())) {
       throw new AnalysisException(
           "left operand of " + op_.toString() + " must be of type STRING: " + toSql());
     }
-    if (!getChild(1).getType().isStringType() && !getChild(1).getType().isNull()) {
+    if (!isLikeableType(getChild(1).getType())) {
       throw new AnalysisException(
           "right operand of " + op_.toString() + " must be of type STRING: " + toSql());
     }
diff --git a/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java b/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
index 7f9012ecf..17c5fa0b5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
@@ -88,6 +88,7 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
       case STRING:
       case VARCHAR:
       case CHAR:
+      case BINARY:
         e = new StringLiteral(value, type, false);
         break;
       case DATE:
@@ -265,6 +266,7 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
       case STRING:
       case VARCHAR:
       case CHAR:
+      case BINARY:
         if (val.isSetBinary_val()) {
           byte[] bytes = new byte[val.binary_val.remaining()];
           val.binary_val.get(bytes);
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index ac3ceaaf4..3cdace050 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -222,6 +222,10 @@ public class SlotRef extends Expr {
         throw new AnalysisException("Struct containing a collection type is not " +
             "allowed in the select list.");
       }
+      if (fieldType.isBinary()) {
+        throw new AnalysisException("Struct containing a BINARY type is not " +
+            "allowed in the select list (IMPALA-11491).");
+      }
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 889210851..16abcb377 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -396,6 +396,9 @@ public class BuiltinsDb extends Db {
             "11DsHllUpdateIN10impala_udf8FloatValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
         .put(Type.DOUBLE,
             "11DsHllUpdateIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+         // Hive's Datasketches implementation currently uses different hash for STRING and BINARY.
+         // Impala only supports it for STRING, but treats it as Hive treats BINARY.
+         // See IMPALA-9939 for more details.
         .put(Type.STRING,
             "11DsHllUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS3_")
         .build();
@@ -606,6 +609,8 @@ public class BuiltinsDb extends Db {
             "3MinIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PS6_")
         .put(Type.STRING,
             "3MinIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS6_")
+        .put(Type.BINARY,
+            "3MinIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS6_")
         .put(Type.TIMESTAMP,
             "3MinIN10impala_udf12TimestampValEEEvPNS2_15FunctionContextERKT_PS6_")
         .put(Type.DECIMAL,
@@ -632,6 +637,8 @@ public class BuiltinsDb extends Db {
             "3MaxIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PS6_")
         .put(Type.STRING,
             "3MaxIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS6_")
+        .put(Type.BINARY,
+            "3MaxIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS6_")
         .put(Type.TIMESTAMP,
             "3MaxIN10impala_udf12TimestampValEEEvPNS2_15FunctionContextERKT_PS6_")
         .put(Type.DECIMAL,
@@ -988,6 +995,7 @@ public class BuiltinsDb extends Db {
       if (t.isNull()) continue; // NULL is handled through type promotion.
       if (t.isScalarType(PrimitiveType.CHAR)) continue; // promoted to STRING
       if (t.isScalarType(PrimitiveType.VARCHAR)) continue; // promoted to STRING
+
       // Count
       db.addBuiltin(AggregateFunction.createBuiltin(db, "count",
           Lists.newArrayList(t), Type.BIGINT, Type.BIGINT,
@@ -1016,6 +1024,14 @@ public class BuiltinsDb extends Db {
           prefix + MAX_UPDATE_SYMBOL.get(t),
           minMaxSerializeOrFinalize, minMaxGetValue,
           null, minMaxSerializeOrFinalize, true, true, false));
+    }
+
+    for (Type t: Type.getSupportedTypes()) {
+      if (t.isNull()) continue; // NULL is handled through type promotion.
+      if (t.isScalarType(PrimitiveType.CHAR)) continue; // promoted to STRING
+      if (t.isScalarType(PrimitiveType.VARCHAR)) continue; // promoted to STRING
+      if (t.isBinary()) continue; // Only supported for count/min/max
+
       // Sample
       db.addBuiltin(AggregateFunction.createBuiltin(db, "sample",
           Lists.newArrayList(t), Type.STRING, Type.STRING,
@@ -1584,6 +1600,8 @@ public class BuiltinsDb extends Db {
       if (t.isNull()) continue; // NULL is handled through type promotion.
       if (t.isScalarType(PrimitiveType.CHAR)) continue; // promoted to STRING
       if (t.isScalarType(PrimitiveType.VARCHAR)) continue; // promoted to STRING
+      // BINARY is not supported for analytic functions.
+      if (t.isBinary()) continue;
       db.addBuiltin(AggregateFunction.createAnalyticBuiltin(
           db, "first_value", Lists.newArrayList(t), t, t,
           t.isStringType() ? initNullString : initNull,
diff --git a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
index fcb52f83c..8f5621ec5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
@@ -623,13 +623,17 @@ public class ColumnStats {
         }
         break;
       case BINARY:
-        isCompatible = statsData.isSetStringStats();
+        isCompatible = statsData.isSetBinaryStats();
         if (isCompatible) {
           BinaryColumnStatsData binaryStats = statsData.getBinaryStats();
           numNulls_ = binaryStats.getNumNulls();
           maxSize_ = binaryStats.getMaxColLen();
           avgSize_ = Double.valueOf(binaryStats.getAvgColLen()).floatValue();
-          avgSerializedSize_ = avgSize_ + PrimitiveType.BINARY.getSlotSize();
+          if (avgSize_ >= 0) {
+            avgSerializedSize_ = avgSize_ + PrimitiveType.BINARY.getSlotSize();
+          } else {
+            avgSerializedSize_ = -1;
+          }
         }
         break;
       case DECIMAL:
@@ -737,6 +741,8 @@ public class ColumnStats {
     long numFalses = colStats.getNum_falses();
     boolean isLowValueSet = colStats.isSetLow_value();
     boolean isHighValueSet = colStats.isSetHigh_value();
+    long maxStrLen = colStats.getMax_size();
+    double avgStrLen = colStats.getAvg_size();
     switch(colType.getPrimitiveType()) {
       case BOOLEAN:
         colStatsData.setBooleanStats(
@@ -854,11 +860,14 @@ public class ColumnStats {
       case CHAR:
       case VARCHAR:
       case STRING:
-        long maxStrLen = colStats.getMax_size();
-        double avgStrLen = colStats.getAvg_size();
         colStatsData.setStringStats(
             new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndv));
         break;
+      case BINARY:
+        // No NDV is stored for BINARY.
+        colStatsData.setBinaryStats(
+            new BinaryColumnStatsData(maxStrLen, avgStrLen, numNulls));
+        break;
       case DECIMAL:
         {
           double decMaxNdv = Math.pow(10, colType.getPrecision());
diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java
index 609743cd4..62c5f040d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Function.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Function.java
@@ -523,6 +523,7 @@ public class Function extends CatalogObjectImpl {
     case VARCHAR:
     case CHAR:
     case FIXED_UDA_INTERMEDIATE:
+    case BINARY:
       // These types are marshaled into a StringVal.
       return "StringVal";
     case TIMESTAMP:
diff --git a/fe/src/main/java/org/apache/impala/catalog/PrimitiveType.java b/fe/src/main/java/org/apache/impala/catalog/PrimitiveType.java
index d8e0f549c..d759bcea6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PrimitiveType.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PrimitiveType.java
@@ -41,9 +41,7 @@ public enum PrimitiveType {
   // 8-byte pointer and 4-byte length indicator (12 bytes total).
   STRING("STRING", 12, TPrimitiveType.STRING),
   VARCHAR("VARCHAR", 12, TPrimitiveType.VARCHAR),
-
-  // Unsupported scalar type.
-  BINARY("BINARY", -1, TPrimitiveType.BINARY),
+  BINARY("BINARY", 12, TPrimitiveType.BINARY),
 
   // For decimal at the highest precision, the BE uses 16 bytes.
   DECIMAL("DECIMAL", 16, TPrimitiveType.DECIMAL),
diff --git a/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java b/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
index 62b560fc1..5ad3f9845 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
@@ -142,6 +142,7 @@ public class ScalarFunction extends Function {
           break;
         case STRING:
         case VARCHAR:
+        case BINARY:
           beFn += "_StringVal";
           break;
         case CHAR:
diff --git a/fe/src/main/java/org/apache/impala/catalog/ScalarType.java b/fe/src/main/java/org/apache/impala/catalog/ScalarType.java
index 23877b5e7..48b1d5755 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ScalarType.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ScalarType.java
@@ -325,10 +325,15 @@ public class ScalarType extends Type {
 
   @Override
   public boolean supportsTablePartitioning() {
-    if (!isSupported() || isComplexType() || type_ == PrimitiveType.TIMESTAMP) {
-      return false;
+    if (!isSupported() || isComplexType()) return false;
+    switch (type_) {
+      case TIMESTAMP:
+      // Hive allows BINARY partition columns, but it is buggy at the moment: HIVE-12680
+      case BINARY:
+        return false;
+      default:
+        return true;
     }
-    return true;
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/Type.java b/fe/src/main/java/org/apache/impala/catalog/Type.java
index 249eb9dbd..1068ab8d6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Type.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Type.java
@@ -109,9 +109,9 @@ public abstract class Type {
     supportedTypes.add(TIMESTAMP);
     supportedTypes.add(DECIMAL);
     supportedTypes.add(DATE);
+    supportedTypes.add(BINARY);
 
     unsupportedTypes = new ArrayList<>();
-    unsupportedTypes.add(BINARY);
     unsupportedTypes.add(DATETIME);
   }
 
@@ -189,13 +189,14 @@ public abstract class Type {
   public boolean isDecimal() { return isScalarType(PrimitiveType.DECIMAL); }
   public boolean isFullySpecifiedDecimal() { return false; }
   public boolean isVarchar() { return isScalarType(PrimitiveType.VARCHAR); }
+  public boolean isBinary() { return isScalarType(PrimitiveType.BINARY); }
   public boolean isWildcardDecimal() { return false; }
   public boolean isWildcardVarchar() { return false; }
   public boolean isWildcardChar() { return false; }
 
   public boolean isStringType() {
     return isScalarType(PrimitiveType.STRING) || isScalarType(PrimitiveType.VARCHAR) ||
-        isScalarType(PrimitiveType.CHAR);
+        isScalarType(PrimitiveType.CHAR) || isScalarType(PrimitiveType.BINARY);
   }
 
   public boolean isScalarType() { return this instanceof ScalarType; }
@@ -508,6 +509,7 @@ public abstract class Type {
     ScalarType t = (ScalarType) this;
     switch (t.getPrimitiveType()) {
       case STRING:
+      case BINARY:
         return Integer.MAX_VALUE;
       case TIMESTAMP:
         return 29;
@@ -679,9 +681,12 @@ public abstract class Type {
     for (int i = 0; i < PrimitiveType.values().length; ++i) {
       // Each type is compatible with itself.
       compatibilityMatrix[i][i] = PrimitiveType.values()[i];
-      // BINARY is not supported.
-      compatibilityMatrix[BINARY.ordinal()][i] = PrimitiveType.INVALID_TYPE;
-      compatibilityMatrix[i][BINARY.ordinal()] = PrimitiveType.INVALID_TYPE;
+
+      if (i != BINARY.ordinal() && i != STRING.ordinal()) {
+        // BINARY can be only cast to / from STRING.
+        compatibilityMatrix[BINARY.ordinal()][i] = PrimitiveType.INVALID_TYPE;
+        compatibilityMatrix[i][BINARY.ordinal()] = PrimitiveType.INVALID_TYPE;
+      }
 
       // FIXED_UDA_INTERMEDIATE cannot be cast to/from another type
       if (i != FIXED_UDA_INTERMEDIATE.ordinal()) {
@@ -825,6 +830,11 @@ public abstract class Type {
 
     compatibilityMatrix[STRING.ordinal()][VARCHAR.ordinal()] = PrimitiveType.STRING;
     compatibilityMatrix[STRING.ordinal()][CHAR.ordinal()] = PrimitiveType.STRING;
+    // STRING <->  BINARY conversion is not lossy, but implicit cast is not allowed to
+    // enforce exact type match in function calls.
+    compatibilityMatrix[STRING.ordinal()][BINARY.ordinal()] = PrimitiveType.INVALID_TYPE;
+    strictCompatibilityMatrix[STRING.ordinal()][BINARY.ordinal()]
+        = PrimitiveType.INVALID_TYPE;
 
     compatibilityMatrix[VARCHAR.ordinal()][CHAR.ordinal()] = PrimitiveType.INVALID_TYPE;
 
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java
index 8491bb53b..c16543cef 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveGenericJavaFunction.java
@@ -184,6 +184,8 @@ public class HiveGenericJavaFunction implements HiveJavaFunction {
         return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
       case STRING:
         return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+      case BINARY:
+        return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
       default:
         throw new CatalogException("Unsupported type: " + t);
     }
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/HiveLegacyJavaFunction.java b/fe/src/main/java/org/apache/impala/hive/executor/HiveLegacyJavaFunction.java
index e88f16644..3de5add1b 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/HiveLegacyJavaFunction.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/HiveLegacyJavaFunction.java
@@ -199,7 +199,7 @@ public class HiveLegacyJavaFunction implements HiveJavaFunction {
     // the function definition. This happens when both of them map to the same primitive
     // type.
     JavaUdfDataType javaRetType = JavaUdfDataType.getType(m.getReturnType());
-    if (retType.getPrimitiveType().toThrift() != javaRetType.getPrimitiveType()) {
+    if (!javaRetType.isCompatibleWith(retType.getPrimitiveType().toThrift())) {
       return false;
     }
 
@@ -211,8 +211,8 @@ public class HiveLegacyJavaFunction implements HiveJavaFunction {
     for (int i = 0; i < m.getParameterTypes().length; ++i) {
       JavaUdfDataType javaArgType =
           JavaUdfDataType.getType(m.getParameterTypes()[i]);
-      if (javaArgType.getPrimitiveType() !=
-          parameterTypes[i].getPrimitiveType().toThrift()) {
+      if (!javaArgType.isCompatibleWith(
+          parameterTypes[i].getPrimitiveType().toThrift())) {
         return false;
       }
     }
diff --git a/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java b/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
index 66d000fc1..43410f553 100644
--- a/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
+++ b/fe/src/main/java/org/apache/impala/hive/executor/JavaUdfDataType.java
@@ -101,6 +101,8 @@ import org.apache.impala.thrift.TPrimitiveType;
           return JavaUdfDataType.DOUBLE_WRITABLE;
         case STRING:
           return JavaUdfDataType.TEXT;
+        case BINARY:
+          return JavaUdfDataType.BYTES_WRITABLE;
         default:
           return null;
       }
@@ -151,6 +153,11 @@ import org.apache.impala.thrift.TPrimitiveType;
       if (TPrimitiveType.INVALID_TYPE == t.getPrimitiveType().toThrift()) {
         return false;
       }
+
+      // While BYTES_WRITABLE and BYTE_ARRAY maps to STRING to keep compatibility,
+      // BINARY is also accepted (IMPALA-11340).
+      if (t.isBinary()) return true;
+
       for(JavaUdfDataType javaType: JavaUdfDataType.values()) {
         if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) {
           return true;
@@ -158,5 +165,15 @@ import org.apache.impala.thrift.TPrimitiveType;
       }
       return false;
     }
+
+    public boolean isCompatibleWith(TPrimitiveType t) {
+      if (t == getPrimitiveType()) return true;
+      if (t == TPrimitiveType.BINARY) {
+        // While BYTES_WRITABLE and BYTE_ARRAY maps to STRING to keep compatibility,
+        // BINARY is also accepted (IMPALA-11340).
+        if (this == BYTE_ARRAY || this == BYTES_WRITABLE) return true;
+      }
+      return false;
+    }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaConverter.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaConverter.java
index 773c3f1ba..31022260b 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaConverter.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaConverter.java
@@ -151,6 +151,7 @@ public class AvroSchemaConverter {
       case STRING: return Schema.create(Schema.Type.STRING);
       case CHAR: return Schema.create(Schema.Type.STRING);
       case VARCHAR: return Schema.create(Schema.Type.STRING);
+      case BINARY: return Schema.create(Schema.Type.BYTES);
       case TINYINT: return Schema.create(Schema.Type.INT);
       case SMALLINT: return Schema.create(Schema.Type.INT);
       case INT: return Schema.create(Schema.Type.INT);
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
index 92992f0ce..a214664f7 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
@@ -131,11 +131,7 @@ public class AvroSchemaParser {
         return structType;
       case BYTES:
         String logicalType = schema.getProp("logicalType");
-        if (logicalType == null) {
-          throw new AnalysisException(String.format(
-            "logicalType for column '%s' specified at wrong level or was not specified",
-             colName));
-        }
+        if (logicalType == null) return Type.BINARY;
         // Decimal is stored in Avro as a BYTE.
         if (logicalType.equalsIgnoreCase("decimal")) {
           return getDecimalType(schema);
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
index 3c6bb51ea..250f344b4 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
@@ -178,7 +178,8 @@ public abstract class AvroSchemaUtils {
       // are taken from the Avro schema.
       if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) {
         Preconditions.checkState(
-            avroCol.getType().getPrimitiveType() == PrimitiveType.STRING);
+            avroCol.getType().getPrimitiveType() == PrimitiveType.STRING
+            || avroCol.getType().isBinary());
         Map<ColumnDef.Option, Object> option = Maps.newHashMap();
         String comment = avroCol.getComment();
         if (comment != null) option.put(ColumnDef.Option.COMMENT, comment);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 292443425..87df7da18 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1931,7 +1931,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "DROP VIEW not allowed on a table: functional.alltypes");
 
     // No analysis error for tables that can't be loaded.
-    AnalyzesOk("drop table functional.unsupported_partition_types");
+    AnalyzesOk("drop table functional.unsupported_binary_partition");
   }
 
   @Test
@@ -2666,6 +2666,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Unsupported partition-column types.
     AnalysisError("create table new_table (i int) PARTITIONED BY (t timestamp)",
         "Type 'TIMESTAMP' is not supported as partition-column type in column: t");
+    AnalysisError("create table new_table (i int) PARTITIONED BY (t binary)",
+        "Type 'BINARY' is not supported as partition-column type in column: t");
 
     // Caching ops
     AnalyzesOk("create table cached_tbl(i int) partitioned by(j int) " +
@@ -3327,7 +3329,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalyzesOk("create function identity(string) RETURNS int " +
         "LOCATION '/test-warehouse/libTestUdfs.so' " + "SYMBOL='Identity'");
     AnalyzesOk("create function all_types_fn(string, boolean, tinyint, " +
-        "smallint, int, bigint, float, double, decimal, date) returns int " +
+        "smallint, int, bigint, float, double, decimal, date, binary) returns int " +
         "location '/test-warehouse/libTestUdfs.so' symbol='AllTypes'");
 
     // Try creating functions with illegal function names.
@@ -3642,6 +3644,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     TypeDefsAnalyzeOk("DECIMAL");
     TypeDefsAnalyzeOk("TIMESTAMP");
     TypeDefsAnalyzeOk("DATE");
+    TypeDefsAnalyzeOk("BINARY");
 
     // Test decimal.
     TypeDefsAnalyzeOk("DECIMAL");
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
index 1230451cb..568ae3582 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
@@ -279,6 +279,10 @@ public class AnalyzeExprsTest extends AnalyzerTest {
       // Operator can compare bool column and literals
       AnalyzesOk("select * from functional.alltypes where bool_col " + operator
           + " true");
+      // Operator can compare binary columns and literals.
+      AnalyzesOk(
+          "select * from functional.binary_tbl where binary_col " + operator +
+           " cast('hi' as binary)");
 
       // Decimal types of different precisions and scales are comparable
       String decimalColumns[] = new String[]{"d1", "d2", "d3", "d4", "d5", "NULL"};
@@ -313,7 +317,7 @@ public class AnalyzeExprsTest extends AnalyzerTest {
       // Binary operators do not operate on expression with incompatible types
       for (String numeric_type: new String[]{"BOOLEAN", "TINYINT", "SMALLINT", "INT",
           "BIGINT", "FLOAT", "DOUBLE", "DECIMAL(9,0)"}) {
-        for (String string_type: new String[]{"STRING", "TIMESTAMP", "DATE"}) {
+        for (String string_type: new String[]{"STRING", "BINARY", "TIMESTAMP", "DATE"}) {
           AnalysisError("select cast(NULL as " + numeric_type + ") "
               + operator + " cast(NULL as " + string_type + ")",
               "operands of type " + numeric_type + " and " + string_type +
@@ -368,11 +372,14 @@ public class AnalyzeExprsTest extends AnalyzerTest {
         "Invalid type cast of CAST(1 AS TIMESTAMP) from TIMESTAMP to DECIMAL(9,0)");
     AnalysisError("select cast(date '1970-01-01' as decimal)",
         "Invalid type cast of DATE '1970-01-01' from DATE to DECIMAL(9,0)");
+    AnalysisError("select cast(cast(\"1.1\" as binary) as decimal)",
+        "Invalid type cast of CAST('1.1' AS BINARY) from BINARY to DECIMAL(9,0)");
 
     for (Type type: Type.getSupportedTypes()) {
       if (type.isNull() || type.isDecimal() || type.isBoolean() || type.isDateOrTimeType()
           || type.getPrimitiveType() == PrimitiveType.VARCHAR
-          || type.getPrimitiveType() == PrimitiveType.CHAR) {
+          || type.getPrimitiveType() == PrimitiveType.CHAR
+          || type.getPrimitiveType() == PrimitiveType.BINARY) {
         continue;
       }
       AnalyzesOk("select cast(1.1 as " + type + ")");
@@ -576,10 +583,15 @@ public class AnalyzeExprsTest extends AnalyzerTest {
         "right operand of LIKE must be of type STRING");
     AnalysisError("select * from functional.alltypes where 'test' ilike 5",
         "right operand of ILIKE must be of type STRING");
+    AnalysisError("select * from functional.alltypes where string_col like " +
+                  "cast('test%' as binary)",
+        "right operand of LIKE must be of type STRING");
     AnalysisError("select * from functional.alltypes where int_col like 'test%'",
         "left operand of LIKE must be of type STRING");
     AnalysisError("select * from functional.alltypes where int_col ilike 'test%'",
         "left operand of ILIKE must be of type STRING");
+    AnalysisError("select * from functional.binary_tbl where binary_col like 'test%'",
+        "left operand of LIKE must be of type STRING");
     AnalysisError("select * from functional.alltypes where string_col regexp 'test]['",
         "invalid regular expression in 'string_col REGEXP 'test][''");
     AnalysisError("select * from functional.alltypes where string_col iregexp 'test]['",
@@ -659,6 +671,7 @@ public class AnalyzeExprsTest extends AnalyzerTest {
   public void TestIsNullPredicates() throws AnalysisException {
     AnalyzesOk("select * from functional.alltypes where int_col is null");
     AnalyzesOk("select * from functional.alltypes where string_col is not null");
+    AnalyzesOk("select * from functional.binary_tbl where binary_col is null");
     AnalyzesOk("select * from functional.alltypes where null is not null");
 
     AnalysisError("select 1 from functional.allcomplextypes where int_map_col is null",
@@ -721,6 +734,9 @@ public class AnalyzeExprsTest extends AnalyzerTest {
         "where 'abc' between string_col and date_string_col");
     AnalyzesOk("select * from functional.alltypes " +
         "where 'abc' not between string_col and date_string_col");
+    AnalyzesOk("select * from functional.binary_tbl " +
+        "where cast('abc' as binary) not between cast(string_col as binary) " +
+        "and binary_col");
     // Additional predicates before and/or after between predicate.
     AnalyzesOk("select * from functional.alltypes " +
         "where string_col = 'abc' and tinyint_col between 10 and 20");
@@ -774,6 +790,11 @@ public class AnalyzeExprsTest extends AnalyzerTest {
         "where tiny_struct between 10 and 20",
         "Incompatible return types 'STRUCT<b:BOOLEAN>' and 'TINYINT' " +
         "of exprs 'tiny_struct' and '10'.");
+    AnalysisError("select * from functional.binary_tbl " +
+        "where string_col between binary_col and 'a'",
+        "Incompatible return types 'STRING' and 'BINARY' " +
+        "of exprs 'string_col' and 'binary_col'.");
+
     // IMPALA-7211: Do not cast decimal types to other decimal types
     AnalyzesOk("select cast(1 as decimal(38,2)) between " +
         "0.9 * cast(1 as decimal(38,3)) and 3");
@@ -1926,6 +1947,11 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     AnalysisError("select nullif(1,2,3)", "default.nullif() unknown");
     AnalysisError("select nullif('x', 1)",
         "operands of type STRING and TINYINT are not comparable: 'x' IS DISTINCT FROM 1");
+
+    // Check limited function support for BINARY.
+    AnalyzesOk("select length(cast('a' as binary))");
+    AnalysisError("select lower(cast('a' as binary))",
+        "No matching function with signature: lower(BINARY).");
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 4926637c4..3dd0a30dd 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1671,6 +1671,12 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalysisError("select * from functional.alltypes a full outer join " +
         "functional.alltypes b",
         "FULL OUTER JOIN requires an ON or USING clause");
+
+    // BINARY columns can be used in joins.
+    AnalyzesOk("select * from functional.binary_tbl a join " +
+        "functional.binary_tbl b on a.binary_col = b.binary_col");
+    AnalyzesOk("select * from functional.binary_tbl a join " +
+        "functional.binary_tbl b using (binary_col)");
   }
 
   @Test
@@ -2302,6 +2308,9 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "SUM requires a numeric parameter: sum(string_col)");
     AnalysisError("select avg(string_col) from functional.alltypes",
         "AVG requires a numeric or timestamp parameter: avg(string_col)");
+    AnalysisError("select avg(binary_col) from functional.binary_tbl",
+        "AVG requires a numeric or timestamp parameter: avg(binary_col)");
+
 
     // aggregate requires table in the FROM clause
     AnalysisError("select count(*)", "aggregation without a FROM clause is not allowed");
@@ -2327,6 +2336,12 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalyzesOk("select ndv(date_col), distinctpc(date_col), distinctpcsa(date_col), "
         + "count(distinct date_col) from functional.date_tbl");
 
+    // Binary
+    AnalyzesOk("select min(binary_col), max(binary_col), count(binary_col), "
+        + "max(length(binary_col)) from functional.binary_tbl");
+    AnalysisError("select ndv(binary_col) from functional.binary_tbl",
+        "No matching function with signature: ndv(BINARY).");
+
     // Test select stmt avg smap.
     AnalyzesOk("select cast(avg(c1) as decimal(10,4)) as c from " +
         "functional.decimal_tiny group by c3 having cast(avg(c1) as " +
@@ -2899,6 +2914,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "group by 1, 2");
     AnalyzesOk("select date_part, date_col, count(*) from functional.date_tbl " +
         "group by 1, 2");
+    AnalyzesOk("select binary_col, count(*) from functional.binary_tbl " +
+        "group by binary_col");
 
     // doesn't group by all non-agg select list items
     AnalysisError("select zip, count(*) from functional.testtbl",
@@ -2998,6 +3015,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "order by true asc, false desc, NULL asc");
     AnalyzesOk("select d1, d2 from functional.decimal_tbl order by d1");
     AnalyzesOk("select date_col, date_part from functional.date_tbl order by date_col");
+    AnalyzesOk("select string_col from functional.binary_tbl order by binary_col");
 
     // resolves ordinals
     AnalyzesOk("select zip, id from functional.testtbl order by 1");
@@ -4256,7 +4274,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "from functional.alltypes");
 
     String hbaseQuery =  "INSERT " + qualifier + " TABLE " +
-        "functional_hbase.insertalltypesagg select id, bigint_col, bool_col, " +
+        "functional_hbase.insertalltypesagg select id, bigint_col, " +
+        "cast(string_col as binary), bool_col, " +
         "date_string_col, day, double_col, float_col, int_col, month, smallint_col, " +
         "string_col, timestamp_col, tinyint_col, year from functional.alltypesagg";
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index 2613b3c62..7c3661fb1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -105,6 +105,10 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
       // Timestamp in the subquery predicate
       AnalyzesOk(String.format("select * from functional.date_tbl where " +
             "date_col %s (select timestamp_col from functional.alltypes)", op));
+      // Binary in the subquery predicate
+      AnalyzesOk(String.format("select * from functional.binary_tbl where " +
+            "binary_col %s (select cast(string_col as binary) " +
+            "from functional.alltypes)", op));
 
       // Subqueries with multiple predicates in the WHERE clause
       AnalyzesOk(String.format("select * from functional.alltypes t where t.id %s " +
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 7d4b8b002..ad91500eb 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -275,7 +275,7 @@ public class AnalyzerTest extends FrontendTestBase {
    * 1. Complex types, e.g., map
    *    For tables with such types we prevent loading the table metadata.
    * 2. Primitive types
-   *    For tables with unsupported primitive types (e.g., binary)
+   *    For tables with unsupported primitive types (e.g. datetime)
    *    we can run queries as long as the unsupported columns are not referenced.
    *    We fail analysis if a query references an unsupported primitive column.
    * 3. Partition-column types
@@ -283,33 +283,35 @@ public class AnalyzerTest extends FrontendTestBase {
    */
   @Test
   public void TestUnsupportedTypes() {
-    // Select supported types from a table with mixed supported/unsupported types.
-    AnalyzesOk("select int_col, date_col, str_col, bigint_col " +
-        "from functional.unsupported_types");
-
-    // Unsupported type binary.
-    AnalysisError("select bin_col from functional.unsupported_types",
-        "Unsupported type 'BINARY' in 'bin_col'.");
-    // Unsupported type binary in a star expansion.
-    AnalysisError("select * from functional.unsupported_types",
-        "Unsupported type 'BINARY' in 'functional.unsupported_types.bin_col'.");
-    // Mixed supported/unsupported types.
-    AnalysisError("select int_col, str_col, bin_col " +
-        "from functional.unsupported_types",
-        "Unsupported type 'BINARY' in 'bin_col'.");
-    AnalysisError("create table tmp as select * from functional.unsupported_types",
-        "Unsupported type 'BINARY' in 'functional.unsupported_types.bin_col'.");
-    // Unsupported type in the target insert table.
-    AnalysisError("insert into functional.unsupported_types " +
-        "values(null, null, null, null, null, null)",
-        "Unable to INSERT into target table (functional.unsupported_types) because " +
-        "the column 'bin_col' has an unsupported type 'BINARY'");
-    // Unsupported partition-column type.
-    AnalysisError("select * from functional.unsupported_partition_types",
-        "Failed to load metadata for table: 'functional.unsupported_partition_types'");
+    // With DATE and BINARY support Impala can now handle the same scalar types as Hive.
+    // There is still some gap in supported types for partition columns.
 
+    // Unsupported partition-column type.
+    AnalysisError("select * from functional.unsupported_timestamp_partition",
+        "Failed to load metadata for table: " +
+        "'functional.unsupported_timestamp_partition'");
+    AnalysisError("select * from functional.unsupported_binary_partition",
+        "Failed to load metadata for table: 'functional.unsupported_binary_partition'");
     // Try with hbase
     AnalyzesOk("describe functional_hbase.allcomplextypes");
+    // Returning complex types with BINARY in select list is not yet implemented
+    // (IMPALA-11491). Note that this is also problematic in Hive (HIVE-26454).
+    AnalysisError(
+        "select binary_item_col from functional_parquet.binary_in_complex_types",
+        "Binary type inside collection types is not supported (IMPALA-11491).");
+    AnalysisError(
+        "select binary_member_col from functional_parquet.binary_in_complex_types",
+        "Struct containing a BINARY type is not allowed in the select list " +
+        "(IMPALA-11491).");
+    // TODO: change error message once IMPALA-10918 is finished.
+    AnalysisError(
+        "select binary_key_col from functional_parquet.binary_in_complex_types",
+        "Expr 'binary_key_col' in select list returns a map type 'MAP<BINARY,INT>'." +
+        "\nMap type is not allowed in the select list.");
+    AnalysisError(
+        "select binary_value_col from functional_parquet.binary_in_complex_types",
+        "Expr 'binary_value_col' in select list returns a map type 'MAP<INT,BINARY>'." +
+        "\nMap type is not allowed in the select list.");
 
     for (ScalarType t: Type.getUnsupportedTypes()) {
       // Create/Alter table.
@@ -683,7 +685,7 @@ public class AnalyzerTest extends FrontendTestBase {
   @Test
   // Test matching function signatures.
   public void TestFunctionMatching() {
-    Function[] fns = new Function[18];
+    Function[] fns = new Function[19];
     // test()
     fns[0] = createFunction(false);
 
@@ -736,6 +738,8 @@ public class AnalyzerTest extends FrontendTestBase {
     fns[16] = createFunction(true, Type.DATE);
     // test(string...)
     fns[17] = createFunction(true, Type.STRING);
+    // test(binary...)
+    fns[18] = createFunction(true, Type.BINARY);
 
     Assert.assertFalse(fns[1].compare(fns[0], Function.CompareMode.IS_SUPERTYPE_OF));
     Assert.assertTrue(fns[1].compare(fns[2], Function.CompareMode.IS_SUPERTYPE_OF));
@@ -794,6 +798,8 @@ public class AnalyzerTest extends FrontendTestBase {
         Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF));
     Assert.assertFalse(fns[17].compare(fns[16],
         Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF));
+    Assert.assertFalse(fns[18].compare(fns[17],
+        Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF));
 
     for (int i = 0; i < fns.length; ++i) {
       for (int j = 0; j < fns.length; ++j) {
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java b/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
index 17d2bfa3b..96f4377c5 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
@@ -236,16 +236,16 @@ public class AuditingTest extends FrontendTestBase {
 
     // Dropping a table that fails loading should still result in an access event.
     accessEvents = AnalyzeAccessEvents(
-        "drop table functional.unsupported_partition_types");
+        "drop table functional.unsupported_binary_partition");
     Assert.assertEquals(accessEvents, Sets.newHashSet(new TAccessEvent(
-        "functional.unsupported_partition_types", TCatalogObjectType.TABLE, "DROP")));
+        "functional.unsupported_binary_partition", TCatalogObjectType.TABLE, "DROP")));
 
     // Dropping a table without using a fully qualified path should generate the correct
     // access event (see IMPALA-5318).
     accessEvents = AnalyzeAccessEvents(
-        "drop table unsupported_partition_types", "functional");
+        "drop table unsupported_binary_partition", "functional");
     Assert.assertEquals(accessEvents, Sets.newHashSet(new TAccessEvent(
-        "functional.unsupported_partition_types", TCatalogObjectType.TABLE, "DROP")));
+        "functional.unsupported_binary_partition", TCatalogObjectType.TABLE, "DROP")));
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
index e799a3582..dd38aface 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
@@ -405,6 +405,8 @@ public class ExprRewriteRulesTest extends FrontendTestBase {
     RewritesOk("repeat('A', 65536)", rule, repeat("A", 65_536));
     RewritesOk("repeat('A', 4294967296)", rule, null);
 
+    // Check that constant folding can handle binary results.
+    RewritesOk("cast(concat('a', 'b') as binary)", rule, "'ab'");
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/analysis/LiteralExprTest.java b/fe/src/test/java/org/apache/impala/analysis/LiteralExprTest.java
index 88d1aa3fb..42d6d96e6 100644
--- a/fe/src/test/java/org/apache/impala/analysis/LiteralExprTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/LiteralExprTest.java
@@ -44,6 +44,7 @@ public class LiteralExprTest extends FrontendTestBase {
     testLiteralExprPositive("1.0", Type.FLOAT);
     testLiteralExprPositive("1.0", Type.DOUBLE);
     testLiteralExprPositive("ABC", Type.STRING);
+    testLiteralExprPositive("ABC", Type.BINARY);
     testLiteralExprPositive("1.1", ScalarType.createDecimalType(2, 1));
     testLiteralExprPositive("2001-02-28", Type.DATE);
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 10f4022ca..1ffe71114 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -972,6 +972,10 @@ public class ToSqlTest extends FrontendTestBase {
         "VALUES(112, DATE '1970-01-01')");
     testToSql("upsert into table functional_kudu.testtbl values(1, 'a', 1)",
         "UPSERT INTO TABLE functional_kudu.testtbl VALUES(1, 'a', 1)");
+    testToSql("insert into table functional.binary_tbl " +
+        "values(1, 'a', cast('a' as binary))",
+        "INSERT INTO TABLE functional.binary_tbl " +
+        "VALUES(1, 'a', CAST('a' AS BINARY))");
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index ff4d0b672..d301e20a1 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -154,6 +154,7 @@ public class CatalogTest {
     assertNotNull(catalog_.getOrLoadTable("functional", "uservisitssmall", "test", null));
     assertNotNull(catalog_.getOrLoadTable("functional", "view_view", "test", null));
     assertNotNull(catalog_.getOrLoadTable("functional", "date_tbl", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "binary_tbl", "test", null));
     // IMP-163 - table with string partition column does not load if there are partitions
     assertNotNull(
         catalog_.getOrLoadTable("functional", "StringPartitionKey", "test", null));
@@ -331,6 +332,10 @@ public class CatalogTest {
         new String[] {"date_part", "id_col", "date_col"},
         new Type[] {Type.DATE, Type.INT, Type.DATE});
 
+    checkTableCols(functionalDb, "binary_tbl", 0,
+        new String[] {"id", "string_col", "binary_col"},
+        new Type[] {Type.INT, Type.STRING, Type.BINARY});
+
     // case-insensitive lookup
     assertEquals(catalog_.getOrLoadTable("functional", "alltypes", "test", null),
         catalog_.getOrLoadTable("functional", "AllTypes", "test", null));
@@ -605,6 +610,27 @@ public class CatalogTest {
     assertTrue(stringCol.getStats().getAvgSerializedSize() > 0);
     assertTrue(stringCol.getStats().getMaxSize() > 0);
     assertFalse(stringCol.getStats().hasNulls());
+
+    // DATE and BINARY types are missing from alltypesagg, so date_tbl and binary_tbl
+    // are also checked.
+    HdfsTable dateTable = (HdfsTable) catalog_.getOrLoadTable("functional", "date_tbl",
+        "test", null);
+
+    Column dateCol = dateTable.getColumn("date_col");
+    assertEquals(dateCol.getStats().getAvgSerializedSize(),
+        PrimitiveType.DATE.getSlotSize(), 0.0001);
+    assertEquals(dateCol.getStats().getMaxSize(), PrimitiveType.DATE.getSlotSize());
+    assertTrue(dateCol.getStats().hasNulls());
+
+    HdfsTable binaryTable = (HdfsTable) catalog_.getOrLoadTable("functional",
+         "binary_tbl", "test", null);
+
+    Column binaryCol = binaryTable.getColumn("binary_col");
+    assertTrue(binaryCol.getStats().getAvgSerializedSize() > 0);
+    assertTrue(binaryCol.getStats().getMaxSize() > 0);
+    assertTrue(binaryCol.getStats().hasNulls());
+    // NDV is not calculated for BINARY columns
+    assertFalse(binaryCol.getStats().hasNumDistinctValues());
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
index 762908454..907603b2d 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -194,19 +194,25 @@ public class PartialCatalogInfoTest {
     assertEquals(resp.lookup_status, CatalogLookupStatus.PARTITION_NOT_FOUND);
   }
 
-  @Test
-  public void testTableStats() throws Exception {
+  private List<ColumnStatisticsObj> fetchColumStats(String dbName, String tblName,
+      ImmutableList<String> columns) throws Exception {
     TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
     req.object_desc = new TCatalogObject();
     req.object_desc.setType(TCatalogObjectType.TABLE);
-    req.object_desc.table = new TTable("functional", "alltypes");
+    req.object_desc.table = new TTable(dbName, tblName);
     req.table_info_selector = new TTableInfoSelector();
-    req.table_info_selector.want_stats_for_column_names = ImmutableList.of(
-        "year", "month", "id", "bool_col", "tinyint_col", "smallint_col",
-        "int_col", "bigint_col", "float_col", "double_col", "date_string_col",
-        "string_col", "timestamp_col");
+    req.table_info_selector.want_stats_for_column_names = columns;
     TGetPartialCatalogObjectResponse resp = sendRequest(req);
-    List<ColumnStatisticsObj> stats = resp.table_info.column_stats;
+    return resp.table_info.column_stats;
+  }
+
+  @Test
+  public void testTableStats() throws Exception {
+    List<ColumnStatisticsObj> stats = fetchColumStats(
+        "functional", "alltypes", ImmutableList.of(
+            "year", "month", "id", "bool_col", "tinyint_col", "smallint_col",
+            "int_col", "bigint_col", "float_col", "double_col", "date_string_col",
+            "string_col", "timestamp_col"));
     // We have 13 columns, but 2 are the clustering columns which don't have stats.
     assertEquals(11, stats.size());
     assertEquals("ColumnStatisticsObj(colName:id, colType:INT, " +
@@ -216,15 +222,9 @@ public class PartialCatalogInfoTest {
 
   @Test
   public void testDateTableStats() throws Exception {
-    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
-    req.object_desc = new TCatalogObject();
-    req.object_desc.setType(TCatalogObjectType.TABLE);
-    req.object_desc.table = new TTable("functional", "date_tbl");
-    req.table_info_selector = new TTableInfoSelector();
-    req.table_info_selector.want_stats_for_column_names = ImmutableList.of(
-        "date_col", "date_part");
-    TGetPartialCatalogObjectResponse resp = sendRequest(req);
-    List<ColumnStatisticsObj> stats = resp.table_info.column_stats;
+    List<ColumnStatisticsObj> stats = fetchColumStats(
+        "functional", "date_tbl",
+        ImmutableList.of("date_col", "date_part"));
     // We have 2 columns, but 1 is the clustering column which doesn't have stats.
     assertEquals(1, stats.size());
     assertEquals("ColumnStatisticsObj(colName:date_col, colType:DATE, " +
@@ -232,6 +232,17 @@ public class PartialCatalogInfoTest {
         "numNulls:2, numDVs:16)>)", stats.get(0).toString());
   }
 
+  @Test
+  public void testBinaryTableStats() throws Exception {
+    List<ColumnStatisticsObj> stats = fetchColumStats(
+        "functional", "binary_tbl", ImmutableList.of("binary_col"));
+    assertEquals(1, stats.size());
+    assertEquals("ColumnStatisticsObj(colName:binary_col, colType:BINARY, " +
+        "statsData:<ColumnStatisticsData binaryStats:BinaryColumnStatsData(" +
+        "maxColLen:26, avgColLen:8.714285850524902, numNulls:1)>)",
+        stats.get(0).toString());
+  }
+
   @Test
   public void testFetchErrorTable() throws Exception {
     TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 824d9b5b4..63f79ea79 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -158,6 +158,25 @@ public class LocalCatalogTest {
     assertEquals(5, stats.getRowsSize());
   }
 
+  @Test
+  public void testLoadBinaryTableBasics() throws Exception {
+    FeDb functionalDb = catalog_.getDb("functional");
+    CatalogTest.checkTableCols(functionalDb, "binary_tbl", 0,
+        new String[] {"id", "string_col", "binary_col"},
+        new Type[] {Type.INT, Type.STRING, Type.BINARY});
+    FeTable t = functionalDb.getTable("binary_tbl");
+    assertEquals(8, t.getNumRows());
+
+    assertTrue(t instanceof LocalFsTable);
+    FeFsTable fsTable = (FeFsTable) t;
+    assertEquals(MetaStoreUtil.DEFAULT_NULL_PARTITION_KEY_VALUE,
+        fsTable.getNullPartitionKeyValue());
+
+    // Stats should have one row per partition, plus a "total" row.
+    TResultSet stats = fsTable.getTableStats();
+    assertEquals(1, stats.getRowsSize());
+  }
+
   @Test
   public void testPartitioning() throws Exception {
     FeFsTable t = (FeFsTable) catalog_.getTable("functional",  "alltypes");
@@ -331,6 +350,16 @@ public class LocalCatalogTest {
     assertEquals(2, stats.getNumNulls());
   }
 
+  @Test
+  public void testBinaryColumnStats() throws Exception {
+    FeFsTable t = (FeFsTable) catalog_.getTable("functional",  "binary_tbl");
+    ColumnStats stats = t.getColumn("binary_col").getStats();
+    assertEquals(26, stats.getMaxSize());
+    assertEquals(8.714285850524902, stats.getAvgSize(), 0.0001);
+    assertEquals(-1, stats.getNumDistinctValues());
+    assertEquals(1, stats.getNumNulls());
+  }
+
   @Test
   public void testView() throws Exception {
     FeView v = (FeView) catalog_.getTable("functional",  "alltypes_view");
@@ -428,6 +457,19 @@ public class LocalCatalogTest {
         "WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,d:date_col,d:date_part', " +
         "'serialization.format'='1')"
     ));
+
+    t = (LocalHbaseTable) catalog_.getTable("functional_hbase", "binary_tbl");
+    Assert.assertThat(ToSqlUtils.getCreateTableSql(t), CoreMatchers.startsWith(
+        "CREATE EXTERNAL TABLE functional_hbase.binary_tbl (\n" +
+        "  id INT,\n" +
+        "  binary_col BINARY,\n" +
+        "  string_col STRING\n" +
+        ")\n" +
+        "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'\n" +
+        "WITH SERDEPROPERTIES (" +
+        "'hbase.columns.mapping'=':key,d:string_col,d:binary_col', " +
+        "'serialization.format'='1')"
+    ));
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index bdf08155a..4bf1af1a6 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -156,6 +156,8 @@ public class FrontendTestBase extends AbstractFrontendTest {
       "d2 decimal(10, 0), d3 decimal(20, 10), d4 decimal(38, 38), d5 decimal(10, 5), " +
       "timestamp_col timestamp, string_col string, varchar_col varchar(50), " +
       "char_col char (30), date_col date)");
+    // TODO: no BINARY column added at the moment, as this table is used to test all
+    //       columns with sampled_ndv, which is currently not enabled for BINARY
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java b/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java
index a732ec8e6..3097878e9 100644
--- a/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java
+++ b/fe/src/test/java/org/apache/impala/hive/executor/TestGenericUdf.java
@@ -80,6 +80,7 @@ public class TestGenericUdf extends GenericUDF {
           .add(PrimitiveCategory.FLOAT)
           .add(PrimitiveCategory.DOUBLE)
           .add(PrimitiveCategory.STRING)
+          .add(PrimitiveCategory.BINARY)
           .build();
 
   public TestGenericUdf() {
@@ -134,6 +135,8 @@ public class TestGenericUdf extends GenericUDF {
         return evaluateDouble(arguments);
       case STRING:
         return evaluateString(arguments);
+      case BINARY:
+        return evaluateBinary(arguments);
       case DATE:
       case TIMESTAMP:
       default:
@@ -295,7 +298,7 @@ public class TestGenericUdf extends GenericUDF {
         return null;
       }
       if (!(input.get() instanceof Text)) {
-        throw new HiveException("Expected String but got " + input.get().getClass());
+        throw new HiveException("Expected Text but got " + input.get().getClass());
       }
       String currentString = ((Text) input.get()).toString();
       finalString += currentString;
@@ -305,6 +308,25 @@ public class TestGenericUdf extends GenericUDF {
     return resultString;
   }
 
+  public BytesWritable evaluateBinary(DeferredObject[] inputs) throws HiveException {
+    byte[] result = null;
+    for (DeferredObject input : inputs) {
+      if (input == null) {
+        return null;
+      }
+      if (!(input.get() instanceof BytesWritable)) {
+        throw new HiveException(
+            "Expected BytesWritable but got " + input.get().getClass());
+      }
+      byte[] currentArray = ((BytesWritable) input.get()).getBytes();
+      // Unlike other functions, simply return last argument.
+      result = currentArray;
+    }
+    BytesWritable resultBinary = new BytesWritable();
+    if (result != null) resultBinary.set(result, 0, result.length);
+    return resultBinary;
+  }
+
   private String getSignatureString(PrimitiveCategory argAndRetType_,
       List<PrimitiveCategory> inputTypes_) {
     return argAndRetType_ + "TestGenericUdf(" + Joiner.on(",").join(inputTypes_) + ")";
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTest.java b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
index fc096dc23..6ca2f7ccc 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
@@ -301,6 +301,20 @@ public class JdbcTest extends JdbcTestBase {
     assertFalse(rs.next());
     rs.close();
 
+    // validate BINARY column
+    rs = con_.getMetaData().getColumns(null, "functional", "binary_tbl", null);
+    assertTrue(rs.next());
+    assertEquals("Incorrect type", Types.INTEGER, rs.getInt("DATA_TYPE"));
+    assertEquals(10, rs.getInt("COLUMN_SIZE"));
+    assertTrue(rs.next());
+    assertEquals("Incorrect type", Types.VARCHAR, rs.getInt("DATA_TYPE"));
+    assertEquals(Integer.MAX_VALUE, rs.getInt("COLUMN_SIZE"));
+    assertTrue(rs.next());
+    assertEquals("Incorrect type", Types.BINARY, rs.getInt("DATA_TYPE"));
+    assertEquals(Integer.MAX_VALUE, rs.getInt("COLUMN_SIZE"));
+    assertFalse(rs.next());
+    rs.close();
+
     // Validate complex types STRUCT/MAP/ARRAY.
     // To be consistent with Hive's behavior, the TYPE_NAME field is populated
     // with the primitive type name for scalar types, and with the full toSql()
diff --git a/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java b/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java
index 70052caf5..32ea1f66c 100644
--- a/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java
+++ b/java/test-hive-udfs/src/main/java/org/apache/impala/TestGenericUdf.java
@@ -80,6 +80,7 @@ public class TestGenericUdf extends GenericUDF {
           .add(PrimitiveCategory.FLOAT)
           .add(PrimitiveCategory.DOUBLE)
           .add(PrimitiveCategory.STRING)
+          .add(PrimitiveCategory.BINARY)
           .build();
 
   public TestGenericUdf() {
@@ -134,6 +135,8 @@ public class TestGenericUdf extends GenericUDF {
         return evaluateDouble(arguments);
       case STRING:
         return evaluateString(arguments);
+      case BINARY:
+        return evaluateBinary(arguments);
       case DATE:
       case TIMESTAMP:
       default:
@@ -295,7 +298,7 @@ public class TestGenericUdf extends GenericUDF {
         return null;
       }
       if (!(input.get() instanceof Text)) {
-        throw new HiveException("Expected String but got " + input.get().getClass());
+        throw new HiveException("Expected Text but got " + input.get().getClass());
       }
       String currentString = ((Text) input.get()).toString();
       finalString += currentString;
@@ -305,6 +308,25 @@ public class TestGenericUdf extends GenericUDF {
     return resultString;
   }
 
+  public BytesWritable evaluateBinary(DeferredObject[] inputs) throws HiveException {
+    byte[] result = null;
+    for (DeferredObject input : inputs) {
+      if (input == null) {
+        return null;
+      }
+      if (!(input.get() instanceof BytesWritable)) {
+        throw new HiveException(
+            "Expected BytesWritable but got " + input.get().getClass());
+      }
+      byte[] currentArray = ((BytesWritable) input.get()).getBytes();
+      // Unlike other functions, simply return last argument.
+      result = currentArray;
+    }
+    BytesWritable resultBinary = new BytesWritable();
+    if (result != null) resultBinary.set(result, 0, result.length);
+    return resultBinary;
+  }
+
   private String getSignatureString(PrimitiveCategory argAndRetType_,
       List<PrimitiveCategory> inputTypes_) {
     return argAndRetType_ + "TestGenericUdf(" + Joiner.on(",").join(inputTypes_) + ")";
diff --git a/testdata/UnsupportedTypes/data.csv b/testdata/UnsupportedTypes/data.csv
deleted file mode 100644
index 551c34af4..000000000
--- a/testdata/UnsupportedTypes/data.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-0,0,2016-05-05,aaaa,a,0
-1,1.0,,bbbb,b,10
-2,1111111111.12345678901234567890,2018-05-05,cccc,cccccccccc,20
-,2222222222.1234567890,2019-05-05,\N,dddddddddd,,
-4,,2020-05-05,eeee,,40
diff --git a/testdata/bin/compute-table-stats.sh b/testdata/bin/compute-table-stats.sh
index c452b4043..5e887e1ef 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -35,7 +35,7 @@ ${COMPUTE_STATS_SCRIPT} --db_names=functional\
     --table_names="alltypes,alltypesagg,alltypesaggmultifilesnopart,alltypesaggnonulls,
     alltypessmall,alltypestiny,jointbl,dimtbl,stringpartitionkey,nulltable,nullrows,
     date_tbl,chars_medium,part_strings_with_quotes,alltypes_date_partition,
-    alltypes_date_partition_2,mv1_alltypes_jointbl"
+    alltypes_date_partition_2,mv1_alltypes_jointbl,binary_tbl,binary_tbl_big"
 
 # We cannot load HBase on s3 and isilon yet.
 if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 525f80f0f..e37e485b8 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -219,6 +219,7 @@ HIVE_TO_AVRO_TYPE_MAP = {
   # a timestamp column will fail. We probably want to convert back to timestamps
   # in our tests.
   'TIMESTAMP': 'string',
+  'BINARY': 'bytes',
   }
 
 PARQUET_ALTER_STATEMENT = "ALTER TABLE %(table_name)s SET\n\
diff --git a/testdata/data/binary_tbl/000000_0.txt b/testdata/data/binary_tbl/000000_0.txt
new file mode 100644
index 000000000..a1daa66f0
--- /dev/null
+++ b/testdata/data/binary_tbl/000000_0.txt
@@ -0,0 +1,8 @@
+1,ascii,YmluYXJ5MQ==
+2,ascii,YmluYXJ5Mg==
+3,null,\N
+4,empty,
+5,valid utf8,w6FydsOtenTFsXLFkXTDvGvDtnJmw7pyw7M=
+6,valid utf8,5L2g5aW9aGVsbG8=
+7,invalid utf8,AP8A/w==
+8,invalid utf8,/0QzIhEA
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 919a6e58e..d8e185d0c 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -1192,6 +1192,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
   double_col double,
   date_string_col string,
   string_col string,
+  binary_col binary,
   timestamp_col timestamp,
   year int,
   month int,
@@ -1199,7 +1200,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
 STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
 WITH SERDEPROPERTIES (
   "hbase.columns.mapping" =
-  ":key#b,d:bool_col#b,d:tinyint_col#b,d:smallint_col#b,d:int_col#b,d:bigint_col#b,d:float_col#b,d:double_col#b,d:date_string_col,d:string_col,d:timestamp_col,d:year#b,d:month#b,d:day#b"
+  ":key#b,d:bool_col#b,d:tinyint_col#b,d:smallint_col#b,d:int_col#b,d:bigint_col#b,d:float_col#b,d:double_col#b,d:date_string_col,d:string_col,d:binary_col,d:timestamp_col,d:year#b,d:month#b,d:day#b"
 )
 TBLPROPERTIES("hbase.table.name" = "functional_hbase.insertalltypesaggbinary");
 ====
@@ -1222,6 +1223,7 @@ float_col float
 double_col double
 date_string_col string
 string_col string
+binary_col binary
 timestamp_col timestamp
 ====
 ---- DATASET
@@ -1737,39 +1739,22 @@ partition by range(id)
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
-unsupported_types
+unsupported_timestamp_partition
 ---- CREATE_HIVE
--- Create a table that mixes supported and unsupported scalar types.
--- We should be able to read the column values of supported types and
--- fail queries that reference  columns of unsupported types.
-CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
-  int_col INT,
-  dec_col DECIMAL,
-  date_col DATE,
-  str_col STRING,
-  bin_col BINARY,
-  bigint_col BIGINT)
-ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
-STORED AS {file_format}
-LOCATION '{hdfs_location}';
----- TABLE_PROPERTIES
-transactional=false
----- DEPENDENT_LOAD
-INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
----- LOAD
-LOAD DATA LOCAL INPATH '{impala_home}/testdata/UnsupportedTypes/data.csv' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+-- Create a table that is partitioned on an unsupported partition-column type
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  int_col INT)
+PARTITIONED BY (t TIMESTAMP);
 ====
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
-unsupported_partition_types
+unsupported_binary_partition
 ---- CREATE_HIVE
 -- Create a table that is partitioned on an unsupported partition-column type
 CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
   int_col INT)
-PARTITIONED BY (t TIMESTAMP);
----- DEPENDENT_LOAD
-INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
+PARTITIONED BY (t BINARY);
 ====
 ---- DATASET
 functional
@@ -3518,6 +3503,67 @@ INSERT INTO {db_name}{db_suffix}.{table_name} VALUES
   array("1", "2", NULL),
   array(array("1", "2", NULL), array("3")),
   array(array(array("1", "2", NULL), array("3")), array(array("4")))
- )
+ );
+---- LOAD
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+binary_tbl
+---- COLUMNS
+id INT
+string_col STRING
+binary_col BINARY
+---- ROW_FORMAT
+delimited fields terminated by ','
+---- LOAD
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/binary_tbl/000000_0.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+---- DEPENDENT_LOAD
+insert overwrite table {db_name}{db_suffix}.{table_name}
+select id, string_col, binary_col from functional.{table_name};
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+binary_tbl_big
+---- PARTITION_COLUMNS
+year INT
+month INT
+---- COLUMNS
+id INT
+int_col INT
+binary_col BINARY
+binary_col_with_nulls BINARY
 ---- LOAD
+SET hive.exec.dynamic.partition.mode=nonstrict;
+SET hive.exec.dynamic.partition=true;
+insert overwrite table {db_name}{db_suffix}.{table_name} partition(year, month)
+select id, int_col, cast(string_col as binary),
+       cast(case when id % 2 = 0 then date_string_col else NULL end as binary),
+       year, month
+    from functional.alltypes;
+---- DEPENDENT_LOAD
+insert overwrite table {db_name}{db_suffix}.{table_name} partition(year, month)
+select id, int_col, cast(string_col as binary),
+       cast(case when id % 2 = 0 then date_string_col else NULL end as binary),
+       year, month
+    from functional.alltypes;
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+binary_in_complex_types
+---- COLUMNS
+binary_item_col array<binary>
+binary_key_col map<binary, int>
+binary_value_col map<int, binary>
+binary_member_col struct<i:int, b:binary>
+---- DEPENDENT_LOAD_HIVE
+insert overwrite table {db_name}{db_suffix}.{table_name}
+values (
+  array(cast("item1" as binary), cast("item2" as binary)),
+  map(cast("key1" as binary), 1, cast("key2" as binary), 2),
+  map(1, cast("value1" as binary), 2, cast("value2" as binary)),
+  named_struct("i", 0, "b", cast("member" as binary))
+  )
+====
\ No newline at end of file
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 68ebace5c..79b2c6e26 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -119,7 +119,6 @@ table_name:complextypes_arrays, constraint:restrict_to, table_format:orc/def/blo
 
 table_name:alltypeserror, constraint:exclude, table_format:parquet/none/none
 table_name:alltypeserrornonulls, constraint:exclude, table_format:parquet/none/none
-table_name:unsupported_types, constraint:exclude, table_format:parquet/none/none
 table_name:escapechartesttable, constraint:exclude, table_format:parquet/none/none
 table_name:TblWithRaggedColumns, constraint:exclude, table_format:parquet/none/none
 
@@ -130,7 +129,8 @@ table_name:text_thorn_ecirc_newline, constraint:restrict_to, table_format:text/n
 
 table_name:bad_serde, constraint:restrict_to, table_format:text/none/none
 table_name:rcfile_lazy_binary_serde, constraint:restrict_to, table_format:rc/none/none
-table_name:unsupported_partition_types, constraint:restrict_to, table_format:text/none/none
+table_name:unsupported_timestamp_partition, constraint:restrict_to, table_format:text/none/none
+table_name:unsupported_binary_partition, constraint:restrict_to, table_format:text/none/none
 table_name:nullformat_custom, constraint:exclude, table_format:parquet/none/none
 
 table_name:alltypes_view, constraint:restrict_to, table_format:text/none/none
@@ -171,7 +171,6 @@ table_name:widerow, constraint:exclude, table_format:hbase/none/none
 # which is not supported in hbase. The schema is also specified in HIVE_CREATE
 # with no corresponding LOAD statement.
 table_name:nullformat_custom, constraint:exclude, table_format:hbase/none/none
-table_name:unsupported_types, constraint:exclude, table_format:hbase/none/none
 
 # Decimal can only be tested on formats Impala can write to (text and parquet).
 # TODO: add Avro once Hive or Impala can write Avro decimals
@@ -290,6 +289,11 @@ table_name:date_tbl_error, constraint:restrict_to, table_format:text/snap/block
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/def/block
 table_name:insert_date_tbl, constraint:restrict_to, table_format:hbase/none/none
 
+table_name:binary_tbl, constraint:exclude, table_format:kudu/none/none
+table_name:binary_tbl_big, constraint:exclude, table_format:kudu/none/none
+table_name:binary_in_complex_types, constraint:restrict_to, table_format:parquet/none/none
+table_name:binary_in_complex_types, constraint:restrict_to, table_format:orc/def/block
+
 # Full transactional table is only supported for ORC
 table_name:full_transactional_table, constraint:restrict_to, table_format:orc/def/block
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/binary-type.test b/testdata/workloads/functional-query/queries/QueryTest/binary-type.test
new file mode 100644
index 000000000..3b145baa4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/binary-type.test
@@ -0,0 +1,149 @@
+====
+---- QUERY
+select id, string_col, cast(binary_col as string) from binary_tbl
+where string_col != "invalid utf8"
+---- TYPES
+INT, STRING, STRING
+---- RESULTS: RAW_STRING
+1,'ascii','binary1'
+2,'ascii','binary2'
+3,'null','NULL'
+4,'empty',''
+5,'valid utf8','árvíztűrőtükörfúró'
+6,'valid utf8','你好hello'
+====
+---- QUERY
+select id, hex(cast(binary_col as string)) from binary_tbl
+where string_col = "invalid utf8"
+---- TYPES
+INT, STRING
+---- RESULTS
+7,'00FF00FF'
+8,'FF4433221100'
+====
+---- QUERY
+set utf8_mode=0;
+select string_col, length(binary_col) from binary_tbl
+---- TYPES
+STRING, INT
+---- RESULTS
+'ascii',7
+'ascii',7
+'null',NULL
+'empty',0
+'valid utf8',26
+'valid utf8',11
+'invalid utf8',4
+'invalid utf8',6
+====
+---- QUERY
+set utf8_mode=1;
+select string_col, length(binary_col) from binary_tbl
+---- TYPES
+STRING, INT
+---- RESULTS
+'ascii',7
+'ascii',7
+'null',NULL
+'empty',0
+'valid utf8',26
+'valid utf8',11
+'invalid utf8',4
+'invalid utf8',6
+====
+---- QUERY
+select binary_col_with_nulls from binary_tbl_big
+  where binary_col_with_nulls = cast("01/02/09" as binary)
+---- TYPES
+BINARY
+---- RESULTS
+'01/02/09'
+'01/02/09'
+'01/02/09'
+'01/02/09'
+'01/02/09'
+====
+---- QUERY
+select binary_col_with_nulls from binary_tbl_big
+  where binary_col_with_nulls > cast("12/31/09" as binary)
+---- TYPES
+BINARY
+---- RESULTS
+'12/31/10'
+'12/31/10'
+'12/31/10'
+'12/31/10'
+'12/31/10'
+====
+---- QUERY
+select distinct binary_col_with_nulls from binary_tbl_big
+  where binary_col_with_nulls < cast("01/02/09" as binary)
+---- TYPES
+BINARY
+---- RESULTS
+'01/01/09'
+'01/01/10'
+====
+---- QUERY
+set DISABLE_OUTERMOST_TOPN=0;
+select  binary_col_with_nulls from binary_tbl_big
+  where binary_col = cast("4" as binary)
+  order by binary_col_with_nulls limit 3
+---- TYPES
+BINARY
+---- RESULTS
+'01/01/09'
+'01/01/10'
+'01/02/09'
+====
+---- QUERY
+set DISABLE_OUTERMOST_TOPN=1;
+select  binary_col_with_nulls from binary_tbl_big
+  where binary_col = cast("4" as binary)
+  order by binary_col_with_nulls limit 3
+---- TYPES
+BINARY
+---- RESULTS
+'01/01/09'
+'01/01/10'
+'01/02/09'
+====
+---- QUERY
+select count(binary_col_with_nulls), max(binary_col), min(binary_col) from binary_tbl_big
+---- TYPES
+BIGINT, BINARY, BINARY
+---- RESULTS
+3650,'9','0'
+====
+---- QUERY
+# ndv is not yet added for BINARY, casting is needed (IMPALA-11351)
+select count(distinct binary_col_with_nulls), ndv(cast(binary_col_with_nulls as string))
+  from binary_tbl_big
+---- TYPES
+BIGINT, BIGINT
+---- RESULTS
+730,736
+====
+---- QUERY
+# Test multiple count distinct
+select count(distinct binary_col), count(distinct binary_col_with_nulls)
+  from binary_tbl_big
+  where id < 20
+---- TYPES
+BIGINT, BIGINT
+---- RESULTS
+10,2
+====
+---- QUERY
+select bb1.id, bb2.id, bb1.binary_col_with_nulls, bb2.binary_col_with_nulls
+  from binary_tbl_big bb1 left join functional_hbase.binary_tbl_big bb2
+  on bb1.binary_col_with_nulls = bb2.binary_col_with_nulls
+  where bb1.id < 3 and bb2.id < 3;
+---- TYPES
+INT,  INT, BINARY, BINARY
+---- RESULTS
+0,2,'01/01/09','01/01/09'
+0,0,'01/01/09','01/01/09'
+2,2,'01/01/09','01/01/09'
+2,0,'01/01/09','01/01/09'
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test b/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test
index 2bf2407b3..4b0849746 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/generic-java-udf.test
@@ -81,6 +81,13 @@ int, int, int
 10,20,30
 ====
 ---- QUERY
+select generic_identity(cast("a" as binary)), generic_identity(cast(NULL as binary));
+---- TYPES
+binary, binary
+---- RESULTS
+'a','NULL'
+====
+---- QUERY
 # IMPALA-1392: Hive UDFs that throw exceptions should return NULL
 select generic_throws_exception();
 ---- TYPES
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hbase-inserts.test b/testdata/workloads/functional-query/queries/QueryTest/hbase-inserts.test
index 1ac2fd2fd..3e43e3098 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/hbase-inserts.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/hbase-inserts.test
@@ -1,7 +1,7 @@
 ====
 ---- QUERY
 insert into table insertalltypesagg
-select id, bigint_col, bool_col, date_string_col, day, double_col, float_col,
+select id, bigint_col, cast(string_col as binary), bool_col, date_string_col, day, double_col, float_col,
 int_col, month, smallint_col, string_col, timestamp_col, tinyint_col, year from functional.alltypesagg
 ---- RESULTS
 : 11000
@@ -19,7 +19,7 @@ INT, BOOLEAN
 ====
 ---- QUERY
 insert into table insertalltypesagg
-select 9999999, bigint_col, false, date_string_col, day, double_col, float_col,
+select 9999999, bigint_col, cast(string_col as binary), false, date_string_col, day, double_col, float_col,
 int_col, month, smallint_col, string_col, timestamp_col, tinyint_col, year from functional.alltypesagg
 ---- RESULTS
 : 11000
@@ -46,22 +46,22 @@ select * from insertalltypesagg limit 1
 # test inserting Hive's default text representation of NULL '\N'
 # and make sure a scan returns the string and not NULL
 insert into table insertalltypesagg
-select 9999999, bigint_col, false, "\\N", day, double_col, float_col,
+select 9999999, bigint_col, cast("\\N" as binary), false, "\\N", day, double_col, float_col,
 int_col, month, smallint_col, "\\N", timestamp_col, tinyint_col, year from functional.alltypesagg limit 1
 ---- RESULTS
 : 1
 ====
 ---- QUERY
-select id, date_string_col, string_col from insertalltypesagg
+select id, date_string_col, string_col, binary_col from insertalltypesagg
 where id = 9999999
 ---- RESULTS
-9999999,'\\N','\\N'
+9999999,'\\N','\\N','\\N'
 ---- TYPES
-INT, STRING, STRING
+INT, STRING, STRING, BINARY
 ====
 ---- QUERY
 insert into table insertalltypesaggbinary
-select id, bigint_col, bool_col, date_string_col, day, double_col, float_col,
+select id, bigint_col, cast(string_col as binary), bool_col, date_string_col, day, double_col, float_col,
 int_col, month, smallint_col, string_col, timestamp_col, tinyint_col, year from functional.alltypesagg
 ---- RESULTS
 : 11000
@@ -109,7 +109,7 @@ INT, BOOLEAN
 ====
 ---- QUERY
 insert into table insertalltypesaggbinary
-select 9999999, bigint_col, false, date_string_col, day, double_col, float_col,
+select 9999999, bigint_col,  cast(string_col as binary), false, date_string_col, day, double_col, float_col,
 int_col, month, smallint_col, string_col, timestamp_col, tinyint_col, year from functional.alltypesagg
 ---- RESULTS
 : 11000
@@ -136,18 +136,18 @@ select * from insertalltypesaggbinary limit 1
 # test inserting Hive's default text representation of NULL '\N'
 # and make sure a scan returns the string and not NULL
 insert into table insertalltypesaggbinary
-select 9999999, bigint_col, false, "\\N", day, double_col, float_col,
+select 9999999, bigint_col,  cast("\\N" as binary), false, "\\N", day, double_col, float_col,
 int_col, month, smallint_col, "\\N", timestamp_col, tinyint_col, year from functional.alltypesagg limit 1
 ---- RESULTS
 : 1
 ====
 ---- QUERY
-select id, date_string_col, string_col from insertalltypesaggbinary
+select id, date_string_col, string_col, binary_col from insertalltypesaggbinary
 where id = 9999999
 ---- RESULTS
-9999999,'\\N','\\N'
+9999999,'\\N','\\N','\\N'
 ---- TYPES
-INT, STRING, STRING
+INT, STRING, STRING, BINARY
 ====
 ---- QUERY
 #IMPALA-715 handle large string value
@@ -233,3 +233,16 @@ select * from insert_date_tbl limit 1
 ---- RESULTS
 : 1
 ====
+---- QUERY
+# Insert special characters to binary_col.
+insert into table insertalltypesagg (id, binary_col) values (99999999, cast(unhex('00112233445566778899AABBCCDDEEFF') as binary))
+---- RESULTS
+: 1
+====
+---- QUERY
+select id, hex(cast(binary_col as string)) from insertalltypesagg where id = 99999999
+---- RESULTS
+99999999,'00112233445566778899AABBCCDDEEFF'
+---- TYPES
+INT, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/java-udf.test b/testdata/workloads/functional-query/queries/QueryTest/java-udf.test
index e23139de1..3a18780da 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/java-udf.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/java-udf.test
@@ -85,6 +85,13 @@ string, string, string, string, string, string
 'why hello there','why hello there','why hello there','NULL','NULL','NULL'
 ====
 ---- QUERY
+select identity(cast("a" as binary)), identity(cast(NULL as binary));
+---- TYPES
+binary, binary
+---- RESULTS
+'a','NULL'
+====
+---- QUERY
 # IMPALA-1134. Each "identity" call below tests a different type (BytesWritable, Text,
 # and String). The different types are handled slightly differently.
 select length(identity("0123456789")),
@@ -237,6 +244,13 @@ string
 'abc'
 ====
 ---- QUERY
+# BINARY is only supported when the function is created
+# specifically with BINARY arguments / return type (IMPALA-11340).
+select identity_anytype(cast("a" as binary));
+---- CATCH
+AnalysisException: No matching function with signature
+====
+---- QUERY
 # IMPALA-3378: test many Java UDFs being opened and run concurrently
 select * from
 (select max(int_col) from functional.alltypesagg
diff --git a/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test b/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test
index 41ae36c24..ab679bcd4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/load-generic-java-udfs.test
@@ -48,6 +48,10 @@ create function generic_identity(string) returns string
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
 symbol='org.apache.impala.TestGenericUdf';
 
+create function generic_identity(binary) returns binary
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestGenericUdf';
+
 create function generic_add(string, string) returns string
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
 symbol='org.apache.impala.TestGenericUdf';
diff --git a/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test b/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test
index 0d02b7b44..45827011b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/load-java-udfs.test
@@ -45,6 +45,10 @@ create function identity(string) returns string
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
 symbol='org.apache.impala.TestUdf';
 
+create function identity(binary) returns binary
+location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
+symbol='org.apache.impala.TestUdf';
+
 create function identity(string, string) returns string
 location '$FILESYSTEM_PREFIX/test-warehouse/impala-hive-udfs.jar'
 symbol='org.apache.impala.TestUdf';
diff --git a/testdata/workloads/functional-query/queries/QueryTest/misc.test b/testdata/workloads/functional-query/queries/QueryTest/misc.test
index 6e8d40842..4bd6db4d4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/misc.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/misc.test
@@ -144,24 +144,4 @@ SELECT count(*) from alltypes group by bool_col having bool_col
 3650
 ---- TYPES
 bigint
-====
----- QUERY
-# IMPALA-3812: Verfiy that the correct error message is shown when the star expansion adds
-# the BINARY unsupported type to the select list.
-select * from functional.unsupported_types
----- CATCH
-Unsupported type 'BINARY' in 'functional.unsupported_types.bin_col'.
-====
----- QUERY
-# IMPALA-3812: Verfiy that DATE type is displayed correctly in the describe table.
-describe functional.unsupported_types
----- RESULTS
-'int_col','int',''
-'dec_col','decimal(10,0)',''
-'date_col','date',''
-'str_col','string',''
-'bin_col','binary',''
-'bigint_col','bigint',''
----- TYPES
-STRING, STRING, STRING
-====
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-basic.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-basic.test
index 266d90461..03c002d61 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-basic.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-basic.test
@@ -429,3 +429,37 @@ select key, value from pos_item_key_value_complextypestbl.int_map;
 ---- TYPES
 STRING,INT
 ====
+---- QUERY
+# Tests specifically for BINARY type in complex types.
+# BINARY is currently not supported in complex types in select lists
+# due to uncertainty about formatting (IMPALA-11491).
+select binary_member_col.b from binary_in_complex_types
+---- TYPES
+BINARY
+---- RESULTS
+'member'
+====
+---- QUERY
+select a.item from binary_in_complex_types t, t.binary_item_col a
+---- TYPES
+BINARY
+---- RESULTS
+'item1'
+'item2'
+====
+---- QUERY
+select m.key, m.value from binary_in_complex_types t, t.binary_key_col m
+---- TYPES
+BINARY,INT
+---- RESULTS
+'key1',1
+'key2',2
+====
+---- QUERY
+select m.key, m.value from binary_in_complex_types t, t.binary_value_col m
+---- TYPES
+INT,BINARY
+---- RESULTS
+1,'value1'
+2,'value2'
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf.test b/testdata/workloads/functional-query/queries/QueryTest/udf.test
index 7a9a31f85..2d6f92148 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/udf.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf.test
@@ -57,6 +57,13 @@ string
 'why hello there'
 ====
 ---- QUERY
+select identity(cast("why hello there" as binary));
+---- TYPES
+binary
+---- RESULTS
+'why hello there'
+====
+---- QUERY
 select identity(now());
 ---- TYPES
 timestamp
@@ -123,11 +130,11 @@ timestamp
 ---- QUERY
 # Test UDFs with different arguments
 select all_types_fn("1", true, 2, 3, 4, 5, 6.0, 7.0, cast(8 as decimal(2,0)),
-    date '1970-01-10');
+    date '1970-01-10', cast("binary" as binary));
 ---- TYPES
 int
 ---- RESULTS
-46
+52
 ====
 ---- QUERY
 select no_args();
@@ -218,7 +225,7 @@ false,1,1,1,10,1.100000023841858,10.1,'1',2009-03-01 00:01:00,2009
 select sum(all_types_fn(
     string_col, bool_col, tinyint_col, smallint_col,
     int_col, bigint_col, float_col, double_col, cast(tinyint_col as decimal(2,0)),
-    cast(adddate('1970-01-01', tinyint_col) as date)))
+    cast(adddate('1970-01-01', tinyint_col) as date), cast(string_col as binary)))
 from functional.alltypes;
 ---- TYPES
 bigint
@@ -226,9 +233,9 @@ bigint
 # Verify with 'select sum(length(string_col)) + sum(cast(bool_col as int))
 #  + sum(tinyint_col) + sum(smallint_col) + sum(int_col) + sum(bigint_col)
 #  + sum(cast(float_col as bigint)) + sum(cast(double_col as bigint)) + sum(tinyint_col)
-#  + sum(tinyint_col)
+#  + sum(tinyint_col) + sum(bytes(string_col))
 #  from functional.alltypes;'
-865050
+872350
 ====
 ---- QUERY
 select no_args() from functional.alltypes limit 1;
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index 0a08fc264..69f22e509 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -503,7 +503,7 @@ def parse_result_rows(exec_result, escape_strings=True):
     assert len(cols) == len(col_types)
     new_cols = list()
     for i in xrange(len(cols)):
-      if col_types[i] in ['STRING', 'CHAR', 'VARCHAR']:
+      if col_types[i] in ['STRING', 'CHAR', 'VARCHAR', 'BINARY']:
         col = cols[i]
         if isinstance(col, str):
           try:
diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py
index 3c6719bf6..94abed452 100644
--- a/tests/custom_cluster/test_permanent_udfs.py
+++ b/tests/custom_cluster/test_permanent_udfs.py
@@ -552,7 +552,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
     drop function if exists {database}.identity(decimal(38,10));
     drop function if exists {database}.all_types_fn(
         string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0),
-        date);
+        date, binary);
     drop function if exists {database}.no_args();
     drop function if exists {database}.var_and(boolean...);
     drop function if exists {database}.var_sum(int...);
@@ -602,6 +602,10 @@ class TestUdfPersistence(CustomClusterTestSuite):
     location '{location}'
     symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE';
 
+    create function {database}.identity(binary) returns binary
+    location '{location}'
+    symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE';
+
     create function {database}.identity(timestamp) returns timestamp
     location '{location}'
     symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_12TimestampValE';
@@ -624,7 +628,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
 
     create function {database}.all_types_fn(
         string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0),
-        date)
+        date, binary)
     returns int
     location '{location}' symbol='AllTypes';
 
diff --git a/tests/hs2/test_fetch.py b/tests/hs2/test_fetch.py
index 747adfb0f..7b4fe396c 100644
--- a/tests/hs2/test_fetch.py
+++ b/tests/hs2/test_fetch.py
@@ -47,21 +47,27 @@ class TestFetch(HS2TestSuite):
     assert p.i32Value == precision
     assert s.i32Value == scale
 
+  def __fetch_result_column_types(self, query, expected_row_count, execute_statement_req):
+    """ Fetches the results for 'query' and return the response and the
+    array of column types."""
+    execute_statement_req.statement = query
+    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
+    HS2TestSuite.check_response(execute_statement_resp)
+    results = self.fetch_at_most(execute_statement_resp.operationHandle,
+                               TCLIService.TFetchOrientation.FETCH_NEXT, 1, 1)
+    assert len(results.results.rows) == expected_row_count
+    metadata_resp = self.result_metadata(execute_statement_resp.operationHandle)
+    return execute_statement_resp, metadata_resp.schema.columns
+
   @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
   def test_result_metadata_v1(self):
     execute_statement_req = TCLIService.TExecuteStatementReq()
     execute_statement_req.sessionHandle = self.session_handle
 
     # Verify all primitive types in the alltypes table.
-    execute_statement_req.statement =\
-        "SELECT * FROM functional.alltypessmall ORDER BY id LIMIT 1"
-    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
-    results = self.fetch_at_most(execute_statement_resp.operationHandle,
-                                 TCLIService.TFetchOrientation.FETCH_NEXT, 1, 1)
-    assert len(results.results.rows) == 1
-    metadata_resp = self.result_metadata(execute_statement_resp.operationHandle)
-    column_types = metadata_resp.schema.columns
+    execute_statement_resp, column_types = self.__fetch_result_column_types(
+        "SELECT * FROM functional.alltypessmall ORDER BY id LIMIT 1", 1,
+        execute_statement_req)
     assert len(column_types) == 13
     self.__verify_primitive_type(TTypeId.INT_TYPE, column_types[0])
     self.__verify_primitive_type(TTypeId.BOOLEAN_TYPE, column_types[1])
@@ -79,17 +85,9 @@ class TestFetch(HS2TestSuite):
     self.close(execute_statement_resp.operationHandle)
 
     # Verify the result metadata for the DECIMAL type.
-    execute_statement_req.statement =\
-        "SELECT d1,d5 FROM functional.decimal_tbl ORDER BY d1 LIMIT 1"
-    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
-    results = self.fetch_at_most(execute_statement_resp.operationHandle,
-                                 TCLIService.TFetchOrientation.FETCH_NEXT, 1, 1)
-    assert len(results.results.rows) == 1
-    # Verify the result schema is what we expect. The result has 2 columns, the
-    # first is decimal(9,0) and the second is decimal(10,5)
-    metadata_resp = self.result_metadata(execute_statement_resp.operationHandle)
-    column_types = metadata_resp.schema.columns
+    execute_statement_resp, column_types = self.__fetch_result_column_types(
+        "SELECT d1,d5 FROM functional.decimal_tbl ORDER BY d1 LIMIT 1", 1,
+        execute_statement_req)
     assert len(column_types) == 2
     self.__verify_primitive_type(TTypeId.DECIMAL_TYPE, column_types[0])
     self.__verify_decimal_precision_scale(column_types[0], 9, 0)
@@ -98,15 +96,9 @@ class TestFetch(HS2TestSuite):
     self.close(execute_statement_resp.operationHandle)
 
     # Verify the result metadata for the CHAR/VARCHAR types.
-    execute_statement_req.statement =\
-        "SELECT * FROM functional.chars_tiny ORDER BY cs LIMIT 1"
-    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
-    results = self.fetch_at_most(execute_statement_resp.operationHandle,
-                                 TCLIService.TFetchOrientation.FETCH_NEXT, 1, 1)
-    assert len(results.results.rows) == 1
-    metadata_resp = self.result_metadata(execute_statement_resp.operationHandle)
-    column_types = metadata_resp.schema.columns
+    execute_statement_resp, column_types = self.__fetch_result_column_types(
+        "SELECT * FROM functional.chars_tiny ORDER BY cs LIMIT 1", 1,
+        execute_statement_req)
     assert len(column_types) == 3
     self.__verify_primitive_type(TTypeId.CHAR_TYPE, column_types[0])
     self.__verify_char_max_len(column_types[0], 5)
@@ -117,21 +109,25 @@ class TestFetch(HS2TestSuite):
     self.close(execute_statement_resp.operationHandle)
 
     # Verify the result metadata for the DATE type.
-    execute_statement_req.statement =\
-        "SELECT * FROM functional.date_tbl ORDER BY date_col LIMIT 1"
-    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
-    results = self.fetch_at_most(execute_statement_resp.operationHandle,
-                                 TCLIService.TFetchOrientation.FETCH_NEXT, 1, 1)
-    assert len(results.results.rows) == 1
-    metadata_resp = self.result_metadata(execute_statement_resp.operationHandle)
-    column_types = metadata_resp.schema.columns
+    execute_statement_resp, column_types = self.__fetch_result_column_types(
+        "SELECT * FROM functional.date_tbl ORDER BY date_col LIMIT 1", 1,
+        execute_statement_req)
     assert len(column_types) == 3
     self.__verify_primitive_type(TTypeId.INT_TYPE, column_types[0])
     self.__verify_primitive_type(TTypeId.DATE_TYPE, column_types[1])
     self.__verify_primitive_type(TTypeId.DATE_TYPE, column_types[2])
     self.close(execute_statement_resp.operationHandle)
 
+    # Verify the result metadata for the BINARY type.
+    execute_statement_resp, column_types = self.__fetch_result_column_types(
+        "SELECT * from functional.binary_tbl ORDER BY binary_col LIMIT 1", 1,
+        execute_statement_req)
+    assert len(column_types) == 3
+    self.__verify_primitive_type(TTypeId.INT_TYPE, column_types[0])
+    self.__verify_primitive_type(TTypeId.STRING_TYPE, column_types[1])
+    self.__verify_primitive_type(TTypeId.BINARY_TYPE, column_types[2])
+    self.close(execute_statement_resp.operationHandle)
+
   def __query_and_fetch(self, query):
     execute_statement_req = TCLIService.TExecuteStatementReq()
     execute_statement_req.sessionHandle = self.session_handle
@@ -186,6 +182,12 @@ class TestFetch(HS2TestSuite):
     num_rows, result = self.column_results_to_string(fetch_results_resp.results.columns)
     assert result == ("0, 0001-01-01, 0001-01-01\n")
 
+    # Binary
+    fetch_results_resp = self.__query_and_fetch(
+      "SELECT * from functional.binary_tbl ORDER BY id LIMIT 1")
+    num_rows, result = self.column_results_to_string(fetch_results_resp.results.columns)
+    assert result == ("1, ascii, binary1\n")
+
   @needs_session()
   def test_show_partitions(self):
     """Regression test for IMPALA-1330"""
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 9adcfd2f2..b7f71a19c 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -980,7 +980,7 @@ class TestHS2(HS2TestSuite):
     results = fetch_results_resp.results
     types = ['BOOLEAN', 'TINYINT', 'SMALLINT', 'INT', 'BIGINT', 'FLOAT', 'DOUBLE', 'DATE',
              'TIMESTAMP', 'STRING', 'VARCHAR', 'DECIMAL', 'CHAR', 'ARRAY', 'MAP',
-             'STRUCT']
+             'STRUCT', 'BINARY']
     assert self.get_num_rows(results) == len(types)
     # Validate that each type description (result row) has the required 18 fields as
     # described in the DatabaseMetaData.getTypeInfo() documentation.
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index bce5a9373..31c927e29 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1818,3 +1818,19 @@ class TestErasureCoding(ImpalaTestSuite):
   @SkipIf.not_ec
   def test_erasure_coding(self, vector):
     self.run_test_case('QueryTest/hdfs-erasure-coding', vector)
+
+
+class TestBinaryType(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestBinaryType, cls).add_test_dimensions()
+    # todo: IMPALA-5323: Support Kudu BINARY
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format != 'kudu')
+
+  def test_binary_type(self, vector):
+    self.run_test_case('QueryTest/binary-type', vector)
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 1e0b38443..7f899a4f8 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -113,6 +113,11 @@ returns decimal(20,0) intermediate string location '{location}'
 init_fn='AggStringIntermediateInit' update_fn='AggStringIntermediateUpdate'
 merge_fn='AggStringIntermediateMerge' finalize_fn='AggStringIntermediateFinalize';
 
+create aggregate function {database}.agg_binary_intermediate(decimal(20,10), bigint, binary)
+returns decimal(20,0) intermediate binary location '{location}'
+init_fn='AggStringIntermediateInit' update_fn='AggStringIntermediateUpdate'
+merge_fn='AggStringIntermediateMerge' finalize_fn='AggStringIntermediateFinalize';
+
 create aggregate function {database}.char_intermediate_sum(int) returns int
 intermediate char(10) LOCATION '{location}' update_fn='AggCharIntermediateUpdate'
 init_fn='AggCharIntermediateInit' merge_fn='AggCharIntermediateMerge'
@@ -146,6 +151,10 @@ create function {database}.identity(string) returns string
 location '{location}'
 symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE';
 
+create function {database}.identity(binary) returns binary
+location '{location}'
+symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE';
+
 create function {database}.identity(timestamp) returns timestamp
 location '{location}'
 symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_12TimestampValE';
@@ -167,7 +176,8 @@ location '{location}'
 symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
 
 create function {database}.all_types_fn(
-    string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0), date)
+    string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0), date,
+    binary)
 returns int
 location '{location}' symbol='AllTypes';