You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2019/07/26 05:03:12 UTC

[hive] branch master updated: HIVE-12971: Hive Support for Kudu (Grant Henke, reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f7c58  HIVE-12971: Hive Support for Kudu (Grant Henke, reviewed by Jesus Camacho Rodriguez)
f7f7c58 is described below

commit f7f7c586d7c8a8142223ae95c8e3add0873c2c86
Author: Grant Henke <gr...@apache.org>
AuthorDate: Thu Jul 25 21:55:50 2019 -0700

    HIVE-12971: Hive Support for Kudu (Grant Henke, reviewed by Jesus Camacho Rodriguez)
    
    Close apache/hive#733
    
    Signed-off-by: Jesus Camacho Rodriguez <jc...@apache.org>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |    5 +
 itests/pom.xml                                     |    1 +
 itests/qtest-kudu/pom.xml                          |  393 ++++++
 .../apache/hadoop/hive/cli/TestKuduCliDriver.java  |   67 +
 .../hadoop/hive/cli/TestKuduNegativeCliDriver.java |   67 +
 .../org/apache/hadoop/hive/cli/package-info.java   |   22 +
 itests/util/pom.xml                                |   16 +
 .../apache/hadoop/hive/cli/control/CliConfigs.java |   46 +
 .../hadoop/hive/cli/control/CoreKuduCliDriver.java |  138 ++
 .../cli/control/CoreKuduNegativeCliDriver.java     |  135 ++
 .../org/apache/hadoop/hive/kudu/KuduTestSetup.java |  150 +++
 .../org/apache/hadoop/hive/kudu/package-info.java  |   22 +
 .../apache/hadoop/hive/ql/QTestMiniClusters.java   |    5 +-
 kudu-handler/pom.xml                               |  155 +++
 .../org/apache/hadoop/hive/kudu/KuduHiveUtils.java |  147 +++
 .../apache/hadoop/hive/kudu/KuduInputFormat.java   |  302 +++++
 .../apache/hadoop/hive/kudu/KuduOutputFormat.java  |  203 +++
 .../hadoop/hive/kudu/KuduPredicateHandler.java     |  267 ++++
 .../org/apache/hadoop/hive/kudu/KuduSerDe.java     |  285 ++++
 .../hadoop/hive/kudu/KuduStorageHandler.java       |  216 ++++
 .../org/apache/hadoop/hive/kudu/KuduWritable.java  |  113 ++
 .../org/apache/hadoop/hive/kudu/package-info.java  |   22 +
 .../org/apache/hadoop/hive/kudu/KuduTestUtils.java |   56 +
 .../hadoop/hive/kudu/TestKuduInputFormat.java      |  336 +++++
 .../hadoop/hive/kudu/TestKuduOutputFormat.java     |  191 +++
 .../hadoop/hive/kudu/TestKuduPredicateHandler.java |  476 +++++++
 .../org/apache/hadoop/hive/kudu/TestKuduSerDe.java |  188 +++
 .../org/apache/hadoop/hive/kudu/package-info.java  |   22 +
 .../src/test/queries/negative/kudu_config.q        |    6 +
 .../test/queries/positive/kudu_complex_queries.q   |   45 +
 .../src/test/queries/positive/kudu_queries.q       |  177 +++
 .../src/test/results/negative/kudu_config.q.out    |    7 +
 .../results/positive/kudu_complex_queries.q.out    |  366 ++++++
 .../src/test/results/positive/kudu_queries.q.out   | 1358 ++++++++++++++++++++
 .../llap/cli/service/AsyncTaskCopyAuxJars.java     |    3 +-
 pom.xml                                            |    2 +
 36 files changed, 6008 insertions(+), 2 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a9fcf37..3088f99 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2904,6 +2904,11 @@ public class HiveConf extends Configuration {
     HIVE_HBASE_SNAPSHOT_RESTORE_DIR("hive.hbase.snapshot.restoredir", "/tmp", "The directory in which to " +
         "restore the HBase table snapshot."),
 
+    // For Kudu storage handler
+    HIVE_KUDU_MASTER_ADDRESSES_DEFAULT("hive.kudu.master.addresses.default", "localhost:7050",
+        "Comma-separated list of all of the Kudu master addresses.\n" +
+            "This value is only used for a given table if the kudu.master_addresses table property is not set."),
+
     // For har files
     HIVEARCHIVEENABLED("hive.archive.enabled", false, "Whether archiving operations are permitted"),
 
diff --git a/itests/pom.xml b/itests/pom.xml
index 345e922..9c20ad6 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -47,6 +47,7 @@
    <module>hive-unit-hadoop2</module>
    <module>hive-minikdc</module>
    <module>qtest-druid</module>
+   <module>qtest-kudu</module>
   </modules>
 
   <profiles>
diff --git a/itests/qtest-kudu/pom.xml b/itests/qtest-kudu/pom.xml
new file mode 100644
index 0000000..766db3a
--- /dev/null
+++ b/itests/qtest-kudu/pom.xml
@@ -0,0 +1,393 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive-it</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-it-qfile-kudu</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Integration - QFile Kudu Tests</name>
+
+  <properties>
+    <hive.path.to.root>../..</hive.path.to.root>
+    <exclude.tests>None</exclude.tests>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- test intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-contrib</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-standalone-metastore-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-standalone-metastore-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-standalone-metastore-server</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-standalone-metastore-server</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-it-custom-serde</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-it-util</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-it-druid</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>core</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <!-- inter-project -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- Declare hive-exec dependencies that were shaded in instead of
+       being listed as dependencies -->
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo</artifactId>
+      <version>${kryo.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>javolution</groupId>
+      <artifactId>javolution</artifactId>
+      <version>${javolution.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-servlet</artifactId>
+      <version>${jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-archives</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+        <exclusions>
+             <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+   </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+         <exclusions>
+             <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+  </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+        <exclusions>
+             <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+   </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-hs</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+        <exclusions>
+             <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+   </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-test-utils</artifactId>
+      <version>${kudu.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <version>${tez.version}</version>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+        <exclusions>
+             <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+   </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+        <exclusions>
+             <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+   </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+        <exclusions>
+             <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+   </dependency>
+  </dependencies>
+  <!-- Set the os.detected.classifier property based on the Maven detected OS
+       because Hive's version of Maven doesn't support the os-maven-plugin.  -->
+  <profiles>
+    <profile>
+      <id>kudu-linux</id>
+      <activation>
+        <os>
+          <family>Unix</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>linux-x86_64</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>kudu-mac</id>
+      <activation>
+        <os>
+          <family>Mac</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>osx-x86_64</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>kudu-windows</id>
+      <activation>
+        <os>
+          <family>Windows</family>
+        </os>
+      </activation>
+      <properties>
+        <!-- Kudu tests do not support Windows. -->
+        <exclude.tests>**/*.java</exclude.tests>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>${exclude.tests}</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduCliDriver.java b/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduCliDriver.java
new file mode 100644
index 0000000..e16e12a
--- /dev/null
+++ b/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduCliDriver.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * CliDriver that runs the Kudu Qtests.
+ */
+@RunWith(Parameterized.class)
+public class TestKuduCliDriver {
+
+  static CliAdapter adapter = new CliConfigs.KuduCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List<Object[]> getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  // @formatter:off
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+  // @formatter:on
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestKuduCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}
diff --git a/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduNegativeCliDriver.java b/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduNegativeCliDriver.java
new file mode 100644
index 0000000..5d0eaef
--- /dev/null
+++ b/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduNegativeCliDriver.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * CliDriver that runs the negative Kudu Qtests.
+ */
+@RunWith(Parameterized.class)
+public class TestKuduNegativeCliDriver {
+
+  static CliAdapter adapter = new CliConfigs.KuduNegativeCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List<Object[]> getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  // @formatter:off
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+  // @formatter:on
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestKuduNegativeCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}
diff --git a/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/package-info.java b/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/package-info.java
new file mode 100644
index 0000000..5d8616e
--- /dev/null
+++ b/itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for the Hive QTest CLI classes.
+ */
+package org.apache.hadoop.hive.cli;
diff --git a/itests/util/pom.xml b/itests/util/pom.xml
index 607fd47..bc14d18 100644
--- a/itests/util/pom.xml
+++ b/itests/util/pom.xml
@@ -91,6 +91,17 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-kudu-handler</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-kudu-handler</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-metastore</artifactId>
       <version>${project.version}</version>
       <exclusions>
@@ -183,6 +194,11 @@
       <version>${hbase.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-test-utils</artifactId>
+      <version>${kudu.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-api</artifactId>
       <version>${tez.version}</version>
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 5c17e1a..78908ae 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -778,4 +778,50 @@ public class CliConfigs {
     }
   }
 
+  /**
+   * The CliConfig implementation for Kudu.
+   */
+  public static class KuduCliConfig extends AbstractCliConfig {
+    public KuduCliConfig() {
+      super(CoreKuduCliDriver.class);
+      try {
+        setQueryDir("kudu-handler/src/test/queries/positive");
+
+        setResultsDir("kudu-handler/src/test/results/positive");
+        setLogDir("itests/qtest/target/qfile-results/kudu/positive");
+
+        setInitScript("q_test_init_src.sql");
+        setCleanupScript("q_test_cleanup_src.sql");
+
+        setHiveConfDir("data/conf/llap");
+        setClusterType(MiniClusterType.TEZ_LOCAL);
+        setMetastoreType(MetastoreType.sql);
+        setFsType(QTestMiniClusters.FsType.LOCAL);
+      } catch (Exception e) {
+        throw new RuntimeException("can't construct cliconfig", e);
+      }
+    }
+  }
+
+  public static class KuduNegativeCliConfig extends AbstractCliConfig {
+    public KuduNegativeCliConfig() {
+      super(CoreKuduNegativeCliDriver.class);
+      try {
+        setQueryDir("kudu-handler/src/test/queries/negative");
+
+        setResultsDir("kudu-handler/src/test/results/negative");
+        setLogDir("itests/qtest/target/qfile-results/kudu/negative");
+
+        setInitScript("q_test_init_src.sql");
+        setCleanupScript("q_test_cleanup_src.sql");
+
+        setHiveConfDir("data/conf/llap");
+        setClusterType(MiniClusterType.TEZ_LOCAL);
+        setMetastoreType(MetastoreType.sql);
+        setFsType(QTestMiniClusters.FsType.LOCAL);
+      } catch (Exception e) {
+        throw new RuntimeException("can't construct cliconfig", e);
+      }
+    }
+  }
 }
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduCliDriver.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduCliDriver.java
new file mode 100644
index 0000000..c54410c
--- /dev/null
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduCliDriver.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli.control;
+
+import org.apache.hadoop.hive.kudu.KuduTestSetup;
+import org.apache.hadoop.hive.ql.QTestArguments;
+import org.apache.hadoop.hive.ql.QTestProcessExecResult;
+import org.apache.hadoop.hive.ql.QTestUtil;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * The Kudu CliAdapter implementation.
+ */
+public class CoreKuduCliDriver extends CliAdapter {
+
+  private QTestUtil qt;
+
+  public CoreKuduCliDriver(AbstractCliConfig cliConfig) {
+    super(cliConfig);
+  }
+
+  @Override
+  @BeforeClass
+  public void beforeClass() {
+    try {
+      qt = new QTestUtil(QTestArguments.QTestArgumentsBuilder.instance()
+          .withOutDir(cliConfig.getResultsDir())
+          .withLogDir(cliConfig.getLogDir())
+          .withClusterType(cliConfig.getClusterType())
+          .withConfDir(cliConfig.getHiveConfDir())
+          .withInitScript(cliConfig.getInitScript())
+          .withCleanupScript(cliConfig.getCleanupScript())
+          .withLlapIo(true)
+          .withQTestSetup(new KuduTestSetup())
+          .build());
+
+      // do a one time initialization
+      qt.newSession();
+      qt.cleanUp();
+      qt.createSources();
+    } catch (Exception e) {
+      throw new RuntimeException("Unexpected exception in setUp", e);
+    }
+  }
+
+  @Override
+  @AfterClass
+  public void shutdown() {
+    try {
+      qt.shutdown();
+    } catch (Exception e) {
+      throw new RuntimeException("Unexpected exception in tearDown", e);
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() {
+    try {
+      qt.newSession();
+
+    } catch (Exception e) {
+      System.err.println("Exception: " + e.getMessage());
+      e.printStackTrace();
+      System.err.flush();
+      fail("Unexpected exception in setup");
+    }
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    try {
+      qt.clearPostTestEffects();
+      qt.clearTestSideEffects();
+
+    } catch (Exception e) {
+      System.err.println("Exception: " + e.getMessage());
+      e.printStackTrace();
+      System.err.flush();
+      fail("Unexpected exception in tearDown");
+    }
+  }
+
+  @Override
+  public void runTest(String tname, String fname, String fpath) {
+    long startTime = System.currentTimeMillis();
+    try {
+      System.err.println("Begin query: " + fname);
+
+      qt.addFile(fpath);
+      qt.cliInit(new File(fpath));
+
+      CommandProcessorResponse response = qt.executeClient(fname);
+      if (response.getResponseCode() != 0) {
+        qt.failedQuery(response.getException(), response.getResponseCode(), fname, null);
+      }
+
+      QTestProcessExecResult result = qt.checkCliDriverResults(fname);
+      if (result.getReturnCode() != 0) {
+        qt.failedDiff(result.getReturnCode(), fname, result.getCapturedOutput());
+      }
+      qt.clearPostTestEffects();
+
+    } catch (Exception e) {
+      qt.failedWithException(e, fname, null);
+    }
+
+    long elapsedTime = System.currentTimeMillis() - startTime;
+    System.err.println("Done query: " + fname + " elapsedTime=" + elapsedTime/1000 + "s");
+    assertTrue("Test passed", true);
+  }
+}
+
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduNegativeCliDriver.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduNegativeCliDriver.java
new file mode 100644
index 0000000..3a7905e
--- /dev/null
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduNegativeCliDriver.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli.control;
+
+import org.apache.hadoop.hive.kudu.KuduTestSetup;
+import org.apache.hadoop.hive.ql.QTestArguments;
+import org.apache.hadoop.hive.ql.QTestProcessExecResult;
+import org.apache.hadoop.hive.ql.QTestUtil;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * The Kudu negative CliAdapter implementation.
+ */
+public class CoreKuduNegativeCliDriver extends CliAdapter {
+
+  private QTestUtil qt;
+
+  public CoreKuduNegativeCliDriver(AbstractCliConfig cliConfig) {
+    super(cliConfig);
+  }
+
+  @Override
+  @BeforeClass
+  public void beforeClass() {
+    try {
+      qt = new QTestUtil(QTestArguments.QTestArgumentsBuilder.instance()
+          .withOutDir(cliConfig.getResultsDir())
+          .withLogDir(cliConfig.getLogDir())
+          .withClusterType(cliConfig.getClusterType())
+          .withConfDir(cliConfig.getHiveConfDir())
+          .withInitScript(cliConfig.getInitScript())
+          .withCleanupScript(cliConfig.getCleanupScript())
+          .withLlapIo(true)
+          .withQTestSetup(new KuduTestSetup())
+          .build());
+
+      // do a one time initialization
+      qt.newSession();
+      qt.cleanUp();
+      qt.createSources();
+    } catch (Exception e) {
+      throw new RuntimeException("Unexpected exception in setUp", e);
+    }
+  }
+
+  @Override
+  @AfterClass
+  public void shutdown() {
+    try {
+      qt.shutdown();
+    } catch (Exception e) {
+      throw new RuntimeException("Unexpected exception in tearDown", e);
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() {
+    try {
+      qt.newSession();
+
+    } catch (Exception e) {
+      System.err.println("Exception: " + e.getMessage());
+      e.printStackTrace();
+      System.err.flush();
+      fail("Unexpected exception in setup");
+    }
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    try {
+      qt.clearPostTestEffects();
+      qt.clearTestSideEffects();
+
+    } catch (Exception e) {
+      System.err.println("Exception: " + e.getMessage());
+      e.printStackTrace();
+      System.err.flush();
+      fail("Unexpected exception in tearDown");
+    }
+  }
+
+  @Override
+  public void runTest(String tname, String fname, String fpath) {
+    long startTime = System.currentTimeMillis();
+    try {
+      System.err.println("Begin query: " + fname);
+      qt.addFile(fpath);
+      qt.cliInit(new File(fpath));
+      int ecode = qt.executeClient(fname).getResponseCode();
+      if (ecode == 0) {
+        qt.failed(fname, null);
+      }
+
+      QTestProcessExecResult result = qt.checkCliDriverResults(fname);
+      if (result.getReturnCode() != 0) {
+        qt.failedDiff(result.getReturnCode(), fname, result.getCapturedOutput());
+      }
+
+    } catch (Exception e) {
+      qt.failedWithException(e, fname, null);
+    }
+
+    long elapsedTime = System.currentTimeMillis() - startTime;
+    System.err.println("Done query: " + fname + " elapsedTime=" + elapsedTime / 1000 + "s");
+    assertTrue("Test passed", true);
+  }
+}
+
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduTestSetup.java b/itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduTestSetup.java
new file mode 100644
index 0000000..2a0f04c
--- /dev/null
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduTestSetup.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QTestMiniClusters;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+
+import java.io.File;
+import java.util.Arrays;
+
+/**
+ * Start and stop a Kudu MiniCluster for testing purposes.
+ */
+public class KuduTestSetup extends QTestMiniClusters.QTestSetup {
+
+  public static final String KV_TABLE_NAME = "default.kudu_kv";
+  public static final String ALL_TYPES_TABLE_NAME = "default.kudu_all_types";
+
+  public static final Schema ALL_TYPES_SCHEMA = KuduTestUtils.getAllTypesSchema();
+  public static final Schema KV_SCHEMA = new Schema(Arrays.asList(
+      new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build())
+  );
+
+  private MiniKuduCluster miniCluster;
+
+  public KuduTestSetup() {
+  }
+
+  @Override
+  public void preTest(HiveConf conf) throws Exception {
+    super.preTest(conf);
+    setupWithHiveConf(conf);
+    createKuduTables();
+  }
+
+  @Override
+  public void postTest(HiveConf conf) throws Exception {
+    dropKuduTables();
+    super.postTest(conf);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    if (null != miniCluster) {
+      miniCluster.shutdown();
+      miniCluster = null;
+    }
+    super.tearDown();
+  }
+
+  private void setupWithHiveConf(HiveConf conf) throws Exception {
+    if (null == miniCluster) {
+      String testTmpDir = System.getProperty("test.tmp.dir");
+      File tmpDir = new File(testTmpDir, "kudu");
+
+      if (tmpDir.exists()) {
+        FileUtils.deleteDirectory(tmpDir);
+      }
+
+      miniCluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+          .numMasterServers(3)
+          .numTabletServers(3)
+          .build();
+    }
+
+    updateConf(conf);
+  }
+
+  /**
+   * Update hiveConf with the Kudu specific parameters.
+   * @param conf The hiveconf to update
+   */
+  private void updateConf(HiveConf conf) {
+    if (miniCluster != null) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT,
+          miniCluster.getMasterAddressesAsString());
+    }
+  }
+
+  private void createKuduTables() throws KuduException {
+    if (null != miniCluster) {
+      String masterAddresses = miniCluster.getMasterAddressesAsString();
+      try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses).build()) {
+        createKVTable(client);
+        createAllTypesTable(client);
+      }
+    }
+  }
+
+  private void dropKuduTables() throws KuduException  {
+    if (null != miniCluster) {
+      String masterAddresses = miniCluster.getMasterAddressesAsString();
+      try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses).build()) {
+        dropKVTable(client);
+        dropAllTypesTable(client);
+      }
+    }
+  }
+
+  public void createKVTable(KuduClient client) throws KuduException {
+    dropKVTable(client);
+    CreateTableOptions options = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("key"));
+    client.createTable(KV_TABLE_NAME, KV_SCHEMA, options);
+  }
+
+  public void dropKVTable(KuduClient client ) throws KuduException {
+    if (client.tableExists(KV_TABLE_NAME)) {
+      client.deleteTable(KV_TABLE_NAME);
+    }
+  }
+
+  public void createAllTypesTable(KuduClient client) throws KuduException {
+    dropAllTypesTable(client);
+    CreateTableOptions options = new CreateTableOptions()
+        .addHashPartitions(Arrays.asList("key"), 4);
+    client.createTable(ALL_TYPES_TABLE_NAME, ALL_TYPES_SCHEMA, options);
+  }
+
+  public void dropAllTypesTable(KuduClient client ) throws KuduException {
+    if (client.tableExists(ALL_TYPES_TABLE_NAME)) {
+      client.deleteTable(ALL_TYPES_TABLE_NAME);
+    }
+  }
+}
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/kudu/package-info.java b/itests/util/src/main/java/org/apache/hadoop/hive/kudu/package-info.java
new file mode 100644
index 0000000..6022130
--- /dev/null
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/kudu/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for the Kudu QTest classes.
+ */
+package org.apache.hadoop.hive.kudu;
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java
index bd4c76e..f8b7e29 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java
@@ -120,7 +120,8 @@ public class QTestMiniClusters {
     DRUID_LOCAL(CoreClusterType.TEZ, FsType.LOCAL),
     DRUID(CoreClusterType.TEZ, FsType.HDFS),
     DRUID_KAFKA(CoreClusterType.TEZ, FsType.HDFS),
-    KAFKA(CoreClusterType.TEZ, FsType.HDFS);
+    KAFKA(CoreClusterType.TEZ, FsType.HDFS),
+    KUDU(CoreClusterType.TEZ, FsType.LOCAL);
 
     private final CoreClusterType coreClusterType;
     private final FsType defaultFsType;
@@ -162,6 +163,8 @@ public class QTestMiniClusters {
         return DRUID_KAFKA;
       } else if (type.equals("kafka")) {
         return KAFKA;
+      } else if (type.equals("kudu")) {
+        return KUDU;
       } else {
         throw new RuntimeException(String.format("cannot recognize MiniClusterType from '%s'", type));
       }
diff --git a/kudu-handler/pom.xml b/kudu-handler/pom.xml
new file mode 100644
index 0000000..4082840
--- /dev/null
+++ b/kudu-handler/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-kudu-handler</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Kudu Handler</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+    <exclude.tests>None</exclude.tests>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- inter-project -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <scope>provided</scope>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu.version}</version>
+    </dependency>
+    <!-- test inter-project -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-test-utils</artifactId>
+      <version>${kudu.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>${exclude.tests}</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <!-- Set the os.detected.classifier property based on the Maven detected OS
+       because Hive's version of Maven doesn't support the os-maven-plugin.  -->
+  <profiles>
+    <profile>
+      <id>kudu-linux</id>
+      <activation>
+        <os>
+          <family>Unix</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>linux-x86_64</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>kudu-mac</id>
+      <activation>
+        <os>
+          <family>mac</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>osx-x86_64</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>kudu-windows</id>
+      <activation>
+        <os>
+          <family>Windows</family>
+        </os>
+      </activation>
+      <properties>
+        <!-- Kudu tests do not support Windows. -->
+        <exclude.tests>**/*.java</exclude.tests>
+      </properties>
+    </profile>
+  </profiles>
+</project>
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduHiveUtils.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduHiveUtils.java
new file mode 100644
index 0000000..3b89e5d
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduHiveUtils.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import java.io.IOException;
+import java.security.AccessController;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import javax.security.auth.Subject;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY;
+
+/**
+ * A collection of static utility methods for the Kudu Hive integration.
+ * This is useful for code sharing.
+ */
+public final class KuduHiveUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KuduHiveUtils.class);
+
+  private static final Text KUDU_TOKEN_KIND = new Text("kudu-authn-data");
+
+  private KuduHiveUtils() {}
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  public static Configuration createOverlayedConf(Configuration conf, Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  public static String getMasterAddresses(Configuration conf) throws IOException {
+    // Load the default configuration.
+    String masterAddresses = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT);
+    if (StringUtils.isEmpty(masterAddresses)) {
+      throw new IOException("Kudu master addresses are not specified with " +
+          HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT.varname);
+    }
+    // Override with the table configuration if it exists.
+    if (!StringUtils.isEmpty(conf.get(KUDU_MASTER_ADDRS_KEY))) {
+      masterAddresses = conf.get(KUDU_MASTER_ADDRS_KEY);
+    }
+    return masterAddresses;
+  }
+
+  public static KuduClient getKuduClient(Configuration conf) throws IOException {
+    String masterAddresses = getMasterAddresses(conf);
+    KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses).build();
+    importCredentialsFromCurrentSubject(client);
+    return client;
+  }
+
+  public static void importCredentialsFromCurrentSubject(KuduClient client) {
+    Subject subj = Subject.getSubject(AccessController.getContext());
+    if (subj == null) {
+      return;
+    }
+    Text service = new Text(client.getMasterAddressesAsString());
+    // Find the Hadoop credentials stored within the JAAS subject.
+    Set<Credentials> credSet = subj.getPrivateCredentials(Credentials.class);
+    for (Credentials creds : credSet) {
+      for (Token<?> tok : creds.getAllTokens()) {
+        if (!tok.getKind().equals(KUDU_TOKEN_KIND)) {
+          continue;
+        }
+        // Only import credentials relevant to the service corresponding to
+        // 'client'. This is necessary if we want to support a job which
+        // reads from one cluster and writes to another.
+        if (!tok.getService().equals(service)) {
+          LOG.debug("Not importing credentials for service " + service +
+              "(expecting service " + service + ")");
+          continue;
+        }
+        LOG.debug("Importing credentials for service " + service);
+        client.importAuthenticationCredentials(tok.getPassword());
+        return;
+      }
+    }
+  }
+
+  /* This method converts a Kudu type to to the corresponding Hive type */
+  public static PrimitiveTypeInfo toHiveType(Type kuduType, ColumnTypeAttributes attributes)
+      throws SerDeException {
+    switch (kuduType) {
+    case BOOL:
+      return TypeInfoFactory.booleanTypeInfo;
+    case INT8:
+      return TypeInfoFactory.byteTypeInfo;
+    case INT16:
+      return TypeInfoFactory.shortTypeInfo;
+    case INT32:
+      return TypeInfoFactory.intTypeInfo;
+    case INT64:
+      return TypeInfoFactory.longTypeInfo;
+    case UNIXTIME_MICROS:
+      return TypeInfoFactory.timestampTypeInfo;
+    case DECIMAL:
+      return TypeInfoFactory.getDecimalTypeInfo(attributes.getPrecision(), attributes.getScale());
+    case FLOAT:
+      return TypeInfoFactory.floatTypeInfo;
+    case DOUBLE:
+      return TypeInfoFactory.doubleTypeInfo;
+    case STRING:
+      return TypeInfoFactory.stringTypeInfo;
+    case BINARY:
+      return TypeInfoFactory.binaryTypeInfo;
+    default:
+      throw new SerDeException("Unsupported column type: " + kuduType);
+    }
+  }
+}
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduInputFormat.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduInputFormat.java
new file mode 100644
index 0000000..b6aa662
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduInputFormat.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.kudu.client.Bytes;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.RowResult;
+
+/**
+ * A Kudu InputFormat implementation for use by Hive.
+ */
+public class KuduInputFormat extends InputFormat<NullWritable, KuduWritable>
+    implements org.apache.hadoop.mapred.InputFormat<NullWritable, KuduWritable> {
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    return computeSplits(context.getConfiguration()).stream()
+        .map(is -> (InputSplit) is)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf conf, int numSplits)
+      throws IOException {
+    List<KuduInputSplit> splits = computeSplits(conf);
+    return splits.toArray(new org.apache.hadoop.mapred.InputSplit[0]);
+  }
+
+  private List<KuduInputSplit> computeSplits(Configuration conf) throws IOException {
+    try (KuduClient client = KuduHiveUtils.getKuduClient(conf)) {
+      // Hive depends on FileSplits so we get the dummy Path for the Splits.
+      Job job = Job.getInstance(conf);
+      JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+      Path[] paths = FileInputFormat.getInputPaths(jobContext);
+      Path dummyPath = paths[0];
+
+      String tableName = conf.get(KUDU_TABLE_NAME_KEY);
+      if (StringUtils.isEmpty(tableName)) {
+        throw new IllegalArgumentException(KUDU_TABLE_NAME_KEY + " is not set.");
+      }
+      if (!client.tableExists(tableName)) {
+        throw new IllegalArgumentException("Kudu table does not exist: " + tableName);
+      }
+
+      KuduTable table = client.openTable(tableName);
+      List<KuduPredicate> predicates = KuduPredicateHandler.getPredicates(conf, table.getSchema());
+
+      KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
+          .setProjectedColumnNames(getProjectedColumns(conf));
+
+      for (KuduPredicate predicate : predicates) {
+        tokenBuilder.addPredicate(predicate);
+      }
+      List<KuduScanToken> tokens = tokenBuilder.build();
+
+      List<KuduInputSplit> splits = new ArrayList<>(tokens.size());
+      for (KuduScanToken token : tokens) {
+        List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+        for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+          locations.add(replica.getRpcHost());
+        }
+        splits.add(new KuduInputSplit(token, dummyPath, locations.toArray(new String[0])));
+      }
+      return splits;
+    }
+  }
+
+  private List<String> getProjectedColumns(Configuration conf) throws IOException {
+    String[] columnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+    if (null == columnNamesArr) {
+      throw new IOException(
+          "Hive column names must be provided to InputFormat in the Configuration");
+    }
+    List<String> columns = new ArrayList<>(Arrays.asList(columnNamesArr));
+    VirtualColumn.removeVirtualColumns(columns);
+    return columns;
+  }
+
+  @Override
+  public RecordReader<NullWritable, KuduWritable> createRecordReader(InputSplit split,
+                                                                     TaskAttemptContext context) {
+    Preconditions.checkArgument(split instanceof KuduInputSplit);
+    // Will be initialized via the initialize method.
+    return new KuduRecordReader();
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, KuduWritable> getRecordReader(
+      org.apache.hadoop.mapred.InputSplit split, JobConf conf, Reporter reporter)
+      throws IOException {
+    Preconditions.checkArgument(split instanceof KuduInputSplit);
+    KuduRecordReader recordReader = new KuduRecordReader();
+    recordReader.initialize((KuduInputSplit) split, conf);
+    return recordReader;
+  }
+
+  /**
+   * An InputSplit represents the data to be processed by an individual Mapper.
+   * This is effectively a wrapper around a Kudu scan token.
+   */
+  static class KuduInputSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
+    /** The scan token that the split will use to scan the Kudu table. */
+    private byte[] serializedScanToken;
+
+    /** Tablet server locations which host the tablet to be scanned. */
+    private String[] locations;
+
+    @SuppressWarnings("unused") // required for deserialization.
+    KuduInputSplit() {
+      super(null, 0, 0, (String[]) null);
+    }
+
+    KuduInputSplit(KuduScanToken token, Path dummyPath, String[] locations) throws IOException {
+      super(dummyPath, 0, 0, locations);
+      this.serializedScanToken = token.serialize();
+      this.locations = locations;
+    }
+
+    byte[] getSerializedScanToken() {
+      return serializedScanToken;
+    }
+
+    @Override
+    public long getLength() {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() {
+      return locations;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      serializedScanToken = Bytes.readByteArray(in);
+      locations = new String[in.readInt()];
+      for (int i = 0; i < locations.length; i++) {
+        byte[] str = Bytes.readByteArray(in);
+        locations[i] = Bytes.getString(str);
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      Bytes.writeByteArray(out, serializedScanToken);
+      out.writeInt(locations.length);
+      for (String location : locations) {
+        byte[] str = Bytes.fromString(location);
+        Bytes.writeByteArray(out, str);
+      }
+    }
+  }
+
+  /**
+   * A RecordReader that reads the Kudu rows from a KuduInputSplit.
+   */
+  static class KuduRecordReader extends RecordReader<NullWritable, KuduWritable>
+      implements org.apache.hadoop.mapred.RecordReader<NullWritable, KuduWritable> {
+
+    private volatile boolean initialized = false;
+    private KuduClient client;
+    private KuduScanner scanner;
+    private Iterator<RowResult> iterator;
+    private RowResult currentValue;
+    private KuduWritable currentWritable;
+    private long pos;
+
+    KuduRecordReader() {}
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
+      Preconditions.checkArgument(split instanceof KuduInputSplit);
+      initialize((KuduInputSplit) split, context.getConfiguration());
+    }
+
+    private synchronized void initialize(KuduInputSplit split, Configuration conf)
+        throws IOException {
+      if (!initialized) {
+        byte[] serializedScanToken = split.getSerializedScanToken();
+        client = KuduHiveUtils.getKuduClient(conf);
+        scanner = KuduScanToken.deserializeIntoScanner(serializedScanToken, client);
+        iterator = scanner.iterator();
+        currentValue = null;
+        currentWritable = new KuduWritable(scanner.getProjectionSchema().newPartialRow());
+        pos = 0;
+        initialized = true;
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() {
+      if (iterator.hasNext()) {
+        currentValue = iterator.next();
+        currentWritable.setRow(currentValue);
+        pos++;
+        return true;
+      }
+      currentValue = null;
+      return false;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public KuduWritable getCurrentValue() {
+      Preconditions.checkNotNull(currentValue);
+      return currentWritable;
+    }
+
+    @Override
+    public boolean next(NullWritable nullWritable, KuduWritable kuduWritable) {
+      if (nextKeyValue()) {
+        kuduWritable.setRow(currentValue);
+        return true;
+      }
+      return false;
+    }
+
+    @Override public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override public KuduWritable createValue() {
+      return new KuduWritable(scanner.getProjectionSchema().newPartialRow());
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        scanner.close();
+      } catch (KuduException e) {
+        throw new IOException(e);
+      }
+      client.shutdown();
+    }
+
+    @Override
+    public long getPos() {
+      return pos;
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+  }
+}
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduOutputFormat.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduOutputFormat.java
new file mode 100644
index 0000000..abb0ed3
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduOutputFormat.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.RowErrorsAndOverflowStatus;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
+
+import static org.apache.hadoop.hive.kudu.KuduHiveUtils.createOverlayedConf;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+
+/**
+ * A Kudu OutputFormatKuduPredicateHandler implementation for use by Hive.
+ */
+public class KuduOutputFormat extends OutputFormat<NullWritable, KuduWritable>
+    implements HiveOutputFormat<NullWritable, KuduWritable> {
+
+  @Override
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+                                                           Class valueClass, boolean isCompressed,
+                                                           Properties tableProperties,
+                                                           Progressable progress)
+      throws IOException {
+    return new KuduRecordWriter(createOverlayedConf(jc, tableProperties));
+  }
+
+
+  @Override
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, KuduWritable> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+    return new KuduRecordWriter(job);
+  }
+
+  @Override
+  public RecordWriter<NullWritable, KuduWritable> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    return new KuduRecordWriter(context.getConfiguration());
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) {
+    // Not doing any check.
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) {
+    // Not doing any check.
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return new KuduOuputComitter();
+  }
+
+  static class KuduRecordWriter extends RecordWriter<NullWritable, KuduWritable>
+      implements FileSinkOperator.RecordWriter,
+      org.apache.hadoop.mapred.RecordWriter<NullWritable, KuduWritable> {
+    private KuduClient client;
+    private KuduTable table;
+    private KuduSession session;
+
+    KuduRecordWriter(Configuration conf) throws IOException {
+      this.client = KuduHiveUtils.getKuduClient(conf);
+
+      String tableName = conf.get(KUDU_TABLE_NAME_KEY);
+      if (StringUtils.isEmpty(tableName)) {
+        throw new IllegalArgumentException(KUDU_TABLE_NAME_KEY + " is not set.");
+      }
+      if (!client.tableExists(tableName)) {
+        throw new IllegalArgumentException("Kudu table does not exist: " + tableName);
+      }
+
+      this.table = client.openTable(tableName);
+      this.session = client.newSession();
+      this.session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND);
+    }
+
+    @Override
+    public void write(Writable row) throws IOException {
+      Preconditions.checkArgument(row instanceof KuduWritable);
+      Operation op = table.newUpsert();
+      ((KuduWritable) row).populateRow(op.getRow());
+      session.apply(op);
+    }
+
+    @Override
+    public void write(NullWritable key, KuduWritable value) throws IOException {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      session.close();
+      processErrors();
+      client.close();
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+
+    private void processErrors() throws IOException {
+      RowErrorsAndOverflowStatus pendingErrors = session.getPendingErrors();
+      if (pendingErrors.getRowErrors().length != 0) {
+        RowError[] errors = pendingErrors.getRowErrors();
+        // Build a sample of error strings.
+        int sampleSize = 5;
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < errors.length; i++) {
+          if (i == sampleSize) {
+            break;
+          }
+          sb.append(errors[i].getErrorStatus().toString());
+        }
+        if (pendingErrors.isOverflowed()) {
+          throw new IOException(
+              "PendingErrors overflowed. Failed to write at least " + errors.length + " rows " +
+                  "to Kudu; Sample errors: " + sb.toString());
+        } else {
+          throw new IOException(
+              "Failed to write " + errors.length + " rows to Kudu; Sample errors: " +
+                  sb.toString());
+        }
+      }
+    }
+  }
+
+  /**
+   * A dummy committer class that does not do anything.
+   */
+  static class KuduOuputComitter extends OutputCommitter {
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+  }
+}
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduPredicateHandler.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduPredicateHandler.java
new file mode 100644
index 0000000..4807f53
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduPredicateHandler.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualNS;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduPredicate.ComparisonOp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Contains static methods for decomposing predicate/filter expressions and
+ * getting the equivalent Kudu predicates.
+ */
+public final class KuduPredicateHandler {
+  static final Logger LOG = LoggerFactory.getLogger(KuduPredicateHandler.class);
+
+  private KuduPredicateHandler() {}
+
+  /**
+   * Analyzes the predicates and return the portion of it which
+   * cannot be evaluated by Kudu during table access.
+   *
+   * @param predicateExpr predicate to be decomposed
+   * @param schema the schema of the Kudu table
+   * @return decomposed form of predicate, or null if no pushdown is possible at all
+   */
+  public static DecomposedPredicate decompose(ExprNodeDesc predicateExpr, Schema schema) {
+    IndexPredicateAnalyzer analyzer = newAnalyzer(schema);
+    List<IndexSearchCondition> sConditions = new ArrayList<>();
+    ExprNodeDesc residualPredicate = analyzer.analyzePredicate(predicateExpr, sConditions);
+
+    // Nothing to decompose.
+    if (sConditions.size() == 0) {
+      return null;
+    }
+
+    DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
+    decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(sConditions);
+    decomposedPredicate.residualPredicate = (ExprNodeGenericFuncDesc) residualPredicate;
+    return decomposedPredicate;
+  }
+
+  private static IndexPredicateAnalyzer newAnalyzer(Schema schema) {
+    IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+
+    // Register comparison operators which can be satisfied by Kudu predicates.
+    // Note: We are unable to decompose GenericUDFOPNull, GenericUDFOPNotNull, GenericUDFIn
+    // predicates even though they are pushed to Kudu because the IndexPredicateAnalyzer only
+    // supports GenericUDFBaseCompare UDFs.
+    // We can also handle some NOT predicates but the IndexPredicateAnalyzer also does
+    // not support this.
+    analyzer.addComparisonOp(GenericUDFOPEqual.class.getName());
+    analyzer.addComparisonOp(GenericUDFOPEqualNS.class.getName());
+    analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName());
+    analyzer.addComparisonOp(GenericUDFOPEqualOrGreaterThan.class.getName());
+    analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName());
+    analyzer.addComparisonOp(GenericUDFOPEqualOrLessThan.class.getName());
+
+    // Set the column names that can be satisfied.
+    for (ColumnSchema col : schema.getColumns()) {
+      // Skip binary columns because binary predicates are not supported. (HIVE-11370)
+      if (col.getType() != Type.BINARY) {
+        analyzer.allowColumnName(col.getName());
+      }
+    }
+
+    return analyzer;
+  }
+
+  /**
+   * Returns the list of Kudu predicates from the passed configuration.
+   *
+   * @param conf the execution configuration
+   * @param schema the schema of the Kudu table
+   * @return the list of Kudu predicates
+   */
+  public static List<KuduPredicate> getPredicates(Configuration conf, Schema schema) {
+    SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
+    if (sarg == null) {
+      return new ArrayList<>();
+    }
+    return toKuduPredicates(sarg, schema);
+  }
+
+  /**
+   * Translate the search argument to the KuduPredicates.
+   */
+  private static List<KuduPredicate> toKuduPredicates(SearchArgument sarg, Schema schema) {
+    List<KuduPredicate> results = new ArrayList<>();
+    try {
+      translate(sarg.getExpression(), sarg.getLeaves(), false, schema, results);
+    } catch(Exception ex) {
+      LOG.warn("Exception while generating Kudu predicates. Predicates will not be pushed", ex);
+      return Collections.emptyList();
+    }
+    return results;
+  }
+
+  private static void translate(ExpressionTree root, List<PredicateLeaf> leaves, boolean isNot,
+                                Schema schema, List<KuduPredicate> results) {
+    switch (root.getOperator()) {
+    case OR:
+      if (isNot) {
+        // Converts to an AND: not (A or B) = not A and not B
+        for(ExpressionTree child: root.getChildren()) {
+          translate(child, leaves, isNot, schema, results);
+        }
+      } else {
+        // Kudu doesn't support OR predicates.
+        return;
+      }
+    case AND:
+      if (isNot) {
+        // Converts to an OR: not (A and B) = not A or not B
+        // Kudu doesn't support OR predicates.
+        return;
+      } else {
+        for(ExpressionTree child: root.getChildren()) {
+          translate(child, leaves, isNot, schema, results);
+        }
+        return;
+      }
+    case NOT:
+      // Kudu doesn't support general NOT predicates, but some NOT operators
+      // can be converted into Kudu predicates. See leafToPredicates below.
+      translate(root.getChildren().get(0), leaves, !isNot, schema, results);
+      return;
+    case LEAF:
+      PredicateLeaf leaf = leaves.get(root.getLeaf());
+      if (schema.hasColumn(leaf.getColumnName())) {
+        results.addAll(leafToPredicates(leaf, isNot, schema));
+      }
+      return;
+    case CONSTANT:
+      return; // no predicate for constant
+    default:
+      throw new IllegalStateException("Unknown operator: " + root.getOperator());
+    }
+  }
+
+  private static List<KuduPredicate> leafToPredicates(PredicateLeaf leaf, boolean isNot, Schema schema) {
+    ColumnSchema column = schema.getColumn(leaf.getColumnName());
+    Object value = leaf.getLiteral();
+
+    switch (leaf.getOperator()) {
+    case EQUALS:
+      if (isNot) {
+        // Kudu doesn't support NOT EQUALS.
+        return Collections.emptyList();
+      } else {
+        return Collections.singletonList(KuduPredicate.newComparisonPredicate(column,
+            ComparisonOp.EQUAL, toKuduValue(value, column)));
+      }
+    case LESS_THAN:
+      if (isNot) {
+        return Collections.singletonList(KuduPredicate.newComparisonPredicate(column,
+            ComparisonOp.GREATER_EQUAL, toKuduValue(value, column)));
+      } else {
+        return Collections.singletonList(KuduPredicate.newComparisonPredicate(column,
+            ComparisonOp.LESS, toKuduValue(value, column)));
+      }
+    case LESS_THAN_EQUALS:
+      if (isNot) {
+        return Collections.singletonList(KuduPredicate.newComparisonPredicate(column,
+            ComparisonOp.GREATER, toKuduValue(value, column)));
+      } else {
+        return Collections.singletonList(KuduPredicate.newComparisonPredicate(column,
+            ComparisonOp.LESS_EQUAL, toKuduValue(value, column)));
+      }
+    case IS_NULL:
+      if (isNot) {
+        return Collections.singletonList(KuduPredicate.newIsNotNullPredicate(column));
+      } else {
+        return Collections.singletonList(KuduPredicate.newIsNullPredicate(column));
+      }
+    case IN:
+      if (isNot) {
+        // Kudu doesn't support NOT IN.
+        return Collections.emptyList();
+      } else {
+        List<Object> values = leaf.getLiteralList().stream()
+            .map((Object v) -> toKuduValue(v, column))
+            .collect(Collectors.toList());
+        return Collections.singletonList(KuduPredicate.newInListPredicate(column, values));
+      }
+    case BETWEEN:
+      List<Object> values = leaf.getLiteralList();
+      Object leftValue = toKuduValue(values.get(0), column);
+      Object rightValue = toKuduValue(values.get(1), column);
+      if (isNot) {
+        return Arrays.asList(
+            KuduPredicate.newComparisonPredicate(column, ComparisonOp.LESS, leftValue),
+            KuduPredicate.newComparisonPredicate(column, ComparisonOp.GREATER, rightValue)
+        );
+      } else {
+        return Arrays.asList(
+            KuduPredicate.newComparisonPredicate(column, ComparisonOp.GREATER_EQUAL, leftValue),
+            KuduPredicate.newComparisonPredicate(column, ComparisonOp.LESS_EQUAL, rightValue)
+        );
+      }
+    case NULL_SAFE_EQUALS:
+      // Kudu doesn't support null value predicates.
+      return Collections.emptyList();
+    default:
+      throw new RuntimeException("Unhandled operator: " + leaf.getOperator());
+    }
+  }
+
+  // Converts Hive value objects to the value Objects expected by Kudu.
+  private static Object toKuduValue(Object value, ColumnSchema column) {
+    if (value instanceof HiveDecimalWritable) {
+      return ((HiveDecimalWritable) value).getHiveDecimal().bigDecimalValue();
+    } else if (value instanceof Timestamp) {
+      // Calling toSqlTimestamp and using the addTimestamp API ensures we properly
+      // convert Hive localDateTime to UTC.
+      return ((Timestamp) value).toSqlTimestamp();
+    } else if (value instanceof Double && column.getType() == Type.FLOAT) {
+      // Hive search arguments use Double for both FLOAT and DOUBLE columns.
+      // Down convert to match the FLOAT columns type.
+      return ((Double) value).floatValue();
+    } else {
+      return value;
+    }
+  }
+}
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduSerDe.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduSerDe.java
new file mode 100644
index 0000000..33c3e0e
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduSerDe.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.PartialRow;
+
+import static org.apache.hadoop.hive.kudu.KuduHiveUtils.createOverlayedConf;
+import static org.apache.hadoop.hive.kudu.KuduHiveUtils.toHiveType;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+
+/**
+ * A Kudu serializer and deserializer to support reading and writing Kudu data from Hive.
+ */
+@SerDeSpec(schemaProps = { KuduStorageHandler.KUDU_TABLE_NAME_KEY })
+public class KuduSerDe extends AbstractSerDe {
+
+  private ObjectInspector objectInspector;
+  private Schema schema;
+
+  @SuppressWarnings("unused")
+  public KuduSerDe() {}
+
+  @Override
+  public void initialize(Configuration sysConf, Properties tblProps)
+      throws SerDeException {
+    Configuration conf = createOverlayedConf(sysConf, tblProps);
+    String tableName = conf.get(KuduStorageHandler.KUDU_TABLE_NAME_KEY);
+    if (StringUtils.isEmpty(tableName)) {
+      throw new SerDeException(KUDU_TABLE_NAME_KEY + " is not set.");
+    }
+    try (KuduClient client = KuduHiveUtils.getKuduClient(conf)) {
+      if (!client.tableExists(tableName)) {
+        throw new SerDeException("Kudu table does not exist: " + tableName);
+      }
+      schema = client.openTable(tableName).getSchema();
+    } catch (IOException ex) {
+      throw new SerDeException(ex);
+    }
+    this.objectInspector = createObjectInspector(schema);
+  }
+
+  private static ObjectInspector createObjectInspector(Schema schema) throws SerDeException {
+    Preconditions.checkNotNull(schema);
+    List<String> fieldNames = new ArrayList<>();
+    List<ObjectInspector> fieldInspectors = new ArrayList<>();
+    List<String> fieldComments = new ArrayList<>();
+    for (int i = 0; i < schema.getColumnCount(); i++) {
+      ColumnSchema col = schema.getColumnByIndex(i);
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      fieldNames.add(col.getName());
+      fieldInspectors.add(
+          PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo));
+      fieldComments.add(col.getComment());
+    }
+    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldInspectors,
+        fieldComments);
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() {
+    return objectInspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return KuduWritable.class;
+  }
+
+  /**
+   * Serialize an object by navigating inside the Object with the ObjectInspector.
+   */
+  @Override
+  public KuduWritable serialize(Object obj, ObjectInspector objectInspector) throws SerDeException {
+    Preconditions.checkArgument(objectInspector.getCategory() == Category.STRUCT);
+
+    StructObjectInspector soi = (StructObjectInspector) objectInspector;
+    List<Object> writableObj = soi.getStructFieldsDataAsList(obj);
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+
+    PartialRow row = schema.newPartialRow();
+    for (int i = 0; i < schema.getColumnCount(); i++) {
+      StructField field = fields.get(i);
+      Object value = writableObj.get(i);
+
+      if (value == null) {
+        row.setNull(i);
+      } else {
+        Type type = schema.getColumnByIndex(i).getType();
+        ObjectInspector inspector = field.getFieldObjectInspector();
+        switch (type) {
+        case BOOL:
+          boolean boolVal = ((BooleanObjectInspector) inspector).get(value);
+          row.addBoolean(i, boolVal);
+          break;
+        case INT8:
+          byte byteVal = ((ByteObjectInspector) inspector).get(value);
+          row.addByte(i, byteVal);
+          break;
+        case INT16:
+          short shortVal = ((ShortObjectInspector) inspector).get(value);
+          row.addShort(i, shortVal);
+          break;
+        case INT32:
+          int intVal = ((IntObjectInspector) inspector).get(value);
+          row.addInt(i, intVal);
+          break;
+        case INT64:
+          long longVal = ((LongObjectInspector) inspector).get(value);
+          row.addLong(i, longVal);
+          break;
+        case UNIXTIME_MICROS:
+          // Calling toSqlTimestamp and using the addTimestamp API ensures we properly
+          // convert Hive localDateTime to UTC.
+          java.sql.Timestamp timestampVal = ((TimestampObjectInspector) inspector)
+              .getPrimitiveJavaObject(value).toSqlTimestamp();
+          row.addTimestamp(i, timestampVal);
+          break;
+        case DECIMAL:
+          HiveDecimal decimalVal = ((HiveDecimalObjectInspector) inspector)
+              .getPrimitiveJavaObject(value);
+          row.addDecimal(i, decimalVal.bigDecimalValue());
+          break;
+        case FLOAT:
+          float floatVal = ((FloatObjectInspector) inspector).get(value);
+          row.addFloat(i, floatVal);
+          break;
+        case DOUBLE:
+          double doubleVal = ((DoubleObjectInspector) inspector).get(value);
+          row.addDouble(i, doubleVal);
+          break;
+        case STRING:
+          String stringVal = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value);
+          row.addString(i, stringVal);
+          break;
+        case BINARY:
+          byte[] bytesVal = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
+          row.addBinary(i, bytesVal);
+          break;
+        default:
+          throw new SerDeException("Unsupported column type: " + type.name());
+        }
+      }
+    }
+    return new KuduWritable(row);
+  }
+
+  /**
+   * Deserialize an object out of a Writable blob.
+   */
+  @Override
+  public Object deserialize(Writable writable) throws SerDeException {
+    KuduWritable input = (KuduWritable) writable;
+    List<Object> output = new ArrayList<>();
+    for(int i  = 0; i < schema.getColumnCount(); i++) {
+      // If the column isn't set, skip it.
+      if (!input.isSet(i)) {
+        continue;
+      }
+      Object javaObj = input.getValueObject(i);
+      ColumnSchema col = schema.getColumnByIndex(i);
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      if (javaObj == null) {
+        output.add(null);
+      } else {
+        switch (typeInfo.getPrimitiveCategory()) {
+        case BOOLEAN:
+          output.add(new BooleanWritable((boolean) javaObj));
+          break;
+        case BYTE:
+          output.add(new ByteWritable((byte) javaObj));
+          break;
+        case SHORT:
+          output.add(new ShortWritable((short) javaObj));
+          break;
+        case INT:
+          output.add(new IntWritable((int) javaObj));
+          break;
+        case LONG:
+          output.add(new LongWritable((long) javaObj));
+          break;
+        case TIMESTAMP:
+          java.sql.Timestamp sqlTs = (java.sql.Timestamp) javaObj;
+          Timestamp hiveTs = Timestamp.ofEpochMilli(sqlTs.getTime(), sqlTs.getNanos());
+          output.add(new TimestampWritableV2(hiveTs));
+          break;
+        case DECIMAL:
+          HiveDecimal hiveDecimal = HiveDecimal.create((BigDecimal) javaObj);
+          output.add(new HiveDecimalWritable(hiveDecimal));
+          break;
+        case FLOAT:
+          output.add(new FloatWritable((float) javaObj));
+          break;
+        case DOUBLE:
+          output.add(new DoubleWritable((double) javaObj));
+          break;
+        case STRING:
+          output.add(new Text((String) javaObj));
+          break;
+        case BINARY:
+          output.add(new BytesWritable((byte[]) javaObj));
+          break;
+        default:
+          throw new SerDeException("Unsupported type: " + typeInfo.getPrimitiveCategory());
+        }
+      }
+    }
+    return output;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    // No support for statistics. That seems to be a popular answer.
+    return null;
+  }
+}
+
+
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduStorageHandler.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduStorageHandler.java
new file mode 100644
index 0000000..a9c6968
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduStorageHandler.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.kudu.KuduOutputFormat.KuduRecordWriter;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kudu.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a HiveStorageHandler implementation for Apache Kudu.
+ */
+public class KuduStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KuduStorageHandler.class);
+
+  private static final String KUDU_PROPERTY_PREFIX = "kudu.";
+
+  /** Table Properties. Used in the hive table definition when creating a new table. */
+  public static final String KUDU_TABLE_ID_KEY = KUDU_PROPERTY_PREFIX + "table_id";
+  public static final String KUDU_TABLE_NAME_KEY = KUDU_PROPERTY_PREFIX + "table_name";
+  public static final String KUDU_MASTER_ADDRS_KEY = KUDU_PROPERTY_PREFIX + "master_addresses";
+  public static final List<String> KUDU_TABLE_PROPERTIES =
+      Arrays.asList(KUDU_TABLE_ID_KEY, KUDU_TABLE_NAME_KEY, KUDU_MASTER_ADDRS_KEY);
+
+  private Configuration conf;
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return KuduInputFormat.class;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return KuduOutputFormat.class;
+  }
+
+  @Override
+  public Class<? extends AbstractSerDe> getSerDeClass() {
+    return KuduSerDe.class;
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return null;
+  }
+
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+    return new DefaultHiveAuthorizationProvider();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void configureInputJobProperties(TableDesc tableDesc,
+      Map<String, String> jobProperties) {
+    configureJobProperties(tableDesc, jobProperties);
+  }
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc,
+      Map<String, String> jobProperties) {
+    configureJobProperties(tableDesc, jobProperties);
+  }
+
+  @Override
+  public void configureTableJobProperties(TableDesc tableDesc,
+      Map<String, String> jobProperties) {
+    configureJobProperties(tableDesc, jobProperties);
+  }
+
+  @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    // Copied from the DruidStorageHandler.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // AM can not do Kerberos Auth so will do the input split generation in the HS2
+      LOG.debug("Setting {} to {} to enable split generation on HS2",
+          HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(),
+          Boolean.FALSE.toString());
+      jobConf.set(HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(), Boolean.FALSE.toString());
+    }
+    try {
+      addDependencyJars(jobConf, KuduRecordWriter.class);
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  // Copied from the DruidStorageHandler.
+  private static void addDependencyJars(Configuration conf, Class<?>... classes)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<>(conf.getStringCollection("tmpjars"));
+    for (Class<?> clazz : classes) {
+      if (clazz == null) {
+        continue;
+      }
+      final String path = Utilities.jarFinderGetJar(clazz);
+      if (path == null) {
+        throw new RuntimeException("Could not find jar for class " + clazz +
+            " in order to ship it to the cluster.");
+      }
+      if (!localFs.exists(new Path(path))) {
+        throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
+      }
+      jars.add(path);
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+    //noinspection ToArrayCallWithZeroLengthArrayArgument
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+  }
+
+  private void configureJobProperties(TableDesc tableDesc,
+      Map<String, String> jobProperties) {
+
+    Properties tblProps = tableDesc.getProperties();
+    copyPropertiesFromTable(jobProperties, tblProps);
+  }
+
+  private void copyPropertiesFromTable(Map<String, String> jobProperties, Properties tblProps) {
+    for (String propToCopy : KUDU_TABLE_PROPERTIES) {
+      if (tblProps.containsKey(propToCopy)) {
+        String value = tblProps.getProperty(propToCopy);
+        conf.set(propToCopy, value);
+        jobProperties.put(propToCopy, value);
+      }
+    }
+  }
+
+  /**
+   * Gives the storage handler a chance to decompose a predicate.
+   * The storage handler should analyze the predicate and return the portion of it which
+   * cannot be evaluated during table access.
+   *
+   * @param jobConf contains a job configuration matching the one that will later be passed
+   *               to getRecordReader and getSplits
+   * @param deserializer deserializer which will be used when fetching rows
+   * @param predicate predicate to be decomposed
+   * @return decomposed form of predicate, or null if no pushdown is possible at all
+   */
+  @Override
+  public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
+                                                ExprNodeDesc predicate) {
+    Preconditions.checkArgument(deserializer instanceof KuduSerDe);
+    KuduSerDe serDe = (KuduSerDe) deserializer;
+    Schema schema = serDe.getSchema();
+    return KuduPredicateHandler.decompose(predicate, schema);
+  }
+
+  /**
+   * Used to fetch runtime information about storage handler during DESCRIBE EXTENDED statement.
+   */
+  @Override
+  public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
+    return null;
+  }
+}
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduWritable.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduWritable.java
new file mode 100644
index 0000000..20e37f9
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduWritable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.Writable;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+
+/**
+ * A Writable representation of a Kudu Row.
+ * The row may be either a PartialRow or a RowResult.
+ */
+public class KuduWritable implements Writable {
+
+  private PartialRow partialRow;
+  private RowResult rowResult;
+
+  public KuduWritable(PartialRow row) {
+    this.partialRow = row;
+    this.rowResult = null;
+  }
+
+  public void setRow(RowResult rowResult) {
+    this.rowResult = rowResult;
+    this.partialRow = null;
+  }
+
+  public PartialRow getPartialRow() {
+    Preconditions.checkNotNull(partialRow);
+    return partialRow;
+  }
+
+  public RowResult getRowResult() {
+    Preconditions.checkNotNull(rowResult);
+    return rowResult;
+  }
+
+  public Object getValueObject(int colIndex) {
+    if (partialRow != null) {
+      return partialRow.getObject(colIndex);
+    } else {
+      return rowResult.getObject(colIndex);
+    }
+  }
+
+  public Object getValueObject(String colName) {
+    if (partialRow != null) {
+      return partialRow.getObject(colName);
+    } else {
+      return rowResult.getObject(colName);
+    }
+  }
+
+  public boolean isSet(int colIndex) {
+    if (partialRow != null) {
+      return partialRow.isSet(colIndex);
+    } else {
+      // RowResult columns are always set.
+      return true;
+    }
+  }
+
+  public boolean isSet(String colName) {
+    if (partialRow != null) {
+      return partialRow.isSet(colName);
+    } else {
+      // RowResult columns are always set.
+      return true;
+    }
+  }
+
+  public void populateRow(PartialRow row) {
+    Schema schema = row.getSchema();
+    for (int i = 0; i < schema.getColumnCount(); i++) {
+      ColumnSchema col = schema.getColumnByIndex(i);
+      if (isSet(col.getName())) {
+        Object value = getValueObject(col.getName());
+        row.addObject(i, value);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void write(DataOutput out) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/kudu-handler/src/java/org/apache/hadoop/hive/kudu/package-info.java b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/package-info.java
new file mode 100644
index 0000000..1c933da
--- /dev/null
+++ b/kudu-handler/src/java/org/apache/hadoop/hive/kudu/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Serde, InputFormat, and OutputFormat support for connecting Hive to Kudu tables.
+ */
+package org.apache.hadoop.hive.kudu;
diff --git a/kudu-handler/src/test/org/apache/hadoop/hive/kudu/KuduTestUtils.java b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/KuduTestUtils.java
new file mode 100644
index 0000000..f24da5f
--- /dev/null
+++ b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/KuduTestUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+
+import java.util.Arrays;
+
+/**
+ * Utilities for shared kudu-handler testing logic.
+ */
+public final class KuduTestUtils {
+
+  private KuduTestUtils() {}
+
+  public static Schema getAllTypesSchema() {
+    return new org.apache.kudu.Schema(Arrays.asList(
+        new ColumnSchema.ColumnSchemaBuilder("key", Type.INT8).key(true)
+            .comment("The key column.").build(),
+        new ColumnSchema.ColumnSchemaBuilder("int16", Type.INT16).build(),
+        new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build(),
+        new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build(),
+        new ColumnSchema.ColumnSchemaBuilder("bool", Type.BOOL).build(),
+        new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build(),
+        new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build(),
+        new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build(),
+        new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build(),
+        new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build(),
+        new ColumnSchema.ColumnSchemaBuilder("decimal", Type.DECIMAL)
+            .typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+                .precision(5).scale(3).build())
+            .build(),
+        new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("default", Type.INT32).defaultValue(1).build()
+    ));
+  }
+
+}
diff --git a/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduInputFormat.java b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduInputFormat.java
new file mode 100644
index 0000000..feb6f75
--- /dev/null
+++ b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduInputFormat.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.kudu.KuduInputFormat.KuduInputSplit;
+import org.apache.hadoop.hive.kudu.KuduInputFormat.KuduRecordReader;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hive.kudu.KuduHiveUtils.toHiveType;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the KuduInputFormat implementation.
+ */
+public class TestKuduInputFormat {
+
+  private static final String TABLE_NAME = "default.TestKuduInputFormat";
+
+  private static final Schema SCHEMA = getAllTypesSchema();
+
+  private static final Configuration BASE_CONF = new Configuration();
+
+  private static final long NOW_MS = System.currentTimeMillis();
+
+  private static final PartialRow ROW;
+  static {
+    ROW = SCHEMA.newPartialRow();
+    ROW.addByte("key", (byte) 1);
+    ROW.addShort("int16", (short) 1);
+    ROW.addInt("int32", 1);
+    ROW.addLong("int64", 1L);
+    ROW.addBoolean("bool", true);
+    ROW.addFloat("float", 1.1f);
+    ROW.addDouble("double", 1.1d);
+    ROW.addString("string", "one");
+    ROW.addBinary("binary", "one".getBytes(UTF_8));
+    ROW.addTimestamp("timestamp", new Timestamp(NOW_MS));
+    ROW.addDecimal("decimal", new BigDecimal("1.111"));
+    ROW.setNull("null");
+    // Not setting the "default" column.
+  }
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Before
+  public void setUp() throws Exception {
+    // Set the base configuration values.
+    BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString());
+    BASE_CONF.set(KUDU_TABLE_NAME_KEY, TABLE_NAME);
+    BASE_CONF.set(FileInputFormat.INPUT_DIR, "dummy");
+
+    // Create the test Kudu table.
+    CreateTableOptions options = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("key"));
+    harness.getClient().createTable(TABLE_NAME, SCHEMA, options);
+
+    // Insert a test row.
+    KuduTable table = harness.getClient().openTable(TABLE_NAME);
+    KuduSession session = harness.getClient().newSession();
+    Insert insert = table.newInsert();
+    PartialRow insertRow = insert.getRow();
+    // Use KuduWritable, to populate the insert row.
+    new KuduWritable(ROW).populateRow(insertRow);
+    session.apply(insert);
+    session.close();
+  }
+
+  @Test
+  public void testAllColumns() throws Exception {
+    KuduInputFormat input = new KuduInputFormat();
+
+    JobConf jobConf = new JobConf(BASE_CONF);
+    String columnsStr = SCHEMA.getColumns().stream()
+        .map(ColumnSchema::getName)
+        .collect(Collectors.joining(","));
+    jobConf.set(serdeConstants.LIST_COLUMNS, columnsStr);
+
+    InputSplit[] splits = input.getSplits(jobConf, 1);
+    assertEquals(1, splits.length);
+    KuduInputSplit split = (KuduInputSplit) splits[0];
+
+    KuduRecordReader reader =
+        (KuduRecordReader) input.getRecordReader(split, jobConf, null);
+    assertTrue(reader.nextKeyValue());
+    RowResult value = reader.getCurrentValue().getRowResult();
+    verfiyRow(value);
+    assertFalse(reader.nextKeyValue());
+  }
+
+  @Test
+  public void testProjection() throws Exception {
+    KuduInputFormat input = new KuduInputFormat();
+
+
+    JobConf jobConf = new JobConf(BASE_CONF);
+    jobConf.set(serdeConstants.LIST_COLUMNS, "bool,key");
+
+    InputSplit[] splits = input.getSplits(jobConf, 1);
+    assertEquals(1, splits.length);
+    KuduInputSplit split = (KuduInputSplit) splits[0];
+
+    KuduRecordReader reader =
+        (KuduRecordReader) input.getRecordReader(split, jobConf, null);
+    assertTrue(reader.nextKeyValue());
+    RowResult value = reader.getCurrentValue().getRowResult();
+    assertEquals(2, value.getSchema().getColumnCount());
+    assertTrue(value.getBoolean(0));
+    assertEquals((byte) 1, value.getByte(1));
+    assertFalse(reader.nextKeyValue());
+  }
+
+  @Test
+  public void testMissingTable() throws Exception {
+    KuduInputFormat input = new KuduInputFormat();
+
+    JobConf jobConf = new JobConf(BASE_CONF);
+    jobConf.unset(KUDU_TABLE_NAME_KEY);
+    jobConf.set(serdeConstants.LIST_COLUMNS, "key");
+
+    try {
+      input.getSplits(jobConf, 1);
+      fail("Should fail on missing master addresses");
+    } catch (IllegalArgumentException ex) {
+      assertThat(ex.getMessage(), containsString("kudu.table_name is not set"));
+    }
+  }
+
+  @Test
+  public void testBadTable() throws Exception {
+    KuduInputFormat input = new KuduInputFormat();
+
+    JobConf jobConf = new JobConf(BASE_CONF);
+    jobConf.set(KUDU_TABLE_NAME_KEY, "default.notatable");
+    jobConf.set(serdeConstants.LIST_COLUMNS, "key");
+
+    try {
+      input.getSplits(jobConf, 1);
+      fail("Should fail on a bad table");
+    } catch (IllegalArgumentException ex) {
+      assertThat(ex.getMessage(),
+          containsString("Kudu table does not exist: default.notatable"));
+    }
+  }
+
+  @Test
+  public void testMissingColumn() throws Exception {
+    KuduInputFormat input = new KuduInputFormat();
+
+    JobConf jobConf = new JobConf(BASE_CONF);
+    jobConf.set(serdeConstants.LIST_COLUMNS, "missing");
+
+    try {
+      input.getSplits(jobConf, 1);
+      fail("Should fail on missing column");
+    } catch (IllegalArgumentException ex) {
+      assertThat(ex.getMessage(), containsString("Unknown column: missing"));
+    }
+  }
+
+  @Test
+  public void testMultipleSplits() throws Exception {
+    String tableName = "default.twoPartitionTable";
+    Schema schema = new Schema(Arrays.asList(
+        new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build()
+    ));
+    CreateTableOptions options = new CreateTableOptions()
+        .addHashPartitions(Collections.singletonList("key"), 2);
+    harness.getClient().createTable(tableName, schema, options);
+
+    // Insert multiple test rows.
+    KuduTable table = harness.getClient().openTable(tableName);
+    KuduSession session = harness.getClient().newSession();
+    Insert insert1 = table.newInsert();
+    PartialRow row1 = insert1.getRow();
+    row1.addInt("key", 1);
+    row1.addString("string", "one");
+    session.apply(insert1);
+    Insert insert2 = table.newInsert();
+    PartialRow row2 = insert2.getRow();
+    row2.addInt("key", 2);
+    row2.addString("string", "two");
+    session.apply(insert2);
+    session.close();
+
+    KuduInputFormat input = new KuduInputFormat();
+
+    JobConf jobConf = new JobConf(BASE_CONF);
+    jobConf.set(KUDU_TABLE_NAME_KEY, tableName);
+    jobConf.set(serdeConstants.LIST_COLUMNS, "key");
+
+    InputSplit[] splits = input.getSplits(jobConf, 1);
+    assertEquals(2, splits.length);
+  }
+
+  @Test
+  public void testPredicate() throws Exception {
+    // Insert a second test row that will be filtered out.
+    KuduTable table = harness.getClient().openTable(TABLE_NAME);
+    KuduSession session = harness.getClient().newSession();
+    Insert insert = table.newInsert();
+    PartialRow row = insert.getRow();
+    row.addByte("key", (byte) 2);
+    row.addShort("int16", (short) 2);
+    row.addInt("int32", 2);
+    row.addLong("int64", 2L);
+    row.addBoolean("bool", false);
+    row.addFloat("float", 2.2f);
+    row.addDouble("double", 2.2d);
+    row.addString("string", "two");
+    row.addBinary("binary", "two".getBytes(UTF_8));
+    row.addTimestamp("timestamp", new Timestamp(NOW_MS + 1));
+    row.addDecimal("decimal", new BigDecimal("2.222"));
+    row.setNull("null");
+    // Not setting the "default" column.
+    session.apply(insert);
+    session.close();
+
+    KuduInputFormat input = new KuduInputFormat();
+
+    // Test an equality predicate for each column.
+    for (ColumnSchema col : SCHEMA.getColumns()) {
+      // Skip null and default columns because they don't have a value to use.
+      // Skip binary columns because binary predicates are not supported. (HIVE-11370)
+      if (col.getName().equals("null") || col.getName().equals("default") ||
+          col.getName().equals("binary")) {
+        continue;
+      }
+
+      JobConf jobConf = new JobConf(BASE_CONF);
+      String columnsStr = SCHEMA.getColumns().stream()
+          .map(ColumnSchema::getName)
+          .collect(Collectors.joining(","));
+      jobConf.set(serdeConstants.LIST_COLUMNS, columnsStr);
+
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, col.getName(), null, false);
+      ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName()));
+      List<ExprNodeDesc> children = Lists.newArrayList();
+      children.add(colExpr);
+      children.add(constExpr);
+      ExprNodeGenericFuncDesc predicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqual(), children);
+
+      String filterExpr = SerializationUtilities.serializeExpression(predicateExpr);
+      jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
+
+      InputSplit[] splits = input.getSplits(jobConf, 1);
+      assertEquals(1, splits.length);
+      KuduInputSplit split = (KuduInputSplit) splits[0];
+
+      KuduRecordReader reader =
+          (KuduRecordReader) input.getRecordReader(split, jobConf, null);
+      assertTrue(reader.nextKeyValue());
+      RowResult value = reader.getCurrentValue().getRowResult();
+      verfiyRow(value);
+      assertFalse("Extra row on column: " + col.getName(), reader.nextKeyValue());
+    }
+  }
+  private void verfiyRow(RowResult value) {
+    assertEquals(SCHEMA.getColumnCount(), value.getSchema().getColumnCount());
+    assertEquals(ROW.getByte(0), value.getByte(0));
+    assertEquals(ROW.getShort(1), value.getShort(1));
+    assertEquals(ROW.getInt(2), value.getInt(2));
+    assertEquals(ROW.getLong(3), value.getLong(3));
+    assertEquals(ROW.getBoolean(4), value.getBoolean(4));
+    assertEquals(ROW.getFloat(5), value.getFloat(5), 0);
+    assertEquals(ROW.getDouble(6), value.getDouble(6), 0);
+    assertEquals(ROW.getString(7), value.getString(7));
+    assertArrayEquals(ROW.getBinaryCopy(8), value.getBinaryCopy(8));
+    assertEquals(ROW.getTimestamp(9), value.getTimestamp(9));
+    assertEquals(ROW.getDecimal(10), value.getDecimal(10));
+    assertTrue(value.isNull(11));
+    assertEquals(1, value.getInt(12)); // default.
+  }
+}
diff --git a/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduOutputFormat.java b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduOutputFormat.java
new file mode 100644
index 0000000..8a1cf26
--- /dev/null
+++ b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduOutputFormat.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.kudu.KuduOutputFormat.KuduRecordWriter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the KuduOutputFormat implementation.
+ */
+public class TestKuduOutputFormat {
+
+  private static final String TABLE_NAME = "default.TestKuduOutputFormat";
+
+  private static final Schema SCHEMA = getAllTypesSchema();
+
+  private static final Configuration BASE_CONF = new Configuration();
+
+  private static final Properties TBL_PROPS = new Properties();
+
+  private static final long NOW_MS = System.currentTimeMillis();
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Before
+  public void setUp() throws Exception {
+    // Set the base configuration values.
+    BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString());
+    TBL_PROPS.setProperty(KUDU_TABLE_NAME_KEY, TABLE_NAME);
+
+    // Create the test Kudu table.
+    CreateTableOptions options = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("key"));
+    harness.getClient().createTable(TABLE_NAME, SCHEMA, options);
+  }
+
+  @Test
+  public void testGoodRow() throws Exception {
+    KuduOutputFormat outputFormat = new KuduOutputFormat();
+
+    KuduRecordWriter writer =
+        (KuduRecordWriter) outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF),
+            null, null, false, TBL_PROPS, null);
+
+    // Write a good row.
+    try {
+      PartialRow row = SCHEMA.newPartialRow();
+      row.addByte("key", (byte) 1);
+      row.addShort("int16", (short) 1);
+      row.addInt("int32", 1);
+      row.addLong("int64", 1L);
+      row.addBoolean("bool", true);
+      row.addFloat("float", 1.1f);
+      row.addDouble("double", 1.1d);
+      row.addString("string", "one");
+      row.addBinary("binary", "one".getBytes(UTF_8));
+      row.addTimestamp("timestamp", new Timestamp(NOW_MS));
+      row.addDecimal("decimal", new BigDecimal("1.111"));
+      row.setNull("null");
+      // Not setting the "default" column.
+      KuduWritable writable = new KuduWritable(row);
+      writer.write(writable);
+    } finally {
+      writer.close(false);
+    }
+
+    // Verify the written row.
+    KuduClient client = harness.getClient();
+    KuduTable table = client.openTable(TABLE_NAME);
+    KuduScanner scanner = client.newScannerBuilder(table).build();
+
+    List<RowResult> results = new ArrayList<>();
+    for (RowResult result : scanner) {
+      results.add(result);
+    }
+    assertEquals(1, results.size());
+    RowResult result = results.get(0);
+
+    assertEquals((byte) 1, result.getByte(0));
+    assertEquals((short) 1, result.getShort(1));
+    assertEquals(1, result.getInt(2));
+    assertEquals(1L, result.getLong(3));
+    assertTrue(result.getBoolean(4));
+    assertEquals(1.1f, result.getFloat(5), 0);
+    assertEquals(1.1d, result.getDouble(6), 0);
+    assertEquals("one", result.getString(7));
+    assertEquals("one", new String(result.getBinaryCopy(8), UTF_8));
+    assertEquals(NOW_MS, result.getTimestamp(9).getTime());
+    assertEquals(new BigDecimal("1.111"), result.getDecimal(10));
+    assertTrue(result.isNull(11));
+    assertEquals(1, result.getInt(12)); // default.
+  }
+
+  @Test
+  public void testBadRow() throws Exception {
+    KuduOutputFormat outputFormat = new KuduOutputFormat();
+
+    KuduRecordWriter writer =
+        (KuduRecordWriter) outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF),
+            null, null, false, TBL_PROPS, null);
+
+    // Write an empty row.
+    try {
+      PartialRow row = SCHEMA.newPartialRow();
+      KuduWritable writable = new KuduWritable(row);
+      writer.write(writable);
+    } catch (KuduException ex) {
+      assertThat(ex.getMessage(), containsString("Primary key column key is not set"));
+    } finally {
+      writer.close(false);
+    }
+  }
+
+  @Test
+  public void testMissingTable() throws Exception {
+    KuduOutputFormat outputFormat = new KuduOutputFormat();
+
+    Properties tblProps = new Properties();
+
+    try {
+      outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF),
+          null, null, false, tblProps, null);
+      fail("Should fail on missing table");
+    } catch (IllegalArgumentException ex) {
+      assertThat(ex.getMessage(), containsString("kudu.table_name is not set"));
+    }
+  }
+
+  @Test
+  public void testBadTable() throws Exception {
+    KuduOutputFormat outputFormat = new KuduOutputFormat();
+
+    Properties tblProps = new Properties();
+    tblProps.setProperty(KUDU_TABLE_NAME_KEY, "default.notatable");
+
+    try {
+      outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF),
+          null, null, false, tblProps, null);
+      fail("Should fail on a bad table");
+    } catch (IllegalArgumentException ex) {
+      assertThat(ex.getMessage(),
+          containsString("Kudu table does not exist: default.notatable"));
+    }
+  }
+}
diff --git a/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduPredicateHandler.java b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduPredicateHandler.java
new file mode 100644
index 0000000..ad09c5b
--- /dev/null
+++ b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduPredicateHandler.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hive.kudu.KuduHiveUtils.toHiveType;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the KuduPredicateHandler implementation.
+ */
+public class TestKuduPredicateHandler {
+
+  private static final String TABLE_NAME = "default.TestKuduPredicateHandler";
+
+  private static final Schema SCHEMA = getAllTypesSchema();
+
+  private static final Configuration BASE_CONF = new Configuration();
+
+  private static final long NOW_MS = System.currentTimeMillis();
+
+  private static final PartialRow ROW;
+  static {
+    ROW = SCHEMA.newPartialRow();
+    ROW.addByte("key", (byte) 1);
+    ROW.addShort("int16", (short) 1);
+    ROW.addInt("int32", 1);
+    ROW.addLong("int64", 1L);
+    ROW.addBoolean("bool", true);
+    ROW.addFloat("float", 1.1f);
+    ROW.addDouble("double", 1.1d);
+    ROW.addString("string", "one");
+    ROW.addBinary("binary", "one".getBytes(UTF_8));
+    ROW.addTimestamp("timestamp", new Timestamp(NOW_MS));
+    ROW.addDecimal("decimal", new BigDecimal("1.111"));
+    ROW.setNull("null");
+    // Not setting the "default" column.
+  }
+
+  private static final List<GenericUDF> COMPARISON_UDFS = Arrays.asList(
+      new GenericUDFOPEqual(),
+      new GenericUDFOPLessThan(),
+      new GenericUDFOPEqualOrLessThan(),
+      new GenericUDFOPGreaterThan(),
+      new GenericUDFOPEqualOrGreaterThan()
+  );
+
+  private static final List<GenericUDF> NULLABLE_UDFS = Arrays.asList(
+      new GenericUDFOPNull(),
+      new GenericUDFOPNotNull()
+  );
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Before
+  public void setUp() throws Exception {
+    // Set the base configuration values.
+    BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString());
+    BASE_CONF.set(KUDU_TABLE_NAME_KEY, TABLE_NAME);
+    BASE_CONF.set(FileInputFormat.INPUT_DIR, "dummy");
+
+    // Create the test Kudu table.
+    CreateTableOptions options = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("key"));
+    harness.getClient().createTable(TABLE_NAME, SCHEMA, options);
+  }
+
+  @Test
+  public void testComparisonPredicates() throws Exception {
+    for (ColumnSchema col : SCHEMA.getColumns()) {
+      // Skip null and default columns because they don't have a value to use.
+      if (col.getName().equals("null") || col.getName().equals("default")) {
+        continue;
+      }
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, col.getName(), null, false);
+      ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName()));
+      List<ExprNodeDesc> children = Lists.newArrayList();
+      children.add(colExpr);
+      children.add(constExpr);
+      for (GenericUDF udf : COMPARISON_UDFS) {
+        ExprNodeGenericFuncDesc predicateExpr =
+            new ExprNodeGenericFuncDesc(typeInfo, udf, children);
+
+        // Verify KuduPredicateHandler.decompose
+        HiveStoragePredicateHandler.DecomposedPredicate decompose =
+            KuduPredicateHandler.decompose(predicateExpr, SCHEMA);
+
+        // Binary predicates are not supported. (HIVE-11370)
+        if (col.getName().equals("binary")) {
+          assertNull(decompose);
+        } else {
+          assertNotNull(String.format("Unsupported comparison UDF and type (%s, %s)", udf, typeInfo),
+              decompose);
+          assertNotNull(String.format("Unsupported comparison UDF and type (%s, %s)", udf, typeInfo),
+              decompose.pushedPredicate);
+          assertNull(String.format("Unsupported comparison UDF and type (%s, %s)", udf, typeInfo),
+              decompose.residualPredicate);
+
+          List<KuduPredicate> predicates = expressionToPredicates(predicateExpr);
+          assertEquals(1, predicates.size());
+          scanWithPredicates(predicates);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testNotComparisonPredicates() throws Exception {
+    for (ColumnSchema col : SCHEMA.getColumns()) {
+      // Skip null and default columns because they don't have a value to use.
+      // Skip binary columns because binary predicates are not supported. (HIVE-11370)
+      if (col.getName().equals("null") || col.getName().equals("default") ||
+          col.getName().equals("binary")) {
+        continue;
+      }
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, col.getName(), null, false);
+      ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName()));
+      List<ExprNodeDesc> children = Lists.newArrayList();
+      children.add(colExpr);
+      children.add(constExpr);
+
+      for (GenericUDF udf : COMPARISON_UDFS) {
+        ExprNodeGenericFuncDesc childExpr =
+            new ExprNodeGenericFuncDesc(typeInfo, udf, children);
+
+        List<ExprNodeDesc> notChildren = Lists.newArrayList();
+        notChildren.add(childExpr);
+        ExprNodeGenericFuncDesc predicateExpr = new ExprNodeGenericFuncDesc(typeInfo,
+            new GenericUDFOPNot(), notChildren);
+
+        // Verify KuduPredicateHandler.decompose
+        HiveStoragePredicateHandler.DecomposedPredicate decompose =
+            KuduPredicateHandler.decompose(predicateExpr, SCHEMA);
+        // See note in KuduPredicateHandler.newAnalyzer.
+        assertNull(decompose);
+
+        List<KuduPredicate> predicates = expressionToPredicates(predicateExpr);
+        if (udf instanceof GenericUDFOPEqual) {
+          // Kudu doesn't support !=.
+          assertTrue(predicates.isEmpty());
+        } else {
+          assertEquals(1, predicates.size());
+          scanWithPredicates(predicates);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testInPredicates() throws Exception {
+    PrimitiveTypeInfo typeInfo = toHiveType(Type.STRING, null);
+    ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, "string", null, false);
+    ExprNodeConstantDesc constDesc = new ExprNodeConstantDesc("Alpha");
+    ExprNodeConstantDesc constDesc2 = new ExprNodeConstantDesc("Bravo");
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    children.add(colExpr);
+    children.add(constDesc);
+    children.add(constDesc2);
+
+    ExprNodeGenericFuncDesc predicateExpr =
+        new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFIn(), children);
+
+    // Verify KuduPredicateHandler.decompose
+    HiveStoragePredicateHandler.DecomposedPredicate decompose =
+        KuduPredicateHandler.decompose(predicateExpr, SCHEMA);
+    // See note in KuduPredicateHandler.newAnalyzer.
+    assertNull(decompose);
+
+    List<KuduPredicate> predicates = expressionToPredicates(predicateExpr);
+    assertEquals(1, predicates.size());
+    scanWithPredicates(predicates);
+
+    // Also test NOT IN.
+    List<ExprNodeDesc> notChildren = Lists.newArrayList();
+    notChildren.add(predicateExpr);
+    ExprNodeGenericFuncDesc notPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo,
+        new GenericUDFOPNot(), notChildren);
+
+    // Verify KuduPredicateHandler.decompose
+    HiveStoragePredicateHandler.DecomposedPredicate decomposeNot =
+        KuduPredicateHandler.decompose(notPredicateExpr, SCHEMA);
+    // See note in KuduPredicateHandler.newAnalyzer.
+    assertNull(decomposeNot);
+
+    List<KuduPredicate> notPredicates = expressionToPredicates(notPredicateExpr);
+    assertEquals(0, notPredicates.size());
+  }
+
+  @Test
+  public void testNullablePredicates() throws Exception {
+    PrimitiveTypeInfo typeInfo = toHiveType(Type.STRING, null);
+    ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, "null", null, false);
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    children.add(colExpr);
+
+    for (GenericUDF udf : NULLABLE_UDFS) {
+      ExprNodeGenericFuncDesc predicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, udf, children);
+
+      // Verify KuduPredicateHandler.decompose
+      HiveStoragePredicateHandler.DecomposedPredicate decompose =
+          KuduPredicateHandler.decompose(predicateExpr, SCHEMA);
+      // See note in KuduPredicateHandler.newAnalyzer.
+      assertNull(decompose);
+
+      List<KuduPredicate> predicates = expressionToPredicates(predicateExpr);
+      assertEquals(1, predicates.size());
+      scanWithPredicates(predicates);
+    }
+  }
+
+  @Test
+  public void testAndPredicates() throws Exception {
+    for (ColumnSchema col : SCHEMA.getColumns()) {
+      // Skip null and default columns because they don't have a value to use.
+      if (col.getName().equals("null") || col.getName().equals("default")) {
+        continue;
+      }
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, col.getName(), null, false);
+      ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName()));
+      List<ExprNodeDesc> children = Lists.newArrayList();
+      children.add(colExpr);
+      children.add(constExpr);
+
+      ExprNodeGenericFuncDesc gePredicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children);
+      ExprNodeGenericFuncDesc lePredicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrLessThan(), children);
+
+      List<ExprNodeDesc> andChildren = Lists.newArrayList();
+      andChildren.add(gePredicateExpr);
+      andChildren.add(lePredicateExpr);
+      ExprNodeGenericFuncDesc andPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo,
+          new GenericUDFOPAnd(), andChildren);
+
+      // Verify KuduPredicateHandler.decompose
+      HiveStoragePredicateHandler.DecomposedPredicate decompose =
+          KuduPredicateHandler.decompose(andPredicateExpr, SCHEMA);
+
+      // Binary predicates are not supported. (HIVE-11370)
+      if (col.getName().equals("binary")) {
+        assertNull(decompose);
+      } else {
+        assertNotNull(decompose);
+        assertNotNull(decompose.pushedPredicate);
+        assertNull(decompose.residualPredicate);
+
+        List<KuduPredicate> predicates = expressionToPredicates(decompose.pushedPredicate);
+        assertEquals(2, predicates.size());
+        scanWithPredicates(predicates);
+
+        // Also test NOT AND.
+        List<ExprNodeDesc> notChildren = Lists.newArrayList();
+        notChildren.add(andPredicateExpr);
+        ExprNodeGenericFuncDesc notPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo,
+            new GenericUDFOPNot(), notChildren);
+
+        // Verify KuduPredicateHandler.decompose
+        HiveStoragePredicateHandler.DecomposedPredicate decomposeNot =
+            KuduPredicateHandler.decompose(notPredicateExpr, SCHEMA);
+        // See note in KuduPredicateHandler.newAnalyzer.
+        assertNull(decomposeNot);
+
+        List<KuduPredicate> notPredicates = expressionToPredicates(notPredicateExpr);
+        assertEquals(0, notPredicates.size());
+      }
+    }
+  }
+
+  @Test
+  public void testOrPredicates() throws Exception {
+    for (ColumnSchema col : SCHEMA.getColumns()) {
+      // Skip null and default columns because they don't have a value to use.
+      // Skip binary columns because binary predicates are not supported. (HIVE-11370)
+      if (col.getName().equals("null") || col.getName().equals("default") ||
+          col.getName().equals("binary")) {
+        continue;
+      }
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, col.getName(), null, false);
+      ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName()));
+      List<ExprNodeDesc> children = Lists.newArrayList();
+      children.add(colExpr);
+      children.add(constExpr);
+
+      ExprNodeGenericFuncDesc gePredicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children);
+      ExprNodeGenericFuncDesc lePredicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrLessThan(), children);
+
+      List<ExprNodeDesc> orChildren = Lists.newArrayList();
+      orChildren.add(gePredicateExpr);
+      orChildren.add(lePredicateExpr);
+      ExprNodeGenericFuncDesc predicateExpr = new ExprNodeGenericFuncDesc(typeInfo,
+          new GenericUDFOPOr(), orChildren);
+
+      // Verify KuduPredicateHandler.decompose
+      HiveStoragePredicateHandler.DecomposedPredicate decompose =
+          KuduPredicateHandler.decompose(predicateExpr, SCHEMA);
+      // OR predicates are currently not supported.
+      assertNull(decompose);
+      List<KuduPredicate> predicates = expressionToPredicates(predicateExpr);
+      assertEquals(0, predicates.size());
+
+      // Also test NOT OR.
+      List<ExprNodeDesc> notChildren = Lists.newArrayList();
+      notChildren.add(predicateExpr);
+      ExprNodeGenericFuncDesc notPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo,
+          new GenericUDFOPNot(), notChildren);
+
+      // Verify KuduPredicateHandler.decompose
+      HiveStoragePredicateHandler.DecomposedPredicate decomposeNot =
+          KuduPredicateHandler.decompose(notPredicateExpr, SCHEMA);
+      // See note in KuduPredicateHandler.newAnalyzer.
+      assertNull(decomposeNot);
+
+      List<KuduPredicate> notPredicates = expressionToPredicates(notPredicateExpr);
+      assertEquals(2, notPredicates.size());
+    }
+  }
+
+  @Test
+  public void testMixedPredicates() throws Exception {
+    for (ColumnSchema col : SCHEMA.getColumns()) {
+      // Skip null and default columns because they don't have a value to use.
+      // Skip binary columns because binary predicates are not supported. (HIVE-11370)
+      if (col.getName().equals("null") || col.getName().equals("default") ||
+          col.getName().equals("binary")) {
+        continue;
+      }
+      PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes());
+      ExprNodeDesc colExpr =  new ExprNodeColumnDesc(typeInfo, col.getName(), null, false);
+      ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName()));
+      List<ExprNodeDesc> children = Lists.newArrayList();
+      children.add(colExpr);
+      children.add(constExpr);
+
+      ExprNodeGenericFuncDesc supportedPredicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children);
+      ExprNodeGenericFuncDesc unsupportedPredicateExpr =
+          new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPUnsupported(), children);
+
+      List<ExprNodeDesc> andChildren = Lists.newArrayList();
+      andChildren.add(supportedPredicateExpr);
+      andChildren.add(unsupportedPredicateExpr);
+      ExprNodeGenericFuncDesc andPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo,
+          new GenericUDFOPAnd(), andChildren);
+
+      // Verify KuduPredicateHandler.decompose
+      HiveStoragePredicateHandler.DecomposedPredicate decompose =
+          KuduPredicateHandler.decompose(andPredicateExpr, SCHEMA);
+      assertNotNull(decompose);
+      assertNotNull(decompose.pushedPredicate);
+      assertNotNull(decompose.residualPredicate);
+
+      List<KuduPredicate> predicates = expressionToPredicates(decompose.pushedPredicate);
+      assertEquals(1, predicates.size());
+      scanWithPredicates(predicates);
+    }
+  }
+
+  private List<KuduPredicate> expressionToPredicates(ExprNodeGenericFuncDesc predicateExpr) {
+    String filterExpr = SerializationUtilities.serializeExpression(predicateExpr);
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr);
+    return KuduPredicateHandler.getPredicates(conf, SCHEMA);
+  }
+
+  private void scanWithPredicates(List<KuduPredicate> predicates)
+      throws KuduException {
+    // Scan the table with the predicate to be sure there are no exceptions.
+    KuduClient client = harness.getClient();
+    KuduTable table = client.openTable(TABLE_NAME);
+    KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
+    for (KuduPredicate predicate : predicates) {
+      builder.addPredicate(predicate);
+    }
+    KuduScanner scanner = builder.build();
+    while (scanner.hasMoreRows()) {
+      scanner.nextRows();
+    }
+  }
+
+  // Wrapper implementation to simplify testing unsupported UDFs.
+  private class GenericUDFOPUnsupported extends GenericUDFBaseCompare {
+    GenericUDFOPUnsupported() {
+      this.opName = "UNSUPPORTED";
+      this.opDisplayName = "UNSUPPORTED";
+    }
+
+    @Override
+    public Object evaluate(DeferredObject[] arguments) {
+      return null;
+    }
+
+    @Override
+    public GenericUDF flip() {
+      return new GenericUDFOPUnsupported();
+    }
+
+    @Override
+    public GenericUDF negative() {
+      return new GenericUDFOPUnsupported();
+    }
+  }
+}
diff --git a/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduSerDe.java b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduSerDe.java
new file mode 100644
index 0000000..1713a50
--- /dev/null
+++ b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduSerDe.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.kudu;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the KuduSerDe implementation.
+ */
+public class TestKuduSerDe {
+
+  private static final String TABLE_NAME = "default.TestKuduSerDe";
+
+  private static final Schema SCHEMA = getAllTypesSchema();
+
+  private static final Configuration BASE_CONF = new Configuration();
+
+  private static final Properties TBL_PROPS = new Properties();
+
+  private static final long NOW_MS = System.currentTimeMillis();
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Before
+  public void setUp() throws Exception {
+    // Set the base configuration values.
+    BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString());
+    TBL_PROPS.setProperty(KUDU_TABLE_NAME_KEY, TABLE_NAME);
+
+    // Create the test Kudu table.
+    CreateTableOptions options = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("key"));
+    harness.getClient().createTable(TABLE_NAME, SCHEMA, options);
+  }
+
+  @Test
+  public void testSerDeRoundTrip() throws Exception {
+    KuduSerDe serDe = new KuduSerDe();
+    serDe.initialize(BASE_CONF, TBL_PROPS);
+
+    PartialRow before = SCHEMA.newPartialRow();
+    before.addByte("key", (byte) 1);
+    before.addShort("int16", (short) 1);
+    before.addInt("int32", 1);
+    before.addLong("int64", 1L);
+    before.addBoolean("bool", true);
+    before.addFloat("float", 1.1f);
+    before.addDouble("double", 1.1d);
+    before.addString("string", "one");
+    before.addBinary("binary", "one".getBytes(UTF_8));
+    before.addTimestamp("timestamp", new Timestamp(NOW_MS));
+    before.addDecimal("decimal", new BigDecimal("1.111"));
+    before.setNull("null");
+    before.addInt("default", 1);
+    KuduWritable beforeWritable = new KuduWritable(before);
+    Object object = serDe.deserialize(beforeWritable);
+
+
+    // Capitalized `key` field to check for field case insensitivity.
+    List<String> fieldNames = Arrays.asList("KEY", "int16", "int32", "int64", "bool", "float",
+        "double", "string", "binary", "timestamp", "decimal", "null", "default");
+    List<ObjectInspector> ois = Arrays.asList(
+        PrimitiveObjectInspectorFactory.writableByteObjectInspector,
+        PrimitiveObjectInspectorFactory.writableShortObjectInspector,
+        PrimitiveObjectInspectorFactory.writableIntObjectInspector,
+        PrimitiveObjectInspectorFactory.writableLongObjectInspector,
+        PrimitiveObjectInspectorFactory.writableBooleanObjectInspector,
+        PrimitiveObjectInspectorFactory.writableFloatObjectInspector,
+        PrimitiveObjectInspectorFactory.writableDoubleObjectInspector,
+        PrimitiveObjectInspectorFactory.writableStringObjectInspector,
+        PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+        PrimitiveObjectInspectorFactory.writableTimestampObjectInspector,
+        PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector,
+        PrimitiveObjectInspectorFactory.writableStringObjectInspector,
+        PrimitiveObjectInspectorFactory.writableIntObjectInspector
+        // the "default" column is not set.
+    );
+    StandardStructObjectInspector objectInspector =
+        ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, ois);
+    KuduWritable afterWritable = serDe.serialize(object, objectInspector);
+    PartialRow after = afterWritable.getPartialRow();
+
+    for (int i = 0; i < SCHEMA.getColumnCount(); i++) {
+
+      if (SCHEMA.getColumnByIndex(i).getType() == Type.BINARY) {
+        assertArrayEquals("Columns not equal at index: " + i,
+            before.getBinaryCopy(i), after.getBinaryCopy(i));
+      } else {
+        assertEquals("Columns not equal at index: " + i,
+            before.getObject(i), after.getObject(i));
+      }
+    }
+  }
+
+  @Test
+  public void testMissingMasters() throws Exception {
+    KuduSerDe serDe = new KuduSerDe();
+
+    Configuration conf = new Configuration(BASE_CONF);
+    conf.set(HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT.varname, "");
+    conf.unset(KUDU_MASTER_ADDRS_KEY);
+
+    try {
+      serDe.initialize(conf, TBL_PROPS);
+      fail("Should fail on missing table");
+    } catch (SerDeException ex) {
+      assertThat(ex.getMessage(),
+          containsString("Kudu master addresses are not specified with hive.kudu.master.addresses.default"));
+    }
+  }
+
+  @Test
+  public void testMissingTable() throws Exception {
+    KuduSerDe serDe = new KuduSerDe();
+
+    Properties tblProps = new Properties();
+
+    try {
+      serDe.initialize(BASE_CONF, tblProps);
+      fail("Should fail on missing table");
+    } catch (SerDeException ex) {
+      assertThat(ex.getMessage(), containsString("kudu.table_name is not set"));
+    }
+  }
+
+  @Test
+  public void testBadTable() throws Exception {
+    KuduSerDe serDe = new KuduSerDe();
+
+    Properties tblProps = new Properties();
+    tblProps.setProperty(KUDU_TABLE_NAME_KEY, "default.notatable");
+
+    try {
+      serDe.initialize(BASE_CONF, tblProps);
+      fail("Should fail on a bad table");
+    } catch (SerDeException ex) {
+      assertThat(ex.getMessage(),
+          containsString("Kudu table does not exist: default.notatable"));
+    }
+  }
+}
diff --git a/kudu-handler/src/test/org/apache/hadoop/hive/kudu/package-info.java b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/package-info.java
new file mode 100644
index 0000000..1c933da
--- /dev/null
+++ b/kudu-handler/src/test/org/apache/hadoop/hive/kudu/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Serde, InputFormat, and OutputFormat support for connecting Hive to Kudu tables.
+ */
+package org.apache.hadoop.hive.kudu;
diff --git a/kudu-handler/src/test/queries/negative/kudu_config.q b/kudu-handler/src/test/queries/negative/kudu_config.q
new file mode 100644
index 0000000..fc91c07
--- /dev/null
+++ b/kudu-handler/src/test/queries/negative/kudu_config.q
@@ -0,0 +1,6 @@
+-- unset the default master addresses config to validate handling.
+set hive.kudu.master.addresses.default=;
+
+CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv");
diff --git a/kudu-handler/src/test/queries/positive/kudu_complex_queries.q b/kudu-handler/src/test/queries/positive/kudu_complex_queries.q
new file mode 100644
index 0000000..a2d8071
--- /dev/null
+++ b/kudu-handler/src/test/queries/positive/kudu_complex_queries.q
@@ -0,0 +1,45 @@
+--! qt:dataset:src
+
+DROP TABLE IF EXISTS kv_table;
+CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv");
+
+INSERT INTO TABLE kv_table
+SELECT key, value FROM src;
+
+ANALYZE TABLE kv_table COMPUTE STATISTICS;
+
+ANALYZE TABLE kv_table COMPUTE STATISTICS FOR COLUMNS key, value;
+
+EXPLAIN
+SELECT y.*
+FROM
+(SELECT kv_table.* FROM kv_table) x
+JOIN
+(SELECT src.* FROM src) y
+ON (x.key = y.key)
+ORDER BY key, value LIMIT 10;
+
+SELECT y.*
+FROM
+(SELECT kv_table.* FROM kv_table) x
+JOIN
+(SELECT src.* FROM src) y
+ON (x.key = y.key)
+ORDER BY key, value LIMIT 10;
+
+EXPLAIN
+SELECT max(k.key) as max, min(k.value) as min
+FROM kv_table k JOIN src s ON (k.key = s.key)
+WHERE k.key > 100 and k.key % 2 = 0
+GROUP BY k.key
+ORDER BY min
+LIMIT 10;
+
+SELECT max(k.key) as max, min(k.value) as min
+FROM kv_table k JOIN src s ON (k.key = s.key)
+WHERE k.key > 100 and k.key % 2 = 0
+GROUP BY k.key
+ORDER BY min
+LIMIT 10;
\ No newline at end of file
diff --git a/kudu-handler/src/test/queries/positive/kudu_queries.q b/kudu-handler/src/test/queries/positive/kudu_queries.q
new file mode 100644
index 0000000..cd4acad
--- /dev/null
+++ b/kudu-handler/src/test/queries/positive/kudu_queries.q
@@ -0,0 +1,177 @@
+-- Create table specifying columns.
+-- Note: Kudu is the source of truth for schema.
+DROP TABLE IF EXISTS kv_table;
+CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv");
+
+DESCRIBE EXTENDED kv_table;
+
+-- Verify INSERT support.
+INSERT INTO TABLE kv_table VALUES
+(1, "1"), (2, "2");
+
+SELECT * FROM kv_table;
+SELECT count(*) FROM kv_table;
+SELECT count(*) FROM kv_table LIMIT 1;
+SELECT count(1) FROM kv_table;
+
+-- Verify projection and case insensitivity.
+SELECT kEy FROM kv_table;
+
+DROP TABLE kv_table;
+
+-- Create table without specifying columns.
+-- Note: Kudu is the source of truth for schema.
+DROP TABLE IF EXISTS all_types_table;
+CREATE EXTERNAL TABLE all_types_table
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_all_types");
+
+DESCRIBE EXTENDED all_types_table;
+
+INSERT INTO TABLE all_types_table VALUES
+(1, 1, 1, 1, true, 1.1, 1.1, "one", 'one', '2001-01-01 01:11:11', 1.111, null, 1),
+(2, 2, 2, 2, false, 2.2, 2.2, "two", 'two', '2001-01-01 01:12:12', 2.222, "not null", 2);
+
+SELECT * FROM all_types_table;
+SELECT count(*) FROM all_types_table;
+
+-- Verify comparison predicates on byte.
+EXPLAIN SELECT key FROM all_types_table WHERE key = 1;
+SELECT key FROM all_types_table WHERE key = 1;
+SELECT key FROM all_types_table WHERE key != 1;
+SELECT key FROM all_types_table WHERE key > 1;
+SELECT key FROM all_types_table WHERE key >= 1;
+SELECT key FROM all_types_table WHERE key < 2;
+SELECT key FROM all_types_table WHERE key <= 2;
+
+-- Verify comparison predicates on short.
+EXPLAIN SELECT key FROM all_types_table WHERE int16 = 1;
+SELECT key FROM all_types_table WHERE int16 = 1;
+SELECT key FROM all_types_table WHERE int16 != 1;
+SELECT key FROM all_types_table WHERE int16 > 1;
+SELECT key FROM all_types_table WHERE int16 >= 1;
+SELECT key FROM all_types_table WHERE int16 < 2;
+SELECT key FROM all_types_table WHERE int16 <= 2;
+
+-- Verify comparison predicates on int.
+EXPLAIN SELECT key FROM all_types_table WHERE int32 = 1;
+SELECT key FROM all_types_table WHERE int32 = 1;
+SELECT key FROM all_types_table WHERE int32 != 1;
+SELECT key FROM all_types_table WHERE int32 > 1;
+SELECT key FROM all_types_table WHERE int32 >= 1;
+SELECT key FROM all_types_table WHERE int32 < 2;
+SELECT key FROM all_types_table WHERE int32 <= 2;
+
+-- Verify comparison predicates on long.
+EXPLAIN SELECT key FROM all_types_table WHERE int64 = 1;
+SELECT key FROM all_types_table WHERE int64 = 1;
+SELECT key FROM all_types_table WHERE int64 != 1;
+SELECT key FROM all_types_table WHERE int64 > 1;
+SELECT key FROM all_types_table WHERE int64 >= 1;
+SELECT key FROM all_types_table WHERE int64 < 2;
+SELECT key FROM all_types_table WHERE int64 <= 2;
+
+-- Verify comparison predicates on boolean.
+EXPLAIN SELECT key FROM all_types_table WHERE bool = true;
+SELECT key FROM all_types_table WHERE bool = true;
+SELECT key FROM all_types_table WHERE bool != true;
+SELECT key FROM all_types_table WHERE bool > true;
+SELECT key FROM all_types_table WHERE bool >= true;
+SELECT key FROM all_types_table WHERE bool < false;
+SELECT key FROM all_types_table WHERE bool <= false;
+
+-- Verify comparison predicates on string.
+-- Note: string is escaped because it's a reserved word.
+EXPLAIN SELECT key FROM all_types_table WHERE `string` = "one";
+SELECT key FROM all_types_table WHERE `string` = "one";
+SELECT key FROM all_types_table WHERE `string` != "one";
+SELECT key FROM all_types_table WHERE `string` > "one";
+SELECT key FROM all_types_table WHERE `string` >= "one";
+SELECT key FROM all_types_table WHERE `string` < "two";
+SELECT key FROM all_types_table WHERE `string` <= "two";
+
+-- Verify comparison predicates on binary.
+-- Note: binary is escaped because it's a reserved word.
+EXPLAIN SELECT key FROM all_types_table WHERE `binary` = cast ('one' as binary);
+SELECT key FROM all_types_table WHERE `binary` = cast ('one' as binary);
+SELECT key FROM all_types_table WHERE `binary` != cast ('one' as binary);
+SELECT key FROM all_types_table WHERE `binary` > cast ('one' as binary);
+SELECT key FROM all_types_table WHERE `binary` >= cast ('one' as binary);
+SELECT key FROM all_types_table WHERE `binary` < cast ('two' as binary);
+SELECT key FROM all_types_table WHERE `binary` <= cast ('two' as binary);
+
+-- Verify comparison predicates on timestamp.
+-- Note: timestamp is escaped because it's a reserved word.
+EXPLAIN SELECT key FROM all_types_table WHERE `timestamp` = '2001-01-01 01:11:11';
+SELECT key FROM all_types_table WHERE `timestamp` = '2001-01-01 01:11:11';
+SELECT key FROM all_types_table WHERE `timestamp` != '2001-01-01 01:11:11';
+SELECT key FROM all_types_table WHERE `timestamp` > '2001-01-01 01:11:11';
+SELECT key FROM all_types_table WHERE `timestamp` >= '2001-01-01 01:11:11';
+SELECT key FROM all_types_table WHERE `timestamp` < '2001-01-01 01:12:12';
+SELECT key FROM all_types_table WHERE `timestamp` <= '2001-01-01 01:12:12';
+
+-- Verify comparison predicates on float.
+-- Note: float is escaped because it's a reserved word.
+EXPLAIN SELECT key FROM all_types_table WHERE `float` = 1.1;
+SELECT key FROM all_types_table WHERE `float` = 1.1;
+SELECT key FROM all_types_table WHERE `float` != 1.1;
+SELECT key FROM all_types_table WHERE `float` > 1.1;
+SELECT key FROM all_types_table WHERE `float` >= 1.1;
+SELECT key FROM all_types_table WHERE `float` < 2.2;
+SELECT key FROM all_types_table WHERE `float` <= 2.2;
+
+-- Verify comparison predicates on double.
+-- Note: double is escaped because it's a reserved word.
+EXPLAIN SELECT key FROM all_types_table WHERE `double` = 1.1;
+SELECT key FROM all_types_table WHERE `double` = 1.1;
+SELECT key FROM all_types_table WHERE `double` != 1.1;
+SELECT key FROM all_types_table WHERE `double` > 1.1;
+SELECT key FROM all_types_table WHERE `double` >= 1.1;
+SELECT key FROM all_types_table WHERE `double` < 2.2;
+SELECT key FROM all_types_table WHERE `double` <= 2.2;
+
+-- Verify comparison predicates on decimal.
+-- Note: decimal is escaped because it's a reserved word.
+EXPLAIN SELECT key FROM all_types_table WHERE `decimal` = 1.111;
+SELECT key FROM all_types_table WHERE `decimal` = 1.111;
+SELECT key FROM all_types_table WHERE `decimal` != 1.111;
+SELECT key FROM all_types_table WHERE `decimal` > 1.111;
+SELECT key FROM all_types_table WHERE `decimal` >= 1.111;
+SELECT key FROM all_types_table WHERE `decimal` < 2.222;
+SELECT key FROM all_types_table WHERE `decimal` <= 2.222;
+
+-- Verify null predicates.
+-- Note: null is escaped because it's a reserved word.
+EXPLAIN SELECT key FROM all_types_table WHERE `null` IS NOT NULL;
+SELECT key FROM all_types_table WHERE `null` IS NOT NULL;
+SELECT key FROM all_types_table WHERE `null` IS NULL;
+SELECT key FROM all_types_table WHERE `null` = NULL;
+SELECT key FROM all_types_table WHERE `null` <=> NULL;
+
+-- Verify AND predicates.
+EXPLAIN SELECT key FROM all_types_table WHERE `null` IS NULL AND key < 2;
+SELECT key FROM all_types_table WHERE `null` IS NULL AND key < 2;
+
+-- Verify OR predicates.
+EXPLAIN SELECT key FROM all_types_table WHERE key = 1 OR `string` = "one";
+SELECT key FROM all_types_table WHERE key = 1 OR `string` = "one";
+SELECT key FROM all_types_table WHERE key = 1 OR `string` = "two";
+
+-- Verify various other filters.
+SELECT key FROM all_types_table WHERE string IN ("one", "missing");
+SELECT key FROM all_types_table WHERE string NOT IN ("one", "missing");
+SELECT key FROM all_types_table WHERE key BETWEEN -1 and 1;
+SELECT key FROM all_types_table WHERE `string` LIKE "%n%";
+
+-- Verify statistics
+ANALYZE TABLE all_types_table COMPUTE STATISTICS FOR COLUMNS;
+EXPLAIN SELECT * FROM all_types_table;
+
+-- Verify show and describe capabilities.
+SHOW CREATE TABLE all_types_table;
+DESCRIBE all_types_table;
+DESCRIBE FORMATTED all_types_table;
+
+DROP TABLE all_types_table;
\ No newline at end of file
diff --git a/kudu-handler/src/test/results/negative/kudu_config.q.out b/kudu-handler/src/test/results/negative/kudu_config.q.out
new file mode 100644
index 0000000..066aad4
--- /dev/null
+++ b/kudu-handler/src/test/results/negative/kudu_config.q.out
@@ -0,0 +1,7 @@
+PREHOOK: query: CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@kv_table
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.ddl.DDLTask. java.lang.RuntimeException: MetaException(message:org.apache.hadoop.hive.serde2.SerDeException java.io.IOException: Kudu master addresses are not specified with hive.kudu.master.addresses.default)
diff --git a/kudu-handler/src/test/results/positive/kudu_complex_queries.q.out b/kudu-handler/src/test/results/positive/kudu_complex_queries.q.out
new file mode 100644
index 0000000..db208b3
--- /dev/null
+++ b/kudu-handler/src/test/results/positive/kudu_complex_queries.q.out
@@ -0,0 +1,366 @@
+PREHOOK: query: DROP TABLE IF EXISTS kv_table
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS kv_table
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@kv_table
+POSTHOOK: query: CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@kv_table
+PREHOOK: query: INSERT INTO TABLE kv_table
+SELECT key, value FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@kv_table
+POSTHOOK: query: INSERT INTO TABLE kv_table
+SELECT key, value FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@kv_table
+PREHOOK: query: ANALYZE TABLE kv_table COMPUTE STATISTICS
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+PREHOOK: Output: default@kv_table
+POSTHOOK: query: ANALYZE TABLE kv_table COMPUTE STATISTICS
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+POSTHOOK: Output: default@kv_table
+PREHOOK: query: ANALYZE TABLE kv_table COMPUTE STATISTICS FOR COLUMNS key, value
+PREHOOK: type: ANALYZE_TABLE
+PREHOOK: Input: default@kv_table
+PREHOOK: Output: default@kv_table
+#### A masked pattern was here ####
+POSTHOOK: query: ANALYZE TABLE kv_table COMPUTE STATISTICS FOR COLUMNS key, value
+POSTHOOK: type: ANALYZE_TABLE
+POSTHOOK: Input: default@kv_table
+POSTHOOK: Output: default@kv_table
+#### A masked pattern was here ####
+PREHOOK: query: EXPLAIN
+SELECT y.*
+FROM
+(SELECT kv_table.* FROM kv_table) x
+JOIN
+(SELECT src.* FROM src) y
+ON (x.key = y.key)
+ORDER BY key, value LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT y.*
+FROM
+(SELECT kv_table.* FROM kv_table) x
+JOIN
+(SELECT src.* FROM src) y
+ON (x.key = y.key)
+ORDER BY key, value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: kv_table
+                  filterExpr: UDFToDouble(key) is not null (type: boolean)
+                  Statistics: Num rows: 309 Data size: 1236 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: UDFToDouble(key) is not null (type: boolean)
+                    Statistics: Num rows: 309 Data size: 1236 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 309 Data size: 1236 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: UDFToDouble(_col0) (type: double)
+                        sort order: +
+                        Map-reduce partition columns: UDFToDouble(_col0) (type: double)
+                        Statistics: Num rows: 309 Data size: 1236 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  filterExpr: UDFToDouble(key) is not null (type: boolean)
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: UDFToDouble(key) is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: UDFToDouble(_col0) (type: double)
+                        sort order: +
+                        Map-reduce partition columns: UDFToDouble(_col0) (type: double)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: string), _col1 (type: string)
+            Execution mode: vectorized
+        Reducer 2 
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 UDFToDouble(_col0) (type: double)
+                  1 UDFToDouble(_col0) (type: double)
+                outputColumnNames: _col1, _col2
+                Statistics: Num rows: 488 Data size: 86864 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col1 (type: string), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 488 Data size: 86864 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string), _col1 (type: string)
+                    sort order: ++
+                    Statistics: Num rows: 488 Data size: 86864 Basic stats: COMPLETE Column stats: COMPLETE
+                    TopN Hash Memory Usage: 0.1
+        Reducer 3 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 488 Data size: 86864 Basic stats: COMPLETE Column stats: COMPLETE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT y.*
+FROM
+(SELECT kv_table.* FROM kv_table) x
+JOIN
+(SELECT src.* FROM src) y
+ON (x.key = y.key)
+ORDER BY key, value LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT y.*
+FROM
+(SELECT kv_table.* FROM kv_table) x
+JOIN
+(SELECT src.* FROM src) y
+ON (x.key = y.key)
+ORDER BY key, value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+100	val_100
+100	val_100
+103	val_103
+103	val_103
+104	val_104
+104	val_104
+PREHOOK: query: EXPLAIN
+SELECT max(k.key) as max, min(k.value) as min
+FROM kv_table k JOIN src s ON (k.key = s.key)
+WHERE k.key > 100 and k.key % 2 = 0
+GROUP BY k.key
+ORDER BY min
+LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT max(k.key) as max, min(k.value) as min
+FROM kv_table k JOIN src s ON (k.key = s.key)
+WHERE k.key > 100 and k.key % 2 = 0
+GROUP BY k.key
+ORDER BY min
+LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: k
+                  filterExpr: (key > 100) (type: boolean)
+                  Statistics: Num rows: 309 Data size: 29355 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (((key % 2) = 0) and UDFToDouble(key) is not null) (type: boolean)
+                    Statistics: Num rows: 154 Data size: 14630 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: int), value (type: string), UDFToDouble(key) (type: double)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 154 Data size: 15862 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col2 (type: double)
+                        sort order: +
+                        Map-reduce partition columns: _col2 (type: double)
+                        Statistics: Num rows: 154 Data size: 15862 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: int), _col1 (type: string)
+            Execution mode: vectorized
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: s
+                  filterExpr: UDFToDouble(key) is not null (type: boolean)
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: UDFToDouble(key) is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: UDFToDouble(key) (type: double)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 4000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: double)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: double)
+                        Statistics: Num rows: 500 Data size: 4000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized
+        Reducer 2 
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col2 (type: double)
+                  1 _col0 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 243 Data size: 23085 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  aggregations: max(_col0), min(_col1)
+                  keys: _col0 (type: int)
+                  minReductionHashAggr: 0.5020576
+                  mode: hash
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 121 Data size: 23232 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: int)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: int)
+                    Statistics: Num rows: 121 Data size: 23232 Basic stats: COMPLETE Column stats: COMPLETE
+                    value expressions: _col1 (type: int), _col2 (type: string)
+        Reducer 3 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: max(VALUE._col0), min(VALUE._col1)
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 121 Data size: 23232 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col1 (type: int), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 121 Data size: 22748 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col1 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 121 Data size: 22748 Basic stats: COMPLETE Column stats: COMPLETE
+                    TopN Hash Memory Usage: 0.1
+                    value expressions: _col0 (type: int)
+        Reducer 4 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: int), KEY.reducesinkkey0 (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 121 Data size: 22748 Basic stats: COMPLETE Column stats: COMPLETE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 10 Data size: 1880 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 10 Data size: 1880 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT max(k.key) as max, min(k.value) as min
+FROM kv_table k JOIN src s ON (k.key = s.key)
+WHERE k.key > 100 and k.key % 2 = 0
+GROUP BY k.key
+ORDER BY min
+LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT max(k.key) as max, min(k.value) as min
+FROM kv_table k JOIN src s ON (k.key = s.key)
+WHERE k.key > 100 and k.key % 2 = 0
+GROUP BY k.key
+ORDER BY min
+LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+104	val_104
+114	val_114
+116	val_116
+118	val_118
+120	val_120
+126	val_126
+128	val_128
+134	val_134
+136	val_136
+138	val_138
diff --git a/kudu-handler/src/test/results/positive/kudu_queries.q.out b/kudu-handler/src/test/results/positive/kudu_queries.q.out
new file mode 100644
index 0000000..4ec8c94
--- /dev/null
+++ b/kudu-handler/src/test/results/positive/kudu_queries.q.out
@@ -0,0 +1,1358 @@
+PREHOOK: query: DROP TABLE IF EXISTS kv_table
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS kv_table
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@kv_table
+POSTHOOK: query: CREATE EXTERNAL TABLE kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@kv_table
+PREHOOK: query: DESCRIBE EXTENDED kv_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@kv_table
+POSTHOOK: query: DESCRIBE EXTENDED kv_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@kv_table
+key                 	int                 	                    
+value               	string              	                    
+	 	 
+#### A masked pattern was here ####
+PREHOOK: query: INSERT INTO TABLE kv_table VALUES
+(1, "1"), (2, "2")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@kv_table
+POSTHOOK: query: INSERT INTO TABLE kv_table VALUES
+(1, "1"), (2, "2")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@kv_table
+PREHOOK: query: SELECT * FROM kv_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM kv_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+1	1
+2	2
+PREHOOK: query: SELECT count(*) FROM kv_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT count(*) FROM kv_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT count(*) FROM kv_table LIMIT 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT count(*) FROM kv_table LIMIT 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT count(1) FROM kv_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT count(1) FROM kv_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT kEy FROM kv_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT kEy FROM kv_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_table
+#### A masked pattern was here ####
+1
+2
+PREHOOK: query: DROP TABLE kv_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@kv_table
+PREHOOK: Output: default@kv_table
+POSTHOOK: query: DROP TABLE kv_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@kv_table
+POSTHOOK: Output: default@kv_table
+PREHOOK: query: DROP TABLE IF EXISTS all_types_table
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS all_types_table
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE all_types_table
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_all_types")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@all_types_table
+POSTHOOK: query: CREATE EXTERNAL TABLE all_types_table
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_all_types")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@all_types_table
+PREHOOK: query: DESCRIBE EXTENDED all_types_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@all_types_table
+POSTHOOK: query: DESCRIBE EXTENDED all_types_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@all_types_table
+key                 	tinyint             	The key column.     
+int16               	smallint            	                    
+int32               	int                 	                    
+int64               	bigint              	                    
+bool                	boolean             	                    
+float               	float               	                    
+double              	double              	                    
+string              	string              	                    
+binary              	binary              	                    
+timestamp           	timestamp           	                    
+decimal             	decimal(5,3)        	                    
+null                	string              	                    
+default             	int                 	                    
+	 	 
+#### A masked pattern was here ####
+PREHOOK: query: INSERT INTO TABLE all_types_table VALUES
+(1, 1, 1, 1, true, 1.1, 1.1, "one", 'one', '2001-01-01 01:11:11', 1.111, null, 1),
+(2, 2, 2, 2, false, 2.2, 2.2, "two", 'two', '2001-01-01 01:12:12', 2.222, "not null", 2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@all_types_table
+POSTHOOK: query: INSERT INTO TABLE all_types_table VALUES
+(1, 1, 1, 1, true, 1.1, 1.1, "one", 'one', '2001-01-01 01:11:11', 1.111, null, 1),
+(2, 2, 2, 2, false, 2.2, 2.2, "two", 'two', '2001-01-01 01:12:12', 2.222, "not null", 2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@all_types_table
+PREHOOK: query: SELECT * FROM all_types_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM all_types_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2	2	2	2	false	2.2	2.2	two	two	2001-01-01 01:12:12	2.222	not null	2
+1	1	1	1	true	1.1	1.1	one	one	2001-01-01 01:11:11	1.111	NULL	1
+PREHOOK: query: SELECT count(*) FROM all_types_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT count(*) FROM all_types_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE key = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE key = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (key = 1Y) (type: boolean)
+          Select Operator
+            expressions: 1Y (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE key = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE key != 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key != 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE key > 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key > 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE key >= 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key >= 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE key < 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key < 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE key <= 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key <= 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE int16 = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE int16 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (int16 = 1S) (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE int16 = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int16 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int16 != 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int16 != 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE int16 > 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int16 > 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE int16 >= 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int16 >= 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int16 < 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int16 < 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int16 <= 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int16 <= 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE int32 = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE int32 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (int32 = 1) (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE int32 = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int32 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int32 != 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int32 != 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE int32 > 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int32 > 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE int32 >= 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int32 >= 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int32 < 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int32 < 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int32 <= 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int32 <= 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE int64 = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE int64 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (int64 = 1L) (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE int64 = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int64 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int64 != 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int64 != 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE int64 > 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int64 > 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE int64 >= 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int64 >= 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int64 < 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int64 < 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE int64 <= 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE int64 <= 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE bool = true
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE bool = true
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          Filter Operator
+            predicate: bool (type: boolean)
+            Select Operator
+              expressions: key (type: tinyint)
+              outputColumnNames: _col0
+              ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE bool = true
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE bool = true
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE bool != true
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE bool != true
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE bool > true
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE bool > true
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+PREHOOK: query: SELECT key FROM all_types_table WHERE bool >= true
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE bool >= true
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE bool < false
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE bool < false
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+PREHOOK: query: SELECT key FROM all_types_table WHERE bool <= false
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE bool <= false
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `string` = "one"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `string` = "one"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (string = 'one') (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `string` = "one"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `string` = "one"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `string` != "one"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `string` != "one"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `string` > "one"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `string` > "one"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `string` >= "one"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `string` >= "one"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `string` < "two"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `string` < "two"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `string` <= "two"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `string` <= "two"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `binary` = cast ('one' as binary)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `binary` = cast ('one' as binary)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (binary = 6F6E65) (type: boolean)
+          Filter Operator
+            predicate: (binary = 6F6E65) (type: boolean)
+            Select Operator
+              expressions: key (type: tinyint)
+              outputColumnNames: _col0
+              ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `binary` = cast ('one' as binary)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `binary` = cast ('one' as binary)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `binary` != cast ('one' as binary)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `binary` != cast ('one' as binary)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `binary` > cast ('one' as binary)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `binary` > cast ('one' as binary)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `binary` >= cast ('one' as binary)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `binary` >= cast ('one' as binary)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `binary` < cast ('two' as binary)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `binary` < cast ('two' as binary)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `binary` <= cast ('two' as binary)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `binary` <= cast ('two' as binary)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `timestamp` = '2001-01-01 01:11:11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `timestamp` = '2001-01-01 01:11:11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (timestamp = TIMESTAMP'2001-01-01 01:11:11') (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` = '2001-01-01 01:11:11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` = '2001-01-01 01:11:11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` != '2001-01-01 01:11:11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` != '2001-01-01 01:11:11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` > '2001-01-01 01:11:11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` > '2001-01-01 01:11:11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` >= '2001-01-01 01:11:11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` >= '2001-01-01 01:11:11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` < '2001-01-01 01:12:12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` < '2001-01-01 01:12:12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` <= '2001-01-01 01:12:12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `timestamp` <= '2001-01-01 01:12:12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `float` = 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `float` = 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (float = 1.1) (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `float` = 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `float` = 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `float` != 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `float` != 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `float` > 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `float` > 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `float` >= 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `float` >= 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `float` < 2.2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `float` < 2.2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `float` <= 2.2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `float` <= 2.2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `double` = 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `double` = 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (double = 1.1D) (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `double` = 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `double` = 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `double` != 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `double` != 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `double` > 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `double` > 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `double` >= 1.1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `double` >= 1.1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `double` < 2.2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `double` < 2.2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `double` <= 2.2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `double` <= 2.2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `decimal` = 1.111
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `decimal` = 1.111
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (decimal = 1.111) (type: boolean)
+          Select Operator
+            expressions: key (type: tinyint)
+            outputColumnNames: _col0
+            ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `decimal` = 1.111
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `decimal` = 1.111
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `decimal` != 1.111
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `decimal` != 1.111
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `decimal` > 1.111
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `decimal` > 1.111
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `decimal` >= 1.111
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `decimal` >= 1.111
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `decimal` < 2.222
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `decimal` < 2.222
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `decimal` <= 2.222
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `decimal` <= 2.222
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `null` IS NOT NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `null` IS NOT NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: null is not null (type: boolean)
+          Filter Operator
+            predicate: null is not null (type: boolean)
+            Select Operator
+              expressions: key (type: tinyint)
+              outputColumnNames: _col0
+              ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `null` IS NOT NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `null` IS NOT NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE `null` IS NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `null` IS NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `null` = NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `null` = NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+PREHOOK: query: SELECT key FROM all_types_table WHERE `null` <=> NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `null` <=> NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `null` IS NULL AND key < 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE `null` IS NULL AND key < 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: (key < 2Y) (type: boolean)
+          Filter Operator
+            predicate: null is null (type: boolean)
+            Select Operator
+              expressions: key (type: tinyint)
+              outputColumnNames: _col0
+              ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE `null` IS NULL AND key < 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `null` IS NULL AND key < 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE key = 1 OR `string` = "one"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT key FROM all_types_table WHERE key = 1 OR `string` = "one"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          filterExpr: ((key = 1Y) or (string = 'one')) (type: boolean)
+          Filter Operator
+            predicate: ((key = 1Y) or (string = 'one')) (type: boolean)
+            Select Operator
+              expressions: key (type: tinyint)
+              outputColumnNames: _col0
+              ListSink
+
+PREHOOK: query: SELECT key FROM all_types_table WHERE key = 1 OR `string` = "one"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key = 1 OR `string` = "one"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE key = 1 OR `string` = "two"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key = 1 OR `string` = "two"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE string IN ("one", "missing")
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE string IN ("one", "missing")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE string NOT IN ("one", "missing")
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE string NOT IN ("one", "missing")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT key FROM all_types_table WHERE key BETWEEN -1 and 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE key BETWEEN -1 and 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT key FROM all_types_table WHERE `string` LIKE "%n%"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key FROM all_types_table WHERE `string` LIKE "%n%"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: ANALYZE TABLE all_types_table COMPUTE STATISTICS FOR COLUMNS
+PREHOOK: type: ANALYZE_TABLE
+PREHOOK: Input: default@all_types_table
+PREHOOK: Output: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: ANALYZE TABLE all_types_table COMPUTE STATISTICS FOR COLUMNS
+POSTHOOK: type: ANALYZE_TABLE
+POSTHOOK: Input: default@all_types_table
+POSTHOOK: Output: default@all_types_table
+#### A masked pattern was here ####
+PREHOOK: query: EXPLAIN SELECT * FROM all_types_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN SELECT * FROM all_types_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@all_types_table
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: all_types_table
+          Select Operator
+            expressions: key (type: tinyint), int16 (type: smallint), int32 (type: int), int64 (type: bigint), bool (type: boolean), float (type: float), double (type: double), string (type: string), binary (type: binary), timestamp (type: timestamp), decimal (type: decimal(5,3)), null (type: string), default (type: int)
+            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
+            ListSink
+
+PREHOOK: query: SHOW CREATE TABLE all_types_table
+PREHOOK: type: SHOW_CREATETABLE
+PREHOOK: Input: default@all_types_table
+POSTHOOK: query: SHOW CREATE TABLE all_types_table
+POSTHOOK: type: SHOW_CREATETABLE
+POSTHOOK: Input: default@all_types_table
+CREATE EXTERNAL TABLE `all_types_table`(
+  `key` tinyint COMMENT 'The key column.', 
+  `int16` smallint COMMENT '', 
+  `int32` int COMMENT '', 
+  `int64` bigint COMMENT '', 
+  `bool` boolean COMMENT '', 
+  `float` float COMMENT '', 
+  `double` double COMMENT '', 
+  `string` string COMMENT '', 
+  `binary` binary COMMENT '', 
+  `timestamp` timestamp COMMENT '', 
+  `decimal` decimal(5,3) COMMENT '', 
+  `null` string COMMENT '', 
+  `default` int COMMENT '')
+ROW FORMAT SERDE 
+  'org.apache.hadoop.hive.kudu.KuduSerDe' 
+STORED BY 
+  'org.apache.hadoop.hive.kudu.KuduStorageHandler' 
+WITH SERDEPROPERTIES ( 
+  'serialization.format'='1')
+LOCATION
+#### A masked pattern was here ####
+TBLPROPERTIES (
+  'bucketing_version'='2', 
+  'kudu.table_name'='default.kudu_all_types', 
+#### A masked pattern was here ####
+PREHOOK: query: DESCRIBE all_types_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@all_types_table
+POSTHOOK: query: DESCRIBE all_types_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@all_types_table
+key                 	tinyint             	The key column.     
+int16               	smallint            	                    
+int32               	int                 	                    
+int64               	bigint              	                    
+bool                	boolean             	                    
+float               	float               	                    
+double              	double              	                    
+string              	string              	                    
+binary              	binary              	                    
+timestamp           	timestamp           	                    
+decimal             	decimal(5,3)        	                    
+null                	string              	                    
+default             	int                 	                    
+PREHOOK: query: DESCRIBE FORMATTED all_types_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@all_types_table
+POSTHOOK: query: DESCRIBE FORMATTED all_types_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@all_types_table
+# col_name            	data_type           	comment             
+key                 	tinyint             	The key column.     
+int16               	smallint            	                    
+int32               	int                 	                    
+int64               	bigint              	                    
+bool                	boolean             	                    
+float               	float               	                    
+double              	double              	                    
+string              	string              	                    
+binary              	binary              	                    
+timestamp           	timestamp           	                    
+decimal             	decimal(5,3)        	                    
+null                	string              	                    
+default             	int                 	                    
+	 	 
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	EXTERNAL_TABLE      	 
+Table Parameters:	 	 
+	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"binary\":\"true\",\"bool\":\"true\",\"decimal\":\"true\",\"default\":\"true\",\"double\":\"true\",\"float\":\"true\",\"int16\":\"true\",\"int32\":\"true\",\"int64\":\"true\",\"key\":\"true\",\"null\":\"true\",\"string\":\"true\",\"timestamp\":\"true\"}}
+	EXTERNAL            	TRUE                
+	bucketing_version   	2                   
+	kudu.table_name     	default.kudu_all_types
+#### A masked pattern was here ####
+	numFiles            	0                   
+	numRows             	2                   
+	rawDataSize         	0                   
+	storage_handler     	org.apache.hadoop.hive.kudu.KuduStorageHandler
+	totalSize           	0                   
+#### A masked pattern was here ####
+	 	 
+# Storage Information	 	 
+SerDe Library:      	org.apache.hadoop.hive.kudu.KuduSerDe	 
+InputFormat:        	null                	 
+OutputFormat:       	null                	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
+PREHOOK: query: DROP TABLE all_types_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@all_types_table
+PREHOOK: Output: default@all_types_table
+POSTHOOK: query: DROP TABLE all_types_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@all_types_table
+POSTHOOK: Output: default@all_types_table
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
index 7b2e32b..7e849bf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
@@ -46,7 +46,8 @@ class AsyncTaskCopyAuxJars implements Callable<Void> {
   private static final String[] DEFAULT_AUX_CLASSES =
       new String[] {"org.apache.hive.hcatalog.data.JsonSerDe", "org.apache.hadoop.hive.druid.DruidStorageHandler",
           "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
-          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler"};
+          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler",
+          "org.apache.hadoop.hive.kudu.KuduStorageHandler"};
   private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";
 
   private final LlapServiceCommandLine cl;
diff --git a/pom.xml b/pom.xml
index 4549d48..996a6ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@
     <module>shims</module>
     <module>spark-client</module>
     <module>kryo-registrator</module>
+    <module>kudu-handler</module>
     <module>testutils</module>
     <module>packaging</module>
     <module>standalone-metastore</module>
@@ -183,6 +184,7 @@
     <json.version>1.8</json.version>
     <junit.version>4.11</junit.version>
     <kryo.version>4.0.2</kryo.version>
+    <kudu.version>1.10.0</kudu.version>
     <!-- Leaving libfb303 at 0.9.3 regardless of libthrift: As per THRIFT-4613 The Apache Thrift project does not publish items related to fb303 at this point -->
     <libfb303.version>0.9.3</libfb303.version>
     <libthrift.version>0.9.3-1</libthrift.version>