You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2019/10/23 08:46:06 UTC

[kudu] branch master updated (4343fb7 -> 768b688)

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 4343fb7  KUDU-1938 Add C++ client support for VARCHAR pt 2
     new 9067963  KUDU-1938 Add non-copy VARCHAR setters pt 3
     new e2b2fb2  KUDU-1938 [java] Add support for VARCHAR pt 4
     new 85fe726  KUDU-1938 [hms] Add HMS support for VARCHAR pt 5
     new 768b688  KUDU-1938 Add range partition support pt 6

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/protobuf/backup.proto                 |  1 +
 .../org/apache/kudu/backup/TableMetadata.scala     |  7 ++
 .../main/java/org/apache/kudu/ColumnSchema.java    | 19 ++++-
 .../java/org/apache/kudu/ColumnTypeAttributes.java | 45 ++++++++++--
 .../src/main/java/org/apache/kudu/Type.java        |  5 +-
 .../apache/kudu/client/ColumnRangePredicate.java   |  5 +-
 .../java/org/apache/kudu/client/KeyEncoder.java    |  9 ++-
 .../java/org/apache/kudu/client/KuduPredicate.java |  8 ++-
 .../java/org/apache/kudu/client/Operation.java     |  3 +-
 .../java/org/apache/kudu/client/PartialRow.java    | 84 +++++++++++++++++++++-
 .../org/apache/kudu/client/ProtobufHelper.java     |  8 +++
 .../java/org/apache/kudu/client/RowResult.java     | 37 +++++++++-
 .../CharUtil.java}                                 | 27 ++++---
 .../java/org/apache/kudu/util/DataGenerator.java   |  4 ++
 .../java/org/apache/kudu/util/SchemaGenerator.java | 14 ++++
 .../java/org/apache/kudu/TestColumnSchema.java     | 20 ++++++
 .../org/apache/kudu/client/TestKeyEncoding.java    | 27 +++++--
 .../org/apache/kudu/client/TestKuduClient.java     | 57 +++++++++++++++
 .../org/apache/kudu/client/TestKuduPredicate.java  | 29 ++++++++
 .../org/apache/kudu/client/TestPartialRow.java     | 22 +++++-
 .../java/org/apache/kudu/client/TestRowResult.java |  6 ++
 .../org/apache/kudu/client/TestScanPredicate.java  | 34 ++++++++-
 .../spark/tools/DistributedDataGenerator.scala     |  2 +
 .../org/apache/kudu/spark/kudu/RowConverter.scala  | 10 ++-
 .../org/apache/kudu/spark/kudu/SparkUtil.scala     |  1 +
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala |  2 +-
 .../apache/kudu/spark/kudu/KuduContextTest.scala   |  4 +-
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  8 +++
 .../java/org/apache/kudu/test/ClientTestUtil.java  | 24 ++++++-
 src/kudu/common/partial_row-test.cc                | 20 ++++++
 src/kudu/common/partial_row.cc                     | 16 +++++
 src/kudu/common/partial_row.h                      | 52 ++++++++++++++
 src/kudu/common/partition-test.cc                  | 54 ++++++++++++++
 src/kudu/common/partition.cc                       | 11 +++
 src/kudu/common/partition.h                        |  1 +
 src/kudu/hms/hms_catalog-test.cc                   |  1 +
 src/kudu/hms/hms_catalog.cc                        |  2 +
 src/kudu/integration-tests/varchar-itest.cc        | 62 +++++++++++++++-
 38 files changed, 690 insertions(+), 51 deletions(-)
 copy java/kudu-client/src/main/java/org/apache/kudu/{client/ListTabletsResponse.java => util/CharUtil.java} (63%)


[kudu] 03/04: KUDU-1938 [hms] Add HMS support for VARCHAR pt 5

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 85fe726794c1f0f4cc4a3add4afbdf14fc3544ce
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Wed Sep 25 17:13:36 2019 +0200

    KUDU-1938 [hms] Add HMS support for VARCHAR pt 5
    
    Change-Id: If929bd14eb8d1b2e54ea7892c2f85a3b1305c05d
    Reviewed-on: http://gerrit.cloudera.org:8080/14303
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/hms/hms_catalog-test.cc | 1 +
 src/kudu/hms/hms_catalog.cc      | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/src/kudu/hms/hms_catalog-test.cc b/src/kudu/hms/hms_catalog-test.cc
index 405b26f..45c9732 100644
--- a/src/kudu/hms/hms_catalog-test.cc
+++ b/src/kudu/hms/hms_catalog-test.cc
@@ -203,6 +203,7 @@ class HmsCatalogTest : public KuduTest {
     b.AddColumn("decimal32_val", DataType::DECIMAL32);
     b.AddColumn("decimal64_val", DataType::DECIMAL64);
     b.AddColumn("decimal128_val", DataType::DECIMAL128);
+    b.AddColumn("varchar_val", DataType::VARCHAR);
     return b.Build();
   }
 
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index ccf22ee..a01c4da 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -330,6 +330,8 @@ string column_to_field_type(const ColumnSchema& column) {
     case DOUBLE: return "double";
     case STRING: return "string";
     case BINARY: return "binary";
+    case VARCHAR: return Substitute("varchar($0)",
+                                    column.type_attributes().length);
     case UNIXTIME_MICROS: return "timestamp";
     default: LOG(FATAL) << "unhandled column type: " << column.TypeToString();
   }


[kudu] 01/04: KUDU-1938 Add non-copy VARCHAR setters pt 3

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9067963c94a5263e5c1cb0c87d839ac7b6f0f449
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Tue Sep 24 15:36:08 2019 +0200

    KUDU-1938 Add non-copy VARCHAR setters pt 3
    
    Apache Impala uses KuduPartialRow API to determine which partition a row
    will be inserted to distribute the data between executors optimally.
    
    For this purpose the copy is unnecessary and it should be fast. This
    commit adds NoCopyUnsafe variants for this purpose which expect the data
    to already be truncated (which it is in Impala's case) and only check
    that the value's length is lower than the highest possible upper bound:
    val.size() < max_length*4 bytes (the maximum size of an UTF8 character)
    to avoid having to count each character manually.
    
    Change-Id: I1f2aba098d649eb94e0314f6606cc33600e8d766
    Reviewed-on: http://gerrit.cloudera.org:8080/13928
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/partial_row-test.cc | 20 ++++++++++++++
 src/kudu/common/partial_row.cc      | 16 ++++++++++++
 src/kudu/common/partial_row.h       | 52 +++++++++++++++++++++++++++++++++++++
 3 files changed, 88 insertions(+)

diff --git a/src/kudu/common/partial_row-test.cc b/src/kudu/common/partial_row-test.cc
index ecf600e..a62f309 100644
--- a/src/kudu/common/partial_row-test.cc
+++ b/src/kudu/common/partial_row-test.cc
@@ -278,6 +278,26 @@ TEST_F(PartialRowTest, UnitTest) {
 
   s = row.SetVarchar("varchar_val", "123456789\xF0\x9F\xA6\x8C ABCDEF");
   EXPECT_EQ("varchar varchar_val=\"123456789\xF0\x9F\xA6\x8C\"", row.ToString());
+
+  s = row.SetVarcharNoCopyUnsafe("varchar_val", "varchar");
+  EXPECT_EQ("varchar varchar_val=\"varchar\"", row.ToString());
+
+  std::string utf8_char_4byte = "\xf3\xa0\x87\xa1";
+  std::string test_string = utf8_char_4byte;
+  for (auto i = 0; i < 9; ++i) {
+    test_string += utf8_char_4byte;
+  }
+
+  std::string expected_string = "varchar varchar_val=\"";
+  expected_string += test_string + "\"";
+
+  s = row.SetVarcharNoCopyUnsafe("varchar_val", test_string);
+  EXPECT_EQ(expected_string, row.ToString());
+
+  test_string += utf8_char_4byte;
+
+  s = row.SetVarcharNoCopyUnsafe("varchar_val", test_string);
+  EXPECT_TRUE(s.IsInvalidArgument());
 }
 
 TEST_F(PartialRowTest, TestCopy) {
diff --git a/src/kudu/common/partial_row.cc b/src/kudu/common/partial_row.cc
index f99dd58..2467423 100644
--- a/src/kudu/common/partial_row.cc
+++ b/src/kudu/common/partial_row.cc
@@ -386,6 +386,22 @@ Status KuduPartialRow::SetStringNoCopy(int col_idx, const Slice& val) {
   return Set<TypeTraits<STRING> >(col_idx, val, false);
 }
 
+Status KuduPartialRow::SetVarcharNoCopyUnsafe(const Slice& col_name, const Slice& val) {
+  int col_idx;
+  RETURN_NOT_OK(schema_->FindColumn(col_name, &col_idx));
+  return SetVarcharNoCopyUnsafe(col_idx, val);
+}
+
+Status KuduPartialRow::SetVarcharNoCopyUnsafe(int col_idx, const Slice& val) {
+  auto col = schema_->column(col_idx);
+  if (val.size() > col.type_attributes().length * 4) {
+    return Status::InvalidArgument(
+        Substitute("Value too long, limit is $0 characters",
+                   col.type_attributes().length));
+  }
+  return Set<TypeTraits<VARCHAR> >(col_idx, val);
+}
+
 template<typename T>
 Status KuduPartialRow::SetSliceCopy(const Slice& col_name, const Slice& val) {
   int col_idx;
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index ec33879..ac2816b 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -281,6 +281,30 @@ class KUDU_EXPORT KuduPartialRow {
   Status SetStringNoCopy(const Slice& col_name, const Slice& val) WARN_UNUSED_RESULT;
   ///@}
 
+  /// @name [Advanced][Unstable] Setter for varchar columns by name (non-copying).
+  ///
+  /// Set the varchar value for a column by name, not copying the
+  /// specified data.
+  ///
+  /// This method expects the values to be truncated already and they only do a
+  /// basic validation that the data is not larger than the maximum column
+  /// length (as indicated by the schema) multiplied by 4, as that's the upper
+  /// limit if only 4-byte UTF8 characters are used. This is subject to change in
+  /// the future.
+  ///
+  /// @note The specified data must remain valid until the corresponding
+  ///   RPC calls are completed to be able to access error buffers,
+  ///   if any errors happened (the errors can be fetched using the
+  ///   KuduSession::GetPendingErrors() method).
+  ///
+  /// @param [in] col_name
+  ///   Name of the target column.
+  /// @param [in] val
+  ///   The value to set.
+  /// @return Operation result status.
+  ///
+  Status SetVarcharNoCopyUnsafe(const Slice& col_name, const Slice& val) WARN_UNUSED_RESULT;
+
   /// @name Setters for binary/string columns by index (non-copying).
   ///
   /// Set the binary/string value for a column by index, not copying the
@@ -307,6 +331,34 @@ class KUDU_EXPORT KuduPartialRow {
   Status SetStringNoCopy(int col_idx, const Slice& val) WARN_UNUSED_RESULT;
   ///@}
 
+  /// @name [Advanced][Unstable] Setter for varchar columns by index (non-copying).
+  ///
+  /// Set the varchar value for a column by index, not copying the specified data.
+  ///
+  /// This method expects the values to be truncated already and they only do a
+  /// basic validation that the data is not larger than the maximum column
+  /// length (as indicated by the schema) multiplied by 4, as that's the upper
+  /// limit if only 4-byte UTF8 characters are used. This is subject to change in
+  /// the future.
+  ///
+  /// This setter is the same as the corresponding column-name-based setter,
+  /// but with numeric column indexes. This is faster since it avoids
+  /// hashmap lookups, so should be preferred in performance-sensitive code
+  /// (e.g. bulk loaders).
+  ///
+  /// @note The specified data must remain valid until the corresponding
+  ///   RPC calls are completed to be able to access error buffers,
+  ///   if any errors happened (the errors can be fetched using the
+  ///   KuduSession::GetPendingErrors() method).
+  ///
+  /// @param [in] col_idx
+  ///   The index of the target column.
+  /// @param [in] val
+  ///   The value to set.
+  /// @return Operation result status.
+  ///
+  Status SetVarcharNoCopyUnsafe(int col_idx, const Slice& val) WARN_UNUSED_RESULT;
+
   /// Set column value to @c NULL; the column is identified by its name.
   ///
   /// This will only succeed on nullable columns. Use Unset() to restore


[kudu] 02/04: KUDU-1938 [java] Add support for VARCHAR pt 4

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e2b2fb25335fe517e46cdd61924698b36e9cc652
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Tue Sep 24 18:51:52 2019 +0200

    KUDU-1938 [java] Add support for VARCHAR pt 4
    
    Adds support for VARCHAR type to the Java and Spark clients. The
    kudu-client only changes would break tests in kudu-spark and kudu-backup
    so this patch also incorporates changes in these subprojects.
    
    Change-Id: I03edf5e65409e895512d5cd81a607180632e8995
    Reviewed-on: http://gerrit.cloudera.org:8080/14050
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
---
 .../src/main/protobuf/backup.proto                 |  1 +
 .../org/apache/kudu/backup/TableMetadata.scala     |  7 ++
 .../main/java/org/apache/kudu/ColumnSchema.java    | 19 ++++-
 .../java/org/apache/kudu/ColumnTypeAttributes.java | 45 ++++++++++--
 .../src/main/java/org/apache/kudu/Type.java        |  5 +-
 .../apache/kudu/client/ColumnRangePredicate.java   |  5 +-
 .../java/org/apache/kudu/client/KeyEncoder.java    |  9 ++-
 .../java/org/apache/kudu/client/KuduPredicate.java |  8 ++-
 .../java/org/apache/kudu/client/Operation.java     |  3 +-
 .../java/org/apache/kudu/client/PartialRow.java    | 84 +++++++++++++++++++++-
 .../org/apache/kudu/client/ProtobufHelper.java     |  8 +++
 .../java/org/apache/kudu/client/RowResult.java     | 37 +++++++++-
 .../main/java/org/apache/kudu/util/CharUtil.java   | 38 ++++++++++
 .../java/org/apache/kudu/util/DataGenerator.java   |  4 ++
 .../java/org/apache/kudu/util/SchemaGenerator.java | 14 ++++
 .../java/org/apache/kudu/TestColumnSchema.java     | 20 ++++++
 .../org/apache/kudu/client/TestKeyEncoding.java    | 27 +++++--
 .../org/apache/kudu/client/TestKuduClient.java     | 57 +++++++++++++++
 .../org/apache/kudu/client/TestKuduPredicate.java  | 29 ++++++++
 .../org/apache/kudu/client/TestPartialRow.java     | 22 +++++-
 .../java/org/apache/kudu/client/TestRowResult.java |  6 ++
 .../org/apache/kudu/client/TestScanPredicate.java  | 34 ++++++++-
 .../spark/tools/DistributedDataGenerator.scala     |  2 +
 .../org/apache/kudu/spark/kudu/RowConverter.scala  | 10 ++-
 .../org/apache/kudu/spark/kudu/SparkUtil.scala     |  1 +
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala |  2 +-
 .../apache/kudu/spark/kudu/KuduContextTest.scala   |  4 +-
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  8 +++
 .../java/org/apache/kudu/test/ClientTestUtil.java  | 24 ++++++-
 29 files changed, 500 insertions(+), 33 deletions(-)

diff --git a/java/kudu-backup-common/src/main/protobuf/backup.proto b/java/kudu-backup-common/src/main/protobuf/backup.proto
index 5711e46..ef8678a 100644
--- a/java/kudu-backup-common/src/main/protobuf/backup.proto
+++ b/java/kudu-backup-common/src/main/protobuf/backup.proto
@@ -31,6 +31,7 @@ import "google/protobuf/wrappers.proto";
 message ColumnTypeAttributesMetadataPB {
   int32 precision = 1;
   int32 scale = 2;
+  int32 length = 3;
 }
 
 // Maps to the ColumnSchema class.
diff --git a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 6fc49d3..8bff6a1 100644
--- a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -110,6 +110,7 @@ object TableMetadata {
       .newBuilder()
       .setPrecision(attributes.getPrecision)
       .setScale(attributes.getScale)
+      .setLength(attributes.getLength)
       .build()
   }
 
@@ -215,6 +216,7 @@ object TableMetadata {
           new ColumnTypeAttributesBuilder()
             .precision(attributes.getPrecision)
             .scale(attributes.getScale)
+            .length(attributes.getLength)
             .build()
         )
       }
@@ -234,6 +236,7 @@ object TableMetadata {
       case Type.INT64 | Type.UNIXTIME_MICROS => row.getLong(columnName)
       case Type.FLOAT => row.getFloat(columnName)
       case Type.DOUBLE => row.getDouble(columnName)
+      case Type.VARCHAR => row.getVarchar(columnName)
       case Type.STRING => row.getString(columnName)
       case Type.BINARY => row.getBinary(columnName)
       case Type.DECIMAL => row.getDecimal(columnName)
@@ -252,6 +255,7 @@ object TableMetadata {
         row.addLong(columnName, value.asInstanceOf[Long])
       case Type.FLOAT => row.addFloat(columnName, value.asInstanceOf[Float])
       case Type.DOUBLE => row.addDouble(columnName, value.asInstanceOf[Double])
+      case Type.VARCHAR => row.addVarchar(columnName, value.asInstanceOf[String])
       case Type.STRING => row.addString(columnName, value.asInstanceOf[String])
       case Type.BINARY =>
         row.addBinary(columnName, value.asInstanceOf[Array[Byte]])
@@ -278,6 +282,8 @@ object TableMetadata {
         String.valueOf(value.asInstanceOf[Float])
       case Type.DOUBLE =>
         String.valueOf(value.asInstanceOf[Double])
+      case Type.VARCHAR =>
+        value.asInstanceOf[String]
       case Type.STRING =>
         value.asInstanceOf[String]
       case Type.BINARY =>
@@ -300,6 +306,7 @@ object TableMetadata {
       case Type.INT64 | Type.UNIXTIME_MICROS => value.toLong
       case Type.FLOAT => value.toFloat
       case Type.DOUBLE => value.toDouble
+      case Type.VARCHAR => value
       case Type.STRING => value
       case Type.BINARY => Base64.decodeBase64(value)
       case Type.DECIMAL => new BigDecimal(value) // TODO: Explicitly pass scale
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
index 1a69f33..7ae0335 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
@@ -17,8 +17,12 @@
 
 package org.apache.kudu;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
+import org.apache.kudu.util.CharUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -254,6 +258,8 @@ public class ColumnSchema {
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
   public static class ColumnSchemaBuilder {
+    public static final List<Type> TYPES_WITH_ATTRIBUTES = Arrays.asList(Type.DECIMAL,
+                                                                         Type.VARCHAR);
     private final String name;
     private final Type type;
     private boolean key = false;
@@ -377,7 +383,7 @@ public class ColumnSchema {
      * Set the column type attributes for this column.
      */
     public ColumnSchemaBuilder typeAttributes(ColumnTypeAttributes typeAttributes) {
-      if (type != Type.DECIMAL && typeAttributes != null) {
+      if (typeAttributes != null && !TYPES_WITH_ATTRIBUTES.contains(type)) {
         throw new IllegalArgumentException(
             "ColumnTypeAttributes are not used on " + type + " columns");
       }
@@ -414,10 +420,19 @@ public class ColumnSchema {
       if (wireType == null) {
         this.wireType = type.getDataType(typeAttributes);
       }
+      if (type == Type.VARCHAR) {
+        if (typeAttributes == null || !typeAttributes.hasLength()
+          || typeAttributes.getLength() < CharUtil.MIN_VARCHAR_LENGTH
+          || typeAttributes.getLength() > CharUtil.MAX_VARCHAR_LENGTH) {
+          throw new IllegalArgumentException(
+            String.format("VARCHAR's length must be set and between %d and %d",
+                          CharUtil.MIN_VARCHAR_LENGTH, CharUtil.MAX_VARCHAR_LENGTH));
+        }
+      }
       return new ColumnSchema(name, type,
                               key, nullable, defaultValue,
                               desiredBlockSize, encoding, compressionAlgorithm,
                               typeAttributes, wireType, comment);
-    }
+     }
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
index bc0d88e..b97c519 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
@@ -32,12 +32,18 @@ public class ColumnTypeAttributes {
   private final boolean hasScale;
   private final int scale;
 
+  private final boolean hasLength;
+  private final int length;
+
   private ColumnTypeAttributes(boolean hasPrecision, int precision,
-                               boolean hasScale, int scale) {
+                               boolean hasScale, int scale,
+                               boolean hasLength, int length) {
     this.hasPrecision = hasPrecision;
     this.precision = precision;
     this.hasScale = hasScale;
     this.scale = scale;
+    this.hasLength = hasLength;
+    this.length = length;
   }
 
   /**
@@ -68,6 +74,20 @@ public class ColumnTypeAttributes {
     return scale;
   }
 
+  /**
+   * Returns true if the length is set;
+   */
+  public boolean hasLength() {
+    return hasLength;
+  }
+
+  /**
+   * Returns the length;
+   */
+  public int getLength() {
+    return length;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -91,13 +111,19 @@ public class ColumnTypeAttributes {
     if (scale != that.scale) {
       return false;
     }
+    if (hasLength != that.hasLength) {
+      return false;
+    }
+    if (length != that.length) {
+      return false;
+    }
 
     return true;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(hasPrecision, precision, hasScale, scale);
+    return Objects.hash(hasPrecision, precision, hasScale, scale, hasLength, length);
   }
 
   /**
@@ -110,6 +136,8 @@ public class ColumnTypeAttributes {
   public String toStringForType(Type type) {
     if (type == Type.DECIMAL) {
       return "(" + precision + ", " + scale + ")";
+    } else if (type == Type.VARCHAR) {
+      return "(" + length + ")";
     } else {
       return "";
     }
@@ -118,7 +146,8 @@ public class ColumnTypeAttributes {
   @Override
   public String toString() {
     return "hasPrecision: " + hasPrecision + ", precision: " + precision +
-        ", hasScale: " + hasScale + ", scale: " + scale;
+        ", hasScale: " + hasScale + ", scale: " + scale +
+        ", hasLength: " + hasLength + ", length: " + length;
   }
 
   /**
@@ -132,6 +161,8 @@ public class ColumnTypeAttributes {
     private int precision;
     private boolean hasScale;
     private int scale;
+    private boolean hasLength;
+    private int length;
 
     /**
      * Set the precision. Only used for Decimal columns.
@@ -151,12 +182,18 @@ public class ColumnTypeAttributes {
       return this;
     }
 
+    public ColumnTypeAttributesBuilder length(int length) {
+      this.hasLength = true;
+      this.length = length;
+      return this;
+    }
+
     /**
      * Builds a {@link ColumnTypeAttributes} using the passed parameters.
      * @return a new {@link ColumnTypeAttributes}
      */
     public ColumnTypeAttributes build() {
-      return new ColumnTypeAttributes(hasPrecision, precision, hasScale, scale);
+      return new ColumnTypeAttributes(hasPrecision, precision, hasScale, scale, hasLength, length);
     }
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Type.java b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
index ec7d542..6d31910 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
@@ -48,7 +48,8 @@ public enum Type {
   FLOAT(DataType.FLOAT, "float"),
   DOUBLE(DataType.DOUBLE, "double"),
   UNIXTIME_MICROS(DataType.UNIXTIME_MICROS, "unixtime_micros"),
-  DECIMAL(Arrays.asList(DataType.DECIMAL32, DataType.DECIMAL64, DataType.DECIMAL128), "decimal");
+  DECIMAL(Arrays.asList(DataType.DECIMAL32, DataType.DECIMAL64, DataType.DECIMAL128), "decimal"),
+  VARCHAR(DataType.VARCHAR, "varchar");
 
   private final ImmutableList<DataType> dataTypes;
   private final String name;
@@ -143,6 +144,7 @@ public enum Type {
     switch (type) {
       case STRING:
       case BINARY:
+      case VARCHAR:
         return 8 + 8; // offset then string length
       case BOOL:
       case INT8:
@@ -171,6 +173,7 @@ public enum Type {
     switch (type) {
       case STRING: return STRING;
       case BINARY: return BINARY;
+      case VARCHAR: return VARCHAR;
       case BOOL: return BOOL;
       case INT8: return INT8;
       case INT16: return INT16;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
index 697501d..8a8a26e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
@@ -97,6 +97,7 @@ public class ColumnRangePredicate {
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getFloat(bound));
       case DOUBLE:
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getDouble(bound));
+      case VARCHAR:
       case STRING:
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getString(bound));
       case BINARY:
@@ -185,7 +186,7 @@ public class ColumnRangePredicate {
    * @param lowerBound value for the lower bound
    */
   public void setLowerBound(String lowerBound) {
-    checkColumn(Type.STRING);
+    checkColumn(Type.STRING, Type.VARCHAR);
     setLowerBoundInternal(lowerBound.getBytes(UTF_8));
   }
 
@@ -282,7 +283,7 @@ public class ColumnRangePredicate {
    * @param upperBound value for the upper bound
    */
   public void setUpperBound(String upperBound) {
-    checkColumn(Type.STRING);
+    checkColumn(Type.STRING, Type.VARCHAR);
     setUpperBoundInternal(upperBound.getBytes(UTF_8));
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
index de7ac79..a2d06bf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -144,7 +145,8 @@ class KeyEncoder {
                                                     column.getName()));
     }
     final Type type = column.getType();
-    if (type == Type.STRING || type == Type.BINARY) {
+    if (type == Type.STRING || type == Type.BINARY ||
+        type == Type.VARCHAR) {
       encodeBinary(row.getVarLengthData().get(columnIdx), isLast, buf);
     } else {
       encodeSignedInt(row.getRowAlloc(),
@@ -337,6 +339,11 @@ class KeyEncoder {
         row.addBinary(idx, binary);
         break;
       }
+      case VARCHAR: {
+        byte[] binary = decodeBinaryColumn(buf, isLast);
+        row.addVarchar(idx, new String(binary, StandardCharsets.UTF_8));
+        break;
+      }
       case STRING: {
         byte[] binary = decodeBinaryColumn(buf, isLast);
         row.addStringUtf8(idx, binary);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
index cfba040..a8c9e40 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
@@ -413,7 +413,7 @@ public class KuduPredicate {
   public static KuduPredicate newComparisonPredicate(ColumnSchema column,
                                                      ComparisonOp op,
                                                      String value) {
-    checkColumn(column, Type.STRING);
+    checkColumn(column, Type.STRING, Type.VARCHAR);
 
     byte[] bytes = Bytes.fromString(value);
     if (op == ComparisonOp.LESS_EQUAL) {
@@ -495,6 +495,7 @@ public class KuduPredicate {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -598,7 +599,7 @@ public class KuduPredicate {
               column.getTypeAttributes().getPrecision()));
         }
     } else if (t instanceof String) {
-      checkColumn(column, Type.STRING);
+      checkColumn(column, Type.STRING, Type.VARCHAR);
       for (T value : values) {
         vals.add(Bytes.fromString((String) value));
       }
@@ -984,6 +985,7 @@ public class KuduPredicate {
       case DOUBLE:
         return Double.compare(Bytes.getDouble(a), Bytes.getDouble(b));
       case STRING:
+      case VARCHAR:
       case BINARY:
         return UnsignedBytes.lexicographicalComparator().compare(a, b);
       case DECIMAL128:
@@ -1036,6 +1038,7 @@ public class KuduPredicate {
         return m < n && Math.nextAfter(m, Double.POSITIVE_INFINITY) == n;
       }
       case STRING:
+      case VARCHAR:
       case BINARY: {
         if (a.length + 1 != b.length || b[a.length] != 0) {
           return false;
@@ -1153,6 +1156,7 @@ public class KuduPredicate {
       case UNIXTIME_MICROS: return TimestampUtil.timestampToString(Bytes.getLong(value));
       case FLOAT: return Float.toString(Bytes.getFloat(value));
       case DOUBLE: return Double.toString(Bytes.getDouble(value));
+      case VARCHAR:
       case STRING: {
         String v = Bytes.getString(value);
         StringBuilder sb = new StringBuilder(2 + v.length());
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 5ebda0f..2c02e5d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -384,7 +384,8 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
         ColumnSchema col = schema.getColumnByIndex(colIdx);
         // Keys should always be specified, maybe check?
         if (row.isSet(colIdx) && !row.isSetToNull(colIdx)) {
-          if (col.getType() == Type.STRING || col.getType() == Type.BINARY) {
+          if (col.getType() == Type.STRING || col.getType() == Type.BINARY ||
+              col.getType() == Type.VARCHAR) {
             ByteBuffer varLengthData = row.getVarLengthData().get(colIdx);
             varLengthData.reset();
             rows.putLong(indirectWrittenBytes);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 0a5ddd6..341792d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,7 +31,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.CharsetUtil;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
@@ -634,6 +634,44 @@ public class PartialRow {
   }
 
   /**
+   * Add a VARCHAR for the specified column.
+   *
+   * Truncates val to the length of the column in characters.
+   *
+   * @param columnIndex Index of the column
+   * @param val value to add
+   * @throws IllegalArgumentException if the column doesn't exist, is the wrong type
+   *         or the string is not UTF-8
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addVarchar(int columnIndex, String val) {
+    ColumnSchema column = schema.getColumnByIndex(columnIndex);
+    checkColumn(column, Type.VARCHAR);
+    checkNotFrozen();
+    int length = column.getTypeAttributes().getLength();
+    if (length < val.length()) {
+      val = val.substring(0, length);
+    }
+    byte[] bytes = Bytes.fromString(val);
+    addVarLengthData(columnIndex, bytes);
+  }
+
+  /**
+   * Add a VARCHAR for the specified column.
+   *
+   * Truncates val to the length of the column in characters.
+   *
+   * @param columnName Name of the column
+   * @param val value to add
+   * @throws IllegalArgumentException if the column doesn't exist, is the wrong type
+   *         or the string is not UTF-8
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addVarchar(String columnName, String val) {
+    addVarchar(schema.getColumnIndex(columnName), val);
+  }
+
+  /**
    * Get the specified column's string.
    * @param columnName name of the column to get data for
    * @return a string
@@ -655,7 +693,33 @@ public class PartialRow {
   public String getString(int columnIndex) {
     checkColumn(schema.getColumnByIndex(columnIndex), Type.STRING);
     checkValue(columnIndex);
-    return new String(getVarLengthData(columnIndex).array(), CharsetUtil.UTF_8);
+    return new String(getVarLengthData(columnIndex).array(), StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Get the specified column's VARCHAR.
+   * @param columnName Name of the column to get the data for
+   * @return a VARCHAR
+   * @throws IllegalArgumentException if the column is null, is unset,
+   *         or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public String getVarchar(String columnName) {
+    return getVarchar(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
+   * Get the specified column's VARCHAR.
+   * @param columnIndex Column index in the schema
+   * @return a VARCHAR
+   * @throws IllegalArgumentException if the column is null, is unset,
+   *         or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public String getVarchar(int columnIndex) {
+    checkColumn(schema.getColumnByIndex(columnIndex), Type.VARCHAR);
+    checkValue(columnIndex);
+    return new String(getVarLengthData(columnIndex).array(), StandardCharsets.UTF_8);
   }
 
   /**
@@ -905,6 +969,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -932,6 +997,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[] or java.lang.ByteBuffer
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -960,6 +1026,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[] or java.lang.ByteBuffer
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -993,6 +1060,7 @@ public class PartialRow {
         case FLOAT: addFloat(columnIndex, (Float) val); break;
         case DOUBLE: addDouble(columnIndex, (Double) val); break;
         case STRING: addString(columnIndex, (String) val); break;
+        case VARCHAR: addVarchar(columnIndex, (String) val); break;
         case BINARY:
           if (val instanceof byte[]) {
             addBinary(columnIndex, (byte[]) val);
@@ -1027,6 +1095,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -1047,6 +1116,7 @@ public class PartialRow {
       case UNIXTIME_MICROS: return getTimestamp(columnIndex);
       case FLOAT: return getFloat(columnIndex);
       case DOUBLE: return getDouble(columnIndex);
+      case VARCHAR: return getVarchar(columnIndex);
       case STRING: return getString(columnIndex);
       case BINARY: return getBinaryCopy(columnIndex);
       case DECIMAL: return getDecimal(columnIndex);
@@ -1297,13 +1367,14 @@ public class PartialRow {
         sb.append(Bytes.getDecimal(rowAlloc, schema.getColumnOffset(idx),
             typeAttributes.getPrecision(), typeAttributes.getScale()));
         return;
+      case VARCHAR:
       case BINARY:
       case STRING:
         ByteBuffer value = getVarLengthData().get(idx).duplicate();
         value.reset(); // Make sure we start at the beginning.
         byte[] data = new byte[value.limit() - value.position()];
         value.get(data);
-        if (col.getType() == Type.STRING) {
+        if (col.getType() == Type.STRING || col.getType() == Type.VARCHAR) {
           sb.append('"');
           StringUtil.appendEscapedSQLString(Bytes.getString(data), sb);
           sb.append('"');
@@ -1357,6 +1428,9 @@ public class PartialRow {
       case BINARY:
         addBinary(index, AsyncKuduClient.EMPTY_ARRAY);
         break;
+      case VARCHAR:
+        addVarchar(index, "");
+        break;
       default:
         throw new RuntimeException("unreachable");
     }
@@ -1385,6 +1459,7 @@ public class PartialRow {
             getPositionInRowAllocAndSetBitSet(index), value.length);
         break;
       }
+      case VARCHAR:
       case STRING:
       case BINARY: {
         addVarLengthData(index, value);
@@ -1478,6 +1553,7 @@ public class PartialRow {
         Bytes.setBigDecimal(rowAlloc, existing.add(smallest), precision, offset);
         return true;
       }
+      case VARCHAR:
       case STRING:
       case BINARY: {
         ByteBuffer data = varLengthData.get(index);
@@ -1566,6 +1642,7 @@ public class PartialRow {
         int scale = typeAttributes.getScale();
         return Bytes.getDecimal(a.rowAlloc, offset, precision, scale)
             .equals(Bytes.getDecimal(b.rowAlloc, offset, precision, scale));
+      case VARCHAR:
       case STRING:
       case BINARY: {
         ByteBuffer aData = a.varLengthData.get(index).duplicate();
@@ -1653,6 +1730,7 @@ public class PartialRow {
         return val.add(smallestVal).equals(
                 Bytes.getDecimal(upper.rowAlloc, offset, precision, scale));
       }
+      case VARCHAR:
       case STRING:
       case BINARY: {
         // Check that b is 1 byte bigger than a, the extra byte is 0, and the other bytes are equal.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 45ebe20..a8e6176 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -139,6 +139,9 @@ public class ProtobufHelper {
     if (typeAttributes.hasScale()) {
       builder.setScale(typeAttributes.getScale());
     }
+    if (typeAttributes.hasLength()) {
+      builder.setLength(typeAttributes.getLength());
+    }
     return builder.build();
   }
 
@@ -173,6 +176,9 @@ public class ProtobufHelper {
     if(pb.hasScale()) {
       builder.scale(pb.getScale());
     }
+    if (pb.hasLength()) {
+      builder.length(pb.getLength());
+    }
     return builder.build();
   }
 
@@ -270,6 +276,7 @@ public class ProtobufHelper {
       case INT64:
       case UNIXTIME_MICROS:
         return Bytes.fromLong((Long) value);
+      case VARCHAR:
       case STRING:
         return ((String) value).getBytes(UTF_8);
       case BINARY:
@@ -306,6 +313,7 @@ public class ProtobufHelper {
         return buf.getFloat();
       case DOUBLE:
         return buf.getDouble();
+      case VARCHAR:
       case STRING:
         return value.toStringUtf8();
       case BINARY:
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index cf57160..c1558c4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -398,9 +398,14 @@ public class RowResult {
    * @throws IndexOutOfBoundsException if the column doesn't exist
    */
   public String getString(int columnIndex) {
+    checkType(columnIndex, Type.STRING);
+    return getVarLengthData(columnIndex);
+  }
+
+  private String getVarLengthData(int columnIndex) {
     checkValidColumn(columnIndex);
     checkNull(columnIndex);
-    checkType(columnIndex, Type.STRING);
+    checkType(columnIndex, Type.STRING, Type.VARCHAR);
     // C++ puts a Slice in rowData which is 16 bytes long for simplicity, but we only support ints.
     long offset = getLongOrOffset(columnIndex);
     long length = rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
@@ -412,6 +417,30 @@ public class RowResult {
   }
 
   /**
+   * Get the specified column's varchar.
+   * @param columnIndex Column index in the schema
+   * @return a string
+   * @throws IllegalArgumentException if the column is null
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public String getVarchar(int columnIndex) {
+    checkType(columnIndex, Type.VARCHAR);
+    return getVarLengthData(columnIndex);
+  }
+
+  /**
+   * Get the specified column's varchar.
+   * @param columnName name of the column to get data for
+   * @return a string
+   * @throws IllegalArgumentException if the column doesn't exist, is null,
+   * or if the type doesn't match the column's type
+   */
+  public String getVarchar(String columnName) {
+    return getVarchar(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
    * Get a copy of the specified column's binary data.
    * @param columnName name of the column to get data for
    * @return a byte[] with the binary data.
@@ -541,6 +570,7 @@ public class RowResult {
    *  Type.UNIXTIME_MICROS -> java.sql.Timestamp
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
+   *  Type.VARCHAR -> java.lang.String
    *  Type.STRING -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
@@ -568,6 +598,7 @@ public class RowResult {
    *  Type.UNIXTIME_MICROS -> java.sql.Timestamp
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
+   *  Type.VARCHAR -> java.lang.String
    *  Type.STRING -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
@@ -589,6 +620,7 @@ public class RowResult {
       case UNIXTIME_MICROS: return getTimestamp(columnIndex);
       case FLOAT: return getFloat(columnIndex);
       case DOUBLE: return getDouble(columnIndex);
+      case VARCHAR: return getVarchar(columnIndex);
       case STRING: return getString(columnIndex);
       case BINARY: return getBinaryCopy(columnIndex);
       case DECIMAL: return getDecimal(columnIndex);
@@ -721,6 +753,9 @@ public class RowResult {
           case UNIXTIME_MICROS: {
             buf.append(TimestampUtil.timestampToString(getTimestamp(i)));
           } break;
+          case VARCHAR:
+            buf.append(getVarchar(i));
+            break;
           case STRING:
             buf.append(getString(i));
             break;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/CharUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/CharUtil.java
new file mode 100644
index 0000000..bd1639d
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/CharUtil.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CharUtil {
+  public static final int MIN_VARCHAR_LENGTH = 1;
+  public static final int MAX_VARCHAR_LENGTH = 65535;
+
+  /**
+   * Convenience method to create column type attributes for VARCHAR columns.
+   * @param length the length.
+   * @return the column type attributes.
+   */
+  public static ColumnTypeAttributes typeAttributes(int length) {
+    return new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+      .length(length)
+      .build();
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
index d8ad50b..3dffba4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
@@ -108,6 +108,10 @@ public class DataGenerator {
           row.addDouble(i, random.nextDouble()); break;
         case DECIMAL:
           row.addDecimal(i, randomDecimal(col.getTypeAttributes(), random)); break;
+        case VARCHAR:
+          row.addVarchar(i, randomString(Math.min(col.getTypeAttributes().getLength(),
+                                                  stringLength), random));
+          break;
         case STRING:
           row.addString(i, randomString(stringLength, random)); break;
         case BINARY:
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
index 6a97382..adac3be 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
@@ -126,6 +126,14 @@ public class SchemaGenerator {
       builder.typeAttributes(typeAttributes);
     }
 
+    if (type == Type.VARCHAR) {
+      int length = random.nextInt(
+        (CharUtil.MAX_VARCHAR_LENGTH - CharUtil.MIN_VARCHAR_LENGTH) + 1)
+        + CharUtil.MIN_VARCHAR_LENGTH;
+      typeAttributes = CharUtil.typeAttributes(length);
+      builder.typeAttributes(typeAttributes);
+    }
+
     // Sometimes set a column default value.
     if (random.nextFloat() <= defaultRate) {
       switch (type) {
@@ -154,6 +162,11 @@ public class SchemaGenerator {
         case DECIMAL:
           builder.defaultValue(randomDecimal(typeAttributes, random));
           break;
+        case VARCHAR:
+          builder.defaultValue(randomString(Math.min(DEFAULT_BINARY_LENGTH,
+                                                     typeAttributes.getLength()),
+                                            random));
+          break;
         case STRING:
           builder.defaultValue(randomString(DEFAULT_BINARY_LENGTH, random));
           break;
@@ -206,6 +219,7 @@ public class SchemaGenerator {
             Encoding.PLAIN_ENCODING,
             Encoding.BIT_SHUFFLE));
         break;
+      case VARCHAR:
       case STRING:
       case BINARY:
         validEncodings.retainAll(Arrays.asList(
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
index 6ca8114..3da514d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
@@ -21,9 +21,11 @@ import static org.junit.Assert.assertNotEquals;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
 import org.apache.kudu.test.junit.RetryRule;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestColumnSchema {
@@ -31,6 +33,9 @@ public class TestColumnSchema {
   @Rule
   public RetryRule retryRule = new RetryRule();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testToString() {
     ColumnSchema col1 = new ColumnSchemaBuilder("col1", Type.STRING).build();
@@ -95,4 +100,19 @@ public class TestColumnSchema {
     ColumnSchema commentInt3 = new ColumnSchemaBuilder("col1", Type.INT32).comment("Test").build();
     assertNotEquals(commentInt1, commentInt3);
   }
+  @Test
+  public void testOutOfRangeVarchar() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("VARCHAR's length must be set and between 1 and 65535");
+    new ColumnSchemaBuilder("col1", Type.VARCHAR)
+      .typeAttributes(CharUtil.typeAttributes(70000)).build();
+  }
+
+  @Test
+  public void testVarcharWithoutLength() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("VARCHAR's length must be set and between 1 and 65535");
+    new ColumnSchemaBuilder("col1", Type.VARCHAR).build();
+  }
+
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 376351f..ec9670d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -37,6 +37,7 @@ import org.apache.kudu.Type;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import org.apache.kudu.client.PartitionSchema.RangeSchema;
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestKeyEncoding {
@@ -186,6 +187,8 @@ public class TestKeyEncoding {
             .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 0)),
         new ColumnSchemaBuilder("decimal128", Type.DECIMAL).key(true)
             .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 0)),
+        new ColumnSchemaBuilder("varchar", Type.VARCHAR).key(true)
+            .typeAttributes(CharUtil.typeAttributes(10)),
         new ColumnSchemaBuilder("string", Type.STRING).key(true),
         new ColumnSchemaBuilder("binary", Type.BINARY).key(true));
 
@@ -199,6 +202,7 @@ public class TestKeyEncoding {
     rowA.addDecimal("decimal32", BigDecimal.valueOf(5));
     rowA.addDecimal("decimal64", BigDecimal.valueOf(6));
     rowA.addDecimal("decimal128", BigDecimal.valueOf(7));
+    rowA.addVarchar("varchar", "");
     rowA.addString("string", "");
     rowA.addBinary("binary", "".getBytes(UTF_8));
 
@@ -212,6 +216,7 @@ public class TestKeyEncoding {
                           (byte) 0x80, 0, 0, 5,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
+                          0, 0,
                           0, 0
                       });
     assertEquals(rowA.stringifyRowKey(),
@@ -227,6 +232,7 @@ public class TestKeyEncoding {
     rowB.addDecimal("decimal32", BigDecimal.valueOf(5));
     rowB.addDecimal("decimal64", BigDecimal.valueOf(6));
     rowB.addDecimal("decimal128", BigDecimal.valueOf(7));
+    rowB.addVarchar("varchar", "abc\1\0defghij");
     rowB.addString("string", "abc\1\0def");
     rowB.addBinary("binary", "\0\1binary".getBytes(UTF_8));
 
@@ -240,6 +246,7 @@ public class TestKeyEncoding {
                           (byte) 0x80, 0, 0, 5,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
+                          'a', 'b', 'c', 1, 0, 1, 'd', 'e', 'f', 'g', 'h', 0, 0,
                           'a', 'b', 'c', 1, 0, 1, 'd', 'e', 'f', 0, 0,
                           0, 1, 'b', 'i', 'n', 'a', 'r', 'y',
                       });
@@ -254,6 +261,7 @@ public class TestKeyEncoding {
     rowC.addDecimal("decimal32", BigDecimal.valueOf(5));
     rowC.addDecimal("decimal64", BigDecimal.valueOf(6));
     rowC.addDecimal("decimal128", BigDecimal.valueOf(7));
+    rowC.addVarchar("varchar", "abc\n12345678");
     rowC.addString("string", "abc\n123");
     rowC.addBinary("binary", "\0\1\2\3\4\5".getBytes(UTF_8));
 
@@ -267,6 +275,7 @@ public class TestKeyEncoding {
                           (byte) 0x80, 0, 0, 5,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
+                          'a', 'b', 'c', '\n', '1', '2', '3', '4', '5', '6', 0, 0,
                           'a', 'b', 'c', '\n', '1', '2', '3', 0, 0,
                           0, 1, 2, 3, 4, 5,
                       });
@@ -281,6 +290,7 @@ public class TestKeyEncoding {
     rowD.addDecimal("decimal32", BigDecimal.valueOf(-5));
     rowD.addDecimal("decimal64", BigDecimal.valueOf(-6));
     rowD.addDecimal("decimal128", BigDecimal.valueOf(-7));
+    rowD.addVarchar("varchar", "\0abc\n\1\1\0 123\1\0");
     rowD.addString("string", "\0abc\n\1\1\0 123\1\0");
     rowD.addBinary("binary", "\0\1\2\3\4\5\0".getBytes(UTF_8));
 
@@ -294,6 +304,7 @@ public class TestKeyEncoding {
                           (byte) 127, -1, -1, -5,
                           (byte) 127, -1, -1, -1, -1, -1, -1, -6,
                           (byte) 127, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -7,
+                          0, 1, 'a', 'b', 'c', '\n', 1, 1, 0, 1, ' ', '1', 0, 0,
                           0, 1, 'a', 'b', 'c', '\n', 1, 1, 0, 1, ' ', '1', '2', '3', 1, 0, 1, 0, 0,
                           0, 1, 2, 3, 4, 5, 0,
                       });
@@ -382,6 +393,8 @@ public class TestKeyEncoding {
           .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 0)),
         new ColumnSchemaBuilder("decimal128", Type.DECIMAL).key(true)
           .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 0)),
+        new ColumnSchemaBuilder("varchar", Type.VARCHAR).key(true)
+          .typeAttributes(CharUtil.typeAttributes(10)),
         new ColumnSchemaBuilder("bool", Type.BOOL),       // not primary key type
         new ColumnSchemaBuilder("float", Type.FLOAT),     // not primary key type
         new ColumnSchemaBuilder("double", Type.DOUBLE));  // not primary key type
@@ -402,9 +415,10 @@ public class TestKeyEncoding {
     row.addDecimal(7, BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL32));
     row.addDecimal(8, BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL64));
     row.addDecimal(9, new BigDecimal(DecimalUtil.MAX_UNSCALED_DECIMAL128));
-    row.addBoolean(10, true);
-    row.addFloat(11, 8.8f);
-    row.addDouble(12, 9.9);
+    row.addVarchar(10, "varchar bar");
+    row.addBoolean(11, true);
+    row.addFloat(12, 7.8f);
+    row.addDouble(13, 9.9);
     session.apply(insert);
     session.close();
 
@@ -426,9 +440,10 @@ public class TestKeyEncoding {
           .compareTo(rr.getDecimal(8)) == 0);
       assertTrue(new BigDecimal(DecimalUtil.MAX_UNSCALED_DECIMAL128)
           .compareTo(rr.getDecimal(9)) == 0);
-      assertTrue(rr.getBoolean(10));
-      assertEquals(8.8f, rr.getFloat(11), .001f);
-      assertEquals(9.9, rr.getDouble(12), .001);
+      assertEquals("varchar ba", rr.getVarchar(10));
+      assertTrue(rr.getBoolean(11));
+      assertEquals(7.8f, rr.getFloat(12), .001f);
+      assertEquals(9.9, rr.getDouble(13), .001);
     }
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 288dd81..91703dc 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -24,6 +24,7 @@ import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
 import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.createManyStringsSchema;
+import static org.apache.kudu.test.ClientTestUtil.createManyVarcharsSchema;
 import static org.apache.kudu.test.ClientTestUtil.createSchemaWithBinaryColumns;
 import static org.apache.kudu.test.ClientTestUtil.createSchemaWithDecimalColumns;
 import static org.apache.kudu.test.ClientTestUtil.createSchemaWithTimestampColumns;
@@ -417,6 +418,62 @@ public class TestKuduClient {
   }
 
   /**
+   * Test inserting and retrieving VARCHAR columns.
+   */
+  @Test(timeout = 100000)
+  public void testVarchars() throws Exception {
+    Schema schema = createManyVarcharsSchema();
+    client.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
+
+    KuduSession session = client.newSession();
+    KuduTable table = client.openTable(TABLE_NAME);
+    for (int i = 0; i < 100; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addVarchar("key", String.format("key_%02d", i));
+      row.addVarchar("c2", "c2_" + i);
+      if (i % 2 == 1) {
+        row.addVarchar("c3", "c3_" + i);
+      }
+      row.addVarchar("c4", "c4_" + i);
+      // NOTE: we purposefully add the strings in a non-left-to-right
+      // order to verify that we still place them in the right position in
+      // the row.
+      row.addVarchar("c1", "c1_" + i);
+      session.apply(insert);
+      if (i % 50 == 0) {
+        session.flush();
+      }
+    }
+    session.flush();
+
+    List<String> rowStrings = scanTableToStrings(table);
+    assertEquals(100, rowStrings.size());
+    assertEquals(
+      "VARCHAR key(10)=key_03, VARCHAR c1(10)=c1_3, VARCHAR c2(10)=c2_3," +
+        " VARCHAR c3(10)=c3_3, VARCHAR c4(10)=c4_3",
+      rowStrings.get(3));
+    assertEquals(
+      "VARCHAR key(10)=key_04, VARCHAR c1(10)=c1_4, VARCHAR c2(10)=c2_4," +
+        " VARCHAR c3(10)=NULL, VARCHAR c4(10)=c4_4",
+      rowStrings.get(4));
+
+    KuduScanner scanner = client.newScannerBuilder(table).build();
+
+    assertTrue("Scanner should have returned row", scanner.hasMoreRows());
+
+    RowResultIterator rows = scanner.nextRows();
+    final RowResult next = rows.next();
+
+    // Do negative testing on string type.
+    try {
+      next.getInt("c2");
+      fail("IllegalArgumentException was not thrown when accessing " +
+             "a VARCHAR column with getInt");
+    } catch (IllegalArgumentException ignored) {}
+  }
+
+  /**
    * Test inserting and retrieving string columns.
    */
   @Test(timeout = 100000)
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
index a14ea94..760f9be 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
 import org.apache.kudu.test.junit.RetryRule;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestKuduPredicate {
@@ -83,6 +84,12 @@ public class TestKuduPredicate {
           .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 2))
           .build();
 
+  private static final ColumnSchema varcharCol =
+      new ColumnSchema.ColumnSchemaBuilder("varchar", Type.VARCHAR)
+          .typeAttributes(CharUtil.typeAttributes(10))
+          .nullable(true)
+          .build();
+
   @Rule
   public RetryRule retryRule = new RetryRule();
 
@@ -935,6 +942,13 @@ public class TestKuduPredicate {
                                 new byte[] { 0, 1, 2, 3, 4, 5, 6 },
                                 new byte[] { 10 }));
 
+    testMerge(KuduPredicate.newComparisonPredicate(varcharCol, GREATER_EQUAL, "bar"),
+              KuduPredicate.newComparisonPredicate(varcharCol, LESS, "foo"),
+              new KuduPredicate(RANGE,
+                                varcharCol,
+                                new byte[] {98, 97, 114},
+                                new byte[] {102, 111, 111}));
+
     byte[] bA = "a".getBytes(UTF_8);
     byte[] bB = "b".getBytes(UTF_8);
     byte[] bC = "c".getBytes(UTF_8);
@@ -966,6 +980,8 @@ public class TestKuduPredicate {
                         KuduPredicate.newComparisonPredicate(stringCol, LESS, "a\0"));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, LESS_EQUAL, new byte[] { (byte) 10 }),
                         KuduPredicate.newComparisonPredicate(binaryCol, LESS, new byte[] { (byte) 10, (byte) 0 }));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(varcharCol, LESS_EQUAL, "a"),
+                        KuduPredicate.newComparisonPredicate(varcharCol, LESS, "a\0"));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(byteCol, LESS_EQUAL, Byte.MAX_VALUE),
                         KuduPredicate.newIsNotNullPredicate(byteCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(shortCol, LESS_EQUAL, Short.MAX_VALUE),
@@ -1051,6 +1067,8 @@ public class TestKuduPredicate {
                         KuduPredicate.none(stringCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, LESS, new byte[] {}),
                         KuduPredicate.none(binaryCol));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(varcharCol, LESS, ""),
+                        KuduPredicate.none(varcharCol));
   }
 
   @Test
@@ -1080,6 +1098,8 @@ public class TestKuduPredicate {
                         KuduPredicate.newIsNotNullPredicate(stringCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, GREATER_EQUAL, new byte[] {}),
                         KuduPredicate.newIsNotNullPredicate(binaryCol));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(varcharCol, GREATER_EQUAL, ""),
+                        KuduPredicate.newIsNotNullPredicate(varcharCol));
 
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(byteCol, GREATER_EQUAL, Byte.MAX_VALUE),
                         KuduPredicate.newComparisonPredicate(byteCol, EQUAL, Byte.MAX_VALUE));
@@ -1126,6 +1146,9 @@ public class TestKuduPredicate {
     Assert.assertEquals(
         KuduPredicate.newComparisonPredicate(binaryCol, EQUAL, (Object) new byte[] { (byte) 10 }),
         KuduPredicate.newComparisonPredicate(binaryCol, EQUAL, new byte[] { (byte) 10 }));
+    Assert.assertEquals(
+        KuduPredicate.newComparisonPredicate(varcharCol, EQUAL, (Object) "a"),
+        KuduPredicate.newComparisonPredicate(varcharCol, EQUAL, "a"));
   }
 
   @Test
@@ -1163,6 +1186,12 @@ public class TestKuduPredicate {
                         KuduPredicate.newIsNotNullPredicate(stringCol).toString());
     Assert.assertEquals("`string` IS NULL",
                         KuduPredicate.newIsNullPredicate(stringCol).toString());
+    Assert.assertEquals("`varchar` = \"my varchar\"",
+                        KuduPredicate.newComparisonPredicate(varcharCol, EQUAL, "my varchar").toString());
+    Assert.assertEquals("`varchar` IS NOT NULL",
+                        KuduPredicate.newIsNotNullPredicate(varcharCol).toString());
+    Assert.assertEquals("`varchar` IS NULL",
+                        KuduPredicate.newIsNullPredicate(varcharCol).toString());
     // IS NULL predicate on non-nullable column = NONE predicate
     Assert.assertEquals("`int` NONE",
             KuduPredicate.newIsNullPredicate(intCol).toString());
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
index f4353dd..215236b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
@@ -85,6 +85,8 @@ public class TestPartialRow {
     assertEquals(53.35, (double) partialRow.getObject("double"), 0.0);
     assertTrue(partialRow.getObject("string") instanceof String);
     assertEquals("fun with ütf\0", partialRow.getObject("string"));
+    assertTrue(partialRow.getObject("varchar") instanceof String);
+    assertEquals("árvíztűrő ", partialRow.getObject("varchar"));
     assertTrue(partialRow.getObject("binary-array") instanceof byte[]);
     assertArrayEquals(new byte[] { 0, 1, 2, 3, 4 }, partialRow.getBinaryCopy("binary-array"));
     assertTrue(partialRow.getObject("binary-bytebuffer") instanceof byte[]);
@@ -98,7 +100,7 @@ public class TestPartialRow {
   public void testAddObject() {
     Schema schema = getSchemaWithAllTypes();
     // Ensure we aren't missing any types
-    assertEquals(13, schema.getColumnCount());
+    assertEquals(14, schema.getColumnCount());
 
     PartialRow row = schema.newPartialRow();
     row.addObject("int8", (byte) 42);
@@ -110,6 +112,7 @@ public class TestPartialRow {
     row.addObject("float", 52.35F);
     row.addObject("double", 53.35);
     row.addObject("string", "fun with ütf\0");
+    row.addObject("varchar", "árvíztűrő tükörfúrógép");
     row.addObject("binary-array", new byte[] { 0, 1, 2, 3, 4 });
     ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] { 5, 6, 7, 8, 9 });
     row.addObject("binary-bytebuffer", binaryBuffer);
@@ -334,6 +337,12 @@ public class TestPartialRow {
             "string string=\"fun with ütf\\0\", binary binary-bytebuffer=[2, 3, 4], " +
             "decimal(5, 3) decimal=12.345)",
         row.toString());
+
+    row.addVarchar("varchar", "árvíztűrő tükörfúrógép");
+    assertEquals("(int8 int8=42, int32 int32=42, double double=52.35, " +
+        "string string=\"fun with ütf\\0\", binary binary-bytebuffer=[2, 3, 4], " +
+        "decimal(5, 3) decimal=12.345, varchar(10) varchar=\"árvíztűrő \")",
+      row.toString());
   }
 
   @Test
@@ -408,6 +417,12 @@ public class TestPartialRow {
     partialRow.addBinary(binaryIndex, new byte[] { 0, 1, 2, 3, 4 });
     assertTrue(partialRow.incrementColumn(binaryIndex));
     assertArrayEquals(new byte[] { 0, 1, 2, 3, 4, 0 }, partialRow.getBinaryCopy(binaryIndex));
+
+    // Varchar
+    int varcharIndex = getColumnIndex(partialRow, "varchar");
+    partialRow.addVarchar(varcharIndex, "hello");
+    assertTrue(partialRow.incrementColumn(varcharIndex));
+    assertEquals("hello\0", partialRow.getVarchar(varcharIndex));
   }
 
   @Test
@@ -425,6 +440,7 @@ public class TestPartialRow {
     assertEquals(-Float.MAX_VALUE, partialRow.getFloat("float"), 0.0f);
     assertEquals(-Double.MAX_VALUE, partialRow.getDouble("double"), 0.0);
     assertEquals("", partialRow.getString("string"));
+    assertEquals("", partialRow.getVarchar("varchar"));
     assertArrayEquals(new byte[0], partialRow.getBinaryCopy("binary-array"));
     assertArrayEquals(new byte[0], partialRow.getBinaryCopy("binary-bytebuffer"));
     assertEquals(BigDecimal.valueOf(-99999, 3), partialRow.getDecimal("decimal"));
@@ -450,6 +466,7 @@ public class TestPartialRow {
       case INT32: return partialRow.getInt(columnName);
       case INT64: return partialRow.getLong(columnName);
       case UNIXTIME_MICROS: return partialRow.getTimestamp(columnName);
+      case VARCHAR: return partialRow.getVarchar(columnName);
       case STRING: return partialRow.getString(columnName);
       case BINARY: return partialRow.getBinary(columnName);
       case FLOAT: return partialRow.getFloat(columnName);
@@ -471,6 +488,7 @@ public class TestPartialRow {
       case INT32: return partialRow.getInt(columnIndex);
       case INT64: return partialRow.getLong(columnIndex);
       case UNIXTIME_MICROS: return partialRow.getTimestamp(columnIndex);
+      case VARCHAR: return partialRow.getVarchar(columnIndex);
       case STRING: return partialRow.getString(columnIndex);
       case BINARY: return partialRow.getBinary(columnIndex);
       case FLOAT: return partialRow.getFloat(columnIndex);
@@ -489,6 +507,7 @@ public class TestPartialRow {
       case INT32: partialRow.addInt(columnName, 44); break;
       case INT64: partialRow.addLong(columnName, 45); break;
       case UNIXTIME_MICROS: partialRow.addTimestamp(columnName, new Timestamp(1234567890)); break;
+      case VARCHAR: partialRow.addVarchar(columnName, "fun with ütf\0"); break;
       case STRING: partialRow.addString(columnName, "fun with ütf\0"); break;
       case BINARY: partialRow.addBinary(columnName, new byte[] { 0, 1, 2, 3, 4 }); break;
       case FLOAT: partialRow.addFloat(columnName, 52.35F); break;
@@ -507,6 +526,7 @@ public class TestPartialRow {
       case INT32: partialRow.addInt(columnIndex, 44); break;
       case INT64: partialRow.addLong(columnIndex, 45); break;
       case UNIXTIME_MICROS: partialRow.addTimestamp(columnIndex, new Timestamp(1234567890)); break;
+      case VARCHAR: partialRow.addVarchar(columnIndex, "fun with ütf\0"); break;
       case STRING: partialRow.addString(columnIndex, "fun with ütf\0"); break;
       case BINARY: partialRow.addBinary(columnIndex, new byte[] { 0, 1, 2, 3, 4 }); break;
       case FLOAT: partialRow.addFloat(columnIndex, 52.35F); break;
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
index fc8375b..16e1944 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
@@ -75,6 +75,7 @@ public class TestRowResult {
     row.setNull(10);
     row.addTimestamp(11, new Timestamp(11));
     row.addDecimal(12, BigDecimal.valueOf(12345, 3));
+    row.addVarchar(13, "varcharval");
 
     KuduClient client = harness.getClient();
     KuduSession session = client.newSession();
@@ -143,6 +144,10 @@ public class TestRowResult {
       assertEquals(BigDecimal.valueOf(12345, 3), rr.getObject(12));
       assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(allTypesSchema.getColumnByIndex(12).getName()));
 
+      assertEquals("varcharval", rr.getVarchar(13));
+      assertEquals("varcharval", rr.getObject(13));
+      assertEquals("varcharval", rr.getVarchar(allTypesSchema.getColumnByIndex(13).getName()));
+
       // We test with the column name once since it's the same method for all types, unlike above.
       assertEquals(Type.INT8, rr.getColumnType(allTypesSchema.getColumnByIndex(0).getName()));
       assertEquals(Type.INT8, rr.getColumnType(0));
@@ -156,6 +161,7 @@ public class TestRowResult {
       assertEquals(Type.BINARY, rr.getColumnType(8));
       assertEquals(Type.UNIXTIME_MICROS, rr.getColumnType(11));
       assertEquals(Type.DECIMAL, rr.getColumnType(12));
+      assertEquals(Type.VARCHAR, rr.getColumnType(13));
     }
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
index 5426125..e64ff44 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
@@ -35,6 +35,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduPredicate.ComparisonOp;
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestScanPredicate {
@@ -53,7 +54,16 @@ public class TestScanPredicate {
 
   private Schema createTableSchema(Type type) {
     ColumnSchema key = new ColumnSchema.ColumnSchemaBuilder("key", Type.INT64).key(true).build();
-    ColumnSchema val = new ColumnSchema.ColumnSchemaBuilder("value", type).nullable(true).build();
+    ColumnSchema val;
+    switch (type) {
+      case VARCHAR:
+        val = new ColumnSchema.ColumnSchemaBuilder("value", type)
+          .typeAttributes(CharUtil.typeAttributes(10)).nullable(true).build();
+        break;
+      default:
+        val = new ColumnSchema.ColumnSchemaBuilder("value", type).nullable(true).build();
+        break;
+    }
     return new Schema(ImmutableList.of(key, val));
   }
 
@@ -631,7 +641,16 @@ public class TestScanPredicate {
 
   @Test
   public void testStringPredicates() throws Exception {
-    Schema schema = createTableSchema(Type.STRING);
+    testVarlengthPredicates(Type.STRING);
+  }
+
+  @Test
+  public void testVarcharPredicates() throws Exception {
+    testVarlengthPredicates(Type.VARCHAR);
+  }
+
+  private void testVarlengthPredicates(Type type) throws Exception {
+    Schema schema = createTableSchema(type);
     client.createTable("string-table", schema, createTableOptions());
     KuduTable table = client.openTable("string-table");
 
@@ -643,7 +662,16 @@ public class TestScanPredicate {
     for (String value : values) {
       Insert insert = table.newInsert();
       insert.getRow().addLong("key", i++);
-      insert.getRow().addString("value", value);
+      switch (type) {
+        case VARCHAR:
+          insert.getRow().addVarchar("value", value);
+          break;
+        case STRING:
+          insert.getRow().addString("value", value);
+          break;
+        default:
+          throw new IllegalArgumentException("CHAR/VARCHAR/STRING expected");
+      }
       session.apply(insert);
     }
     Insert nullInsert = table.newInsert();
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
index 357eb8b..d5a0938 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
@@ -220,6 +220,8 @@ private class GeneratedRowIterator(
           row.addDecimal(
             i,
             new BigDecimal(BigInteger.valueOf(value), col.getTypeAttributes.getScale))
+        case Type.VARCHAR =>
+          row.addVarchar(i, String.valueOf(value))
         case Type.STRING =>
           row.addString(i, String.valueOf(value))
         case Type.BINARY =>
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
index 6df627d..0863bae 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
@@ -17,6 +17,7 @@
 package org.apache.kudu.spark.kudu
 
 import org.apache.kudu.Schema
+import org.apache.kudu.Type
 import org.apache.kudu.client.PartialRow
 import org.apache.kudu.client.RowResult
 import org.apache.spark.sql.Row
@@ -65,7 +66,14 @@ class RowConverter(kuduSchema: Schema, schema: StructType, ignoreNull: Boolean)
       } else {
         schema.fields(sparkIdx).dataType match {
           case DataTypes.StringType =>
-            partialRow.addString(kuduIdx, row.getString(sparkIdx))
+            kuduSchema.getColumnByIndex(kuduIdx).getType match {
+              case Type.STRING =>
+                partialRow.addString(kuduIdx, row.getString(sparkIdx))
+              case Type.VARCHAR =>
+                partialRow.addVarchar(kuduIdx, row.getString(sparkIdx))
+              case t =>
+                throw new IllegalArgumentException(s"Invalid Kudu column type $t")
+            }
           case DataTypes.BinaryType =>
             partialRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
           case DataTypes.BooleanType =>
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
index 636b343..bac9193 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
@@ -52,6 +52,7 @@ object SparkUtil {
       case Type.UNIXTIME_MICROS => TimestampType
       case Type.FLOAT => FloatType
       case Type.DOUBLE => DoubleType
+      case Type.VARCHAR => StringType
       case Type.STRING => StringType
       case Type.BINARY => BinaryType
       case Type.DECIMAL => DecimalType(a.getPrecision, a.getScale)
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 521c83a..2f7ddb1 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -529,7 +529,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       ))
 
     val dfDefaultSchema = sqlContext.read.options(kuduOptions).format("kudu").load
-    assertEquals(14, dfDefaultSchema.schema.fields.length)
+    assertEquals(15, dfDefaultSchema.schema.fields.length)
 
     val dfWithUserSchema =
       sqlContext.read.options(kuduOptions).schema(userSchema).format("kudu").load
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index 76b6b2b..b322532 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -82,7 +82,8 @@ class KuduContextTest extends KuduTestSuite with Matchers {
           "c10_byte",
           "c11_decimal32",
           "c12_decimal64",
-          "c13_decimal128"
+          "c13_decimal128",
+          "c14_varchar"
         )
       )
       .map(r => r.toSeq)
@@ -106,6 +107,7 @@ class KuduContextTest extends KuduTestSuite with Matchers {
       assert(r.apply(11).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
       assert(r.apply(12).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
       assert(r.apply(13).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
+      assert(r.apply(14).asInstanceOf[String] == rows.apply(index)._3)
     })
   }
 
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 61e1069..8dbe53f 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -30,6 +30,7 @@ import org.apache.kudu.client.KuduTable
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.test.KuduTestHarness
+import org.apache.kudu.util.CharUtil
 import org.apache.kudu.util.DecimalUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.DataFrame
@@ -84,6 +85,10 @@ trait KuduTestSuite extends JUnitSuite {
             .precision(DecimalUtil.MAX_DECIMAL128_PRECISION)
             .build()
         )
+        .build(),
+      new ColumnSchemaBuilder("c14_varchar", Type.VARCHAR)
+        .typeAttributes(CharUtil.typeAttributes(CharUtil.MAX_VARCHAR_LENGTH))
+        .nullable(true)
         .build()
     )
     new Schema(columns.asJava)
@@ -181,9 +186,11 @@ trait KuduTestSuite extends JUnitSuite {
       // Sprinkling some nulls so that queries see them.
       val s = if (i % 2 == 0) {
         row.addString(2, i.toString)
+        row.addVarchar(14, i.toString)
         i.toString
       } else {
         row.setNull(2)
+        row.setNull(14)
         null
       }
 
@@ -216,6 +223,7 @@ trait KuduTestSuite extends JUnitSuite {
       row.addDecimal(11, BigDecimal.valueOf(i))
       row.addDecimal(12, BigDecimal.valueOf(i))
       row.addDecimal(13, BigDecimal.valueOf(i))
+      row.addVarchar(14, i.toString)
 
       // Sprinkling some nulls so that queries see them.
       val s = if (i % 2 == 0) {
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
index ba6842b..c4fce18 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -42,8 +42,8 @@ import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
 import org.apache.kudu.client.Upsert;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
-import org.apache.kudu.util.StringUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -213,7 +213,9 @@ public abstract class ClientTestUtil {
             new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(),
             new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build(),
             new ColumnSchema.ColumnSchemaBuilder("decimal", Type.DECIMAL)
-                .typeAttributes(DecimalUtil.typeAttributes(5, 3)).build());
+              .typeAttributes(DecimalUtil.typeAttributes(5, 3)).build(),
+            new ColumnSchema.ColumnSchemaBuilder("varchar", Type.VARCHAR)
+              .typeAttributes(CharUtil.typeAttributes(10)).build());
 
     return new Schema(columns);
   }
@@ -221,7 +223,7 @@ public abstract class ClientTestUtil {
   public static PartialRow getPartialRowWithAllTypes() {
     Schema schema = getSchemaWithAllTypes();
     // Ensure we aren't missing any types
-    assertEquals(13, schema.getColumnCount());
+    assertEquals(14, schema.getColumnCount());
 
     PartialRow row = schema.newPartialRow();
     row.addByte("int8", (byte) 42);
@@ -233,6 +235,7 @@ public abstract class ClientTestUtil {
     row.addFloat("float", 52.35F);
     row.addDouble("double", 53.35);
     row.addString("string", "fun with ütf\0");
+    row.addVarchar("varchar", "árvíztűrő tükörfúrógép");
     row.addBinary("binary-array", new byte[] { 0, 1, 2, 3, 4 });
     ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] { 5, 6, 7, 8, 9 });
     row.addBinary("binary-bytebuffer", binaryBuffer);
@@ -413,6 +416,21 @@ public abstract class ClientTestUtil {
     return table;
   }
 
+  public static Schema createManyVarcharsSchema() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c3", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c4", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).nullable(true).build());
+    return new Schema(columns);
+  }
+
   public static Schema createManyStringsSchema() {
     ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
     columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());


[kudu] 04/04: KUDU-1938 Add range partition support pt 6

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 768b6885485dcb74f2731ba59fdebf2e28d0a3a3
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Tue Oct 1 18:01:23 2019 +0200

    KUDU-1938 Add range partition support pt 6
    
    Change-Id: Ib82985f09fef8d3372f40412156cd7a8ce69d61c
    Reviewed-on: http://gerrit.cloudera.org:8080/14333
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/partition-test.cc           | 54 +++++++++++++++++++++++++
 src/kudu/common/partition.cc                | 11 +++++
 src/kudu/common/partition.h                 |  1 +
 src/kudu/integration-tests/varchar-itest.cc | 62 +++++++++++++++++++++++++++--
 4 files changed, 125 insertions(+), 3 deletions(-)

diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 799d292..9091eed 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -803,4 +803,58 @@ TEST_F(PartitionTest, TestIncrementRangePartitionStringBounds) {
     check(test, false);
   }
 }
+
+TEST_F(PartitionTest, TestVarcharRangePartitions) {
+  Schema schema({ ColumnSchema("c1", VARCHAR, false, nullptr, nullptr,
+                               ColumnStorageAttributes(), ColumnTypeAttributes(10)),
+                  ColumnSchema("c2", VARCHAR, false, nullptr, nullptr,
+                               ColumnStorageAttributes(), ColumnTypeAttributes(10)) },
+                  { ColumnId(0), ColumnId(1) }, 2);
+
+  PartitionSchemaPB schema_builder;
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+
+  vector<vector<boost::optional<string>>> tests {
+    { string("a"), string("b"), string("a"), string("b\0", 2) },
+    { string("a"), boost::none, string("a"), string("\0", 1) },
+  };
+
+  auto check = [&] (const vector<boost::optional<string>>& test, bool lower_bound) {
+    CHECK_EQ(4, test.size());
+    KuduPartialRow bound(&schema);
+    if (test[0]) ASSERT_OK(bound.SetVarchar("c1", *test[0]));
+    if (test[1]) ASSERT_OK(bound.SetVarchar("c2", *test[1]));
+
+    vector<string> components;
+    partition_schema.AppendRangeDebugStringComponentsOrMin(bound, &components);
+    SCOPED_TRACE(JoinStrings(components, ", "));
+    SCOPED_TRACE(lower_bound ? "lower bound" : "upper bound");
+
+    if (lower_bound) {
+      ASSERT_OK(partition_schema.MakeLowerBoundRangePartitionKeyInclusive(&bound));
+    } else {
+      ASSERT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(&bound));
+    }
+
+    Slice val;
+    if (test[2]) {
+      ASSERT_OK(bound.GetVarchar("c1", &val));
+      ASSERT_EQ(*test[2], val);
+    } else {
+      ASSERT_FALSE(bound.IsColumnSet("c1"));
+    }
+    if (test[3]) {
+      ASSERT_OK(bound.GetVarchar("c2", &val));
+      ASSERT_EQ(*test[3], val);
+    } else {
+      ASSERT_FALSE(bound.IsColumnSet("c2"));
+    }
+  };
+
+  for (const auto& test : tests) {
+    check(test, true);
+    check(test, false);
+  }
+}
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 02b9062..571ec4a 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -1013,6 +1013,9 @@ namespace {
       case UNIXTIME_MICROS:
         RETURN_NOT_OK(row->SetInt64(idx, INT64_MIN + 1));
         break;
+      case VARCHAR:
+        RETURN_NOT_OK(row->SetVarchar(idx, Slice("\0", 1)));
+        break;
       case STRING:
         RETURN_NOT_OK(row->SetStringCopy(idx, Slice("\0", 1)));
         break;
@@ -1109,6 +1112,14 @@ namespace {
         RETURN_NOT_OK(row->SetBinaryCopy(idx, incremented));
         break;
       }
+      case VARCHAR: {
+        Slice value;
+        RETURN_NOT_OK(row->GetVarchar(idx, &value));
+        string incremented = value.ToString();
+        incremented.push_back('\0');
+        RETURN_NOT_OK(row->SetVarchar(idx, incremented));
+        break;
+      }
       case STRING: {
         Slice value;
         RETURN_NOT_OK(row->GetString(idx, &value));
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index d8e499b..761e8e2 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -267,6 +267,7 @@ class PartitionSchema {
   friend class PartitionPruner;
   FRIEND_TEST(PartitionTest, TestIncrementRangePartitionBounds);
   FRIEND_TEST(PartitionTest, TestIncrementRangePartitionStringBounds);
+  FRIEND_TEST(PartitionTest, TestVarcharRangePartitions);
 
   // Returns a text description of the encoded range key suitable for debug printing.
   std::string RangeKeyDebugString(Slice range_key, const Schema& schema) const;
diff --git a/src/kudu/integration-tests/varchar-itest.cc b/src/kudu/integration-tests/varchar-itest.cc
index ad92f8a..aab7623 100644
--- a/src/kudu/integration-tests/varchar-itest.cc
+++ b/src/kudu/integration-tests/varchar-itest.cc
@@ -40,14 +40,18 @@ namespace client {
 
 using sp::shared_ptr;
 
-class VarcharItest : public ExternalMiniClusterITestBase {};
+class VarcharItest : public ExternalMiniClusterITestBase {
+ protected:
+  void SetUp() override {
+    NO_FATALS(StartCluster({}, {}, kNumServers));
+  }
 
-TEST_F(VarcharItest, TestCharVarcharTruncation) {
   const int kNumServers = 3;
   const int kNumTablets = 3;
   const char* const kTableName = "varchar-table";
-  NO_FATALS(StartCluster({}, {}, kNumServers));
+};
 
+TEST_F(VarcharItest, TestVarcharTruncation) {
   KuduSchemaBuilder builder;
   builder.AddColumn("key")->Type(KuduColumnSchema::INT64)
     ->NotNull()->PrimaryKey();
@@ -106,5 +110,57 @@ TEST_F(VarcharItest, TestInvalidLength) {
   ASSERT_FALSE(s.ok());
 }
 
+TEST_F(VarcharItest, TestVarcharRangePartition) {
+  KuduSchemaBuilder builder;
+  builder.AddColumn("key")->Type(KuduColumnSchema::VARCHAR)
+    ->Length(10)->NotNull()->PrimaryKey();
+  builder.AddColumn("value")->Type(KuduColumnSchema::VARCHAR)
+    ->Length(10);
+  KuduSchema schema;
+  ASSERT_OK(builder.Build(&schema));
+
+  KuduPartialRow* lower_bound = schema.NewRow();
+  ASSERT_OK(lower_bound->SetVarchar("key", "bar"));
+  KuduPartialRow* upper_bound = schema.NewRow();
+  ASSERT_OK(upper_bound->SetVarchar("key", "foo"));
+
+  unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+      .schema(&schema)
+      .num_replicas(kNumServers)
+      .set_range_partition_columns({ "key" })
+      .add_range_partition(lower_bound, upper_bound)
+      .Create());
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+  KuduInsert* insert = table->NewInsert();
+  KuduPartialRow* write = insert->mutable_row();
+
+  ASSERT_OK(write->SetVarchar("key", "baz"));
+  ASSERT_OK(write->SetVarchar("value", "foobar"));
+  ASSERT_OK(session->Apply(insert));
+  ASSERT_OK(session->Flush());
+
+  KuduScanner scanner(table.get());
+  ASSERT_OK(scanner.SetFaultTolerant());
+  ASSERT_OK(scanner.Open());
+  ASSERT_TRUE(scanner.HasMoreRows());
+  while (scanner.HasMoreRows()) {
+    KuduScanBatch batch;
+    ASSERT_OK(scanner.NextBatch(&batch));
+    for (const KuduScanBatch::RowPtr& read : batch) {
+      Slice key;
+      ASSERT_OK(read.GetVarchar("key", &key));
+      ASSERT_EQ("baz", key);
+      Slice value;
+      ASSERT_OK(read.GetVarchar("value", &value));
+      ASSERT_EQ("foobar", value);
+    }
+  }
+}
+
 } // namespace client
 } // namespace kudu