You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2019/10/23 08:46:08 UTC

[kudu] 02/04: KUDU-1938 [java] Add support for VARCHAR pt 4

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e2b2fb25335fe517e46cdd61924698b36e9cc652
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Tue Sep 24 18:51:52 2019 +0200

    KUDU-1938 [java] Add support for VARCHAR pt 4
    
    Adds support for VARCHAR type to the Java and Spark clients. The
    kudu-client only changes would break tests in kudu-spark and kudu-backup
    so this patch also incorporates changes in these subprojects.
    
    Change-Id: I03edf5e65409e895512d5cd81a607180632e8995
    Reviewed-on: http://gerrit.cloudera.org:8080/14050
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
---
 .../src/main/protobuf/backup.proto                 |  1 +
 .../org/apache/kudu/backup/TableMetadata.scala     |  7 ++
 .../main/java/org/apache/kudu/ColumnSchema.java    | 19 ++++-
 .../java/org/apache/kudu/ColumnTypeAttributes.java | 45 ++++++++++--
 .../src/main/java/org/apache/kudu/Type.java        |  5 +-
 .../apache/kudu/client/ColumnRangePredicate.java   |  5 +-
 .../java/org/apache/kudu/client/KeyEncoder.java    |  9 ++-
 .../java/org/apache/kudu/client/KuduPredicate.java |  8 ++-
 .../java/org/apache/kudu/client/Operation.java     |  3 +-
 .../java/org/apache/kudu/client/PartialRow.java    | 84 +++++++++++++++++++++-
 .../org/apache/kudu/client/ProtobufHelper.java     |  8 +++
 .../java/org/apache/kudu/client/RowResult.java     | 37 +++++++++-
 .../main/java/org/apache/kudu/util/CharUtil.java   | 38 ++++++++++
 .../java/org/apache/kudu/util/DataGenerator.java   |  4 ++
 .../java/org/apache/kudu/util/SchemaGenerator.java | 14 ++++
 .../java/org/apache/kudu/TestColumnSchema.java     | 20 ++++++
 .../org/apache/kudu/client/TestKeyEncoding.java    | 27 +++++--
 .../org/apache/kudu/client/TestKuduClient.java     | 57 +++++++++++++++
 .../org/apache/kudu/client/TestKuduPredicate.java  | 29 ++++++++
 .../org/apache/kudu/client/TestPartialRow.java     | 22 +++++-
 .../java/org/apache/kudu/client/TestRowResult.java |  6 ++
 .../org/apache/kudu/client/TestScanPredicate.java  | 34 ++++++++-
 .../spark/tools/DistributedDataGenerator.scala     |  2 +
 .../org/apache/kudu/spark/kudu/RowConverter.scala  | 10 ++-
 .../org/apache/kudu/spark/kudu/SparkUtil.scala     |  1 +
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala |  2 +-
 .../apache/kudu/spark/kudu/KuduContextTest.scala   |  4 +-
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  8 +++
 .../java/org/apache/kudu/test/ClientTestUtil.java  | 24 ++++++-
 29 files changed, 500 insertions(+), 33 deletions(-)

diff --git a/java/kudu-backup-common/src/main/protobuf/backup.proto b/java/kudu-backup-common/src/main/protobuf/backup.proto
index 5711e46..ef8678a 100644
--- a/java/kudu-backup-common/src/main/protobuf/backup.proto
+++ b/java/kudu-backup-common/src/main/protobuf/backup.proto
@@ -31,6 +31,7 @@ import "google/protobuf/wrappers.proto";
 message ColumnTypeAttributesMetadataPB {
   int32 precision = 1;
   int32 scale = 2;
+  int32 length = 3;
 }
 
 // Maps to the ColumnSchema class.
diff --git a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 6fc49d3..8bff6a1 100644
--- a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -110,6 +110,7 @@ object TableMetadata {
       .newBuilder()
       .setPrecision(attributes.getPrecision)
       .setScale(attributes.getScale)
+      .setLength(attributes.getLength)
       .build()
   }
 
@@ -215,6 +216,7 @@ object TableMetadata {
           new ColumnTypeAttributesBuilder()
             .precision(attributes.getPrecision)
             .scale(attributes.getScale)
+            .length(attributes.getLength)
             .build()
         )
       }
@@ -234,6 +236,7 @@ object TableMetadata {
       case Type.INT64 | Type.UNIXTIME_MICROS => row.getLong(columnName)
       case Type.FLOAT => row.getFloat(columnName)
       case Type.DOUBLE => row.getDouble(columnName)
+      case Type.VARCHAR => row.getVarchar(columnName)
       case Type.STRING => row.getString(columnName)
       case Type.BINARY => row.getBinary(columnName)
       case Type.DECIMAL => row.getDecimal(columnName)
@@ -252,6 +255,7 @@ object TableMetadata {
         row.addLong(columnName, value.asInstanceOf[Long])
       case Type.FLOAT => row.addFloat(columnName, value.asInstanceOf[Float])
       case Type.DOUBLE => row.addDouble(columnName, value.asInstanceOf[Double])
+      case Type.VARCHAR => row.addVarchar(columnName, value.asInstanceOf[String])
       case Type.STRING => row.addString(columnName, value.asInstanceOf[String])
       case Type.BINARY =>
         row.addBinary(columnName, value.asInstanceOf[Array[Byte]])
@@ -278,6 +282,8 @@ object TableMetadata {
         String.valueOf(value.asInstanceOf[Float])
       case Type.DOUBLE =>
         String.valueOf(value.asInstanceOf[Double])
+      case Type.VARCHAR =>
+        value.asInstanceOf[String]
       case Type.STRING =>
         value.asInstanceOf[String]
       case Type.BINARY =>
@@ -300,6 +306,7 @@ object TableMetadata {
       case Type.INT64 | Type.UNIXTIME_MICROS => value.toLong
       case Type.FLOAT => value.toFloat
       case Type.DOUBLE => value.toDouble
+      case Type.VARCHAR => value
       case Type.STRING => value
       case Type.BINARY => Base64.decodeBase64(value)
       case Type.DECIMAL => new BigDecimal(value) // TODO: Explicitly pass scale
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
index 1a69f33..7ae0335 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
@@ -17,8 +17,12 @@
 
 package org.apache.kudu;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
+import org.apache.kudu.util.CharUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -254,6 +258,8 @@ public class ColumnSchema {
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
   public static class ColumnSchemaBuilder {
+    public static final List<Type> TYPES_WITH_ATTRIBUTES = Arrays.asList(Type.DECIMAL,
+                                                                         Type.VARCHAR);
     private final String name;
     private final Type type;
     private boolean key = false;
@@ -377,7 +383,7 @@ public class ColumnSchema {
      * Set the column type attributes for this column.
      */
     public ColumnSchemaBuilder typeAttributes(ColumnTypeAttributes typeAttributes) {
-      if (type != Type.DECIMAL && typeAttributes != null) {
+      if (typeAttributes != null && !TYPES_WITH_ATTRIBUTES.contains(type)) {
         throw new IllegalArgumentException(
             "ColumnTypeAttributes are not used on " + type + " columns");
       }
@@ -414,10 +420,19 @@ public class ColumnSchema {
       if (wireType == null) {
         this.wireType = type.getDataType(typeAttributes);
       }
+      if (type == Type.VARCHAR) {
+        if (typeAttributes == null || !typeAttributes.hasLength()
+          || typeAttributes.getLength() < CharUtil.MIN_VARCHAR_LENGTH
+          || typeAttributes.getLength() > CharUtil.MAX_VARCHAR_LENGTH) {
+          throw new IllegalArgumentException(
+            String.format("VARCHAR's length must be set and between %d and %d",
+                          CharUtil.MIN_VARCHAR_LENGTH, CharUtil.MAX_VARCHAR_LENGTH));
+        }
+      }
       return new ColumnSchema(name, type,
                               key, nullable, defaultValue,
                               desiredBlockSize, encoding, compressionAlgorithm,
                               typeAttributes, wireType, comment);
-    }
+     }
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
index bc0d88e..b97c519 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
@@ -32,12 +32,18 @@ public class ColumnTypeAttributes {
   private final boolean hasScale;
   private final int scale;
 
+  private final boolean hasLength;
+  private final int length;
+
   private ColumnTypeAttributes(boolean hasPrecision, int precision,
-                               boolean hasScale, int scale) {
+                               boolean hasScale, int scale,
+                               boolean hasLength, int length) {
     this.hasPrecision = hasPrecision;
     this.precision = precision;
     this.hasScale = hasScale;
     this.scale = scale;
+    this.hasLength = hasLength;
+    this.length = length;
   }
 
   /**
@@ -68,6 +74,20 @@ public class ColumnTypeAttributes {
     return scale;
   }
 
+  /**
+   * Returns true if the length is set;
+   */
+  public boolean hasLength() {
+    return hasLength;
+  }
+
+  /**
+   * Returns the length;
+   */
+  public int getLength() {
+    return length;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -91,13 +111,19 @@ public class ColumnTypeAttributes {
     if (scale != that.scale) {
       return false;
     }
+    if (hasLength != that.hasLength) {
+      return false;
+    }
+    if (length != that.length) {
+      return false;
+    }
 
     return true;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(hasPrecision, precision, hasScale, scale);
+    return Objects.hash(hasPrecision, precision, hasScale, scale, hasLength, length);
   }
 
   /**
@@ -110,6 +136,8 @@ public class ColumnTypeAttributes {
   public String toStringForType(Type type) {
     if (type == Type.DECIMAL) {
       return "(" + precision + ", " + scale + ")";
+    } else if (type == Type.VARCHAR) {
+      return "(" + length + ")";
     } else {
       return "";
     }
@@ -118,7 +146,8 @@ public class ColumnTypeAttributes {
   @Override
   public String toString() {
     return "hasPrecision: " + hasPrecision + ", precision: " + precision +
-        ", hasScale: " + hasScale + ", scale: " + scale;
+        ", hasScale: " + hasScale + ", scale: " + scale +
+        ", hasLength: " + hasLength + ", length: " + length;
   }
 
   /**
@@ -132,6 +161,8 @@ public class ColumnTypeAttributes {
     private int precision;
     private boolean hasScale;
     private int scale;
+    private boolean hasLength;
+    private int length;
 
     /**
      * Set the precision. Only used for Decimal columns.
@@ -151,12 +182,18 @@ public class ColumnTypeAttributes {
       return this;
     }
 
+    public ColumnTypeAttributesBuilder length(int length) {
+      this.hasLength = true;
+      this.length = length;
+      return this;
+    }
+
     /**
      * Builds a {@link ColumnTypeAttributes} using the passed parameters.
      * @return a new {@link ColumnTypeAttributes}
      */
     public ColumnTypeAttributes build() {
-      return new ColumnTypeAttributes(hasPrecision, precision, hasScale, scale);
+      return new ColumnTypeAttributes(hasPrecision, precision, hasScale, scale, hasLength, length);
     }
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Type.java b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
index ec7d542..6d31910 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
@@ -48,7 +48,8 @@ public enum Type {
   FLOAT(DataType.FLOAT, "float"),
   DOUBLE(DataType.DOUBLE, "double"),
   UNIXTIME_MICROS(DataType.UNIXTIME_MICROS, "unixtime_micros"),
-  DECIMAL(Arrays.asList(DataType.DECIMAL32, DataType.DECIMAL64, DataType.DECIMAL128), "decimal");
+  DECIMAL(Arrays.asList(DataType.DECIMAL32, DataType.DECIMAL64, DataType.DECIMAL128), "decimal"),
+  VARCHAR(DataType.VARCHAR, "varchar");
 
   private final ImmutableList<DataType> dataTypes;
   private final String name;
@@ -143,6 +144,7 @@ public enum Type {
     switch (type) {
       case STRING:
       case BINARY:
+      case VARCHAR:
         return 8 + 8; // offset then string length
       case BOOL:
       case INT8:
@@ -171,6 +173,7 @@ public enum Type {
     switch (type) {
       case STRING: return STRING;
       case BINARY: return BINARY;
+      case VARCHAR: return VARCHAR;
       case BOOL: return BOOL;
       case INT8: return INT8;
       case INT16: return INT16;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
index 697501d..8a8a26e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
@@ -97,6 +97,7 @@ public class ColumnRangePredicate {
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getFloat(bound));
       case DOUBLE:
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getDouble(bound));
+      case VARCHAR:
       case STRING:
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getString(bound));
       case BINARY:
@@ -185,7 +186,7 @@ public class ColumnRangePredicate {
    * @param lowerBound value for the lower bound
    */
   public void setLowerBound(String lowerBound) {
-    checkColumn(Type.STRING);
+    checkColumn(Type.STRING, Type.VARCHAR);
     setLowerBoundInternal(lowerBound.getBytes(UTF_8));
   }
 
@@ -282,7 +283,7 @@ public class ColumnRangePredicate {
    * @param upperBound value for the upper bound
    */
   public void setUpperBound(String upperBound) {
-    checkColumn(Type.STRING);
+    checkColumn(Type.STRING, Type.VARCHAR);
     setUpperBoundInternal(upperBound.getBytes(UTF_8));
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
index de7ac79..a2d06bf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -144,7 +145,8 @@ class KeyEncoder {
                                                     column.getName()));
     }
     final Type type = column.getType();
-    if (type == Type.STRING || type == Type.BINARY) {
+    if (type == Type.STRING || type == Type.BINARY ||
+        type == Type.VARCHAR) {
       encodeBinary(row.getVarLengthData().get(columnIdx), isLast, buf);
     } else {
       encodeSignedInt(row.getRowAlloc(),
@@ -337,6 +339,11 @@ class KeyEncoder {
         row.addBinary(idx, binary);
         break;
       }
+      case VARCHAR: {
+        byte[] binary = decodeBinaryColumn(buf, isLast);
+        row.addVarchar(idx, new String(binary, StandardCharsets.UTF_8));
+        break;
+      }
       case STRING: {
         byte[] binary = decodeBinaryColumn(buf, isLast);
         row.addStringUtf8(idx, binary);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
index cfba040..a8c9e40 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
@@ -413,7 +413,7 @@ public class KuduPredicate {
   public static KuduPredicate newComparisonPredicate(ColumnSchema column,
                                                      ComparisonOp op,
                                                      String value) {
-    checkColumn(column, Type.STRING);
+    checkColumn(column, Type.STRING, Type.VARCHAR);
 
     byte[] bytes = Bytes.fromString(value);
     if (op == ComparisonOp.LESS_EQUAL) {
@@ -495,6 +495,7 @@ public class KuduPredicate {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -598,7 +599,7 @@ public class KuduPredicate {
               column.getTypeAttributes().getPrecision()));
         }
     } else if (t instanceof String) {
-      checkColumn(column, Type.STRING);
+      checkColumn(column, Type.STRING, Type.VARCHAR);
       for (T value : values) {
         vals.add(Bytes.fromString((String) value));
       }
@@ -984,6 +985,7 @@ public class KuduPredicate {
       case DOUBLE:
         return Double.compare(Bytes.getDouble(a), Bytes.getDouble(b));
       case STRING:
+      case VARCHAR:
       case BINARY:
         return UnsignedBytes.lexicographicalComparator().compare(a, b);
       case DECIMAL128:
@@ -1036,6 +1038,7 @@ public class KuduPredicate {
         return m < n && Math.nextAfter(m, Double.POSITIVE_INFINITY) == n;
       }
       case STRING:
+      case VARCHAR:
       case BINARY: {
         if (a.length + 1 != b.length || b[a.length] != 0) {
           return false;
@@ -1153,6 +1156,7 @@ public class KuduPredicate {
       case UNIXTIME_MICROS: return TimestampUtil.timestampToString(Bytes.getLong(value));
       case FLOAT: return Float.toString(Bytes.getFloat(value));
       case DOUBLE: return Double.toString(Bytes.getDouble(value));
+      case VARCHAR:
       case STRING: {
         String v = Bytes.getString(value);
         StringBuilder sb = new StringBuilder(2 + v.length());
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 5ebda0f..2c02e5d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -384,7 +384,8 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
         ColumnSchema col = schema.getColumnByIndex(colIdx);
         // Keys should always be specified, maybe check?
         if (row.isSet(colIdx) && !row.isSetToNull(colIdx)) {
-          if (col.getType() == Type.STRING || col.getType() == Type.BINARY) {
+          if (col.getType() == Type.STRING || col.getType() == Type.BINARY ||
+              col.getType() == Type.VARCHAR) {
             ByteBuffer varLengthData = row.getVarLengthData().get(colIdx);
             varLengthData.reset();
             rows.putLong(indirectWrittenBytes);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 0a5ddd6..341792d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,7 +31,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.CharsetUtil;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
@@ -634,6 +634,44 @@ public class PartialRow {
   }
 
   /**
+   * Add a VARCHAR for the specified column.
+   *
+   * Truncates val to the length of the column in characters.
+   *
+   * @param columnIndex Index of the column
+   * @param val value to add
+   * @throws IllegalArgumentException if the column doesn't exist, is the wrong type
+   *         or the string is not UTF-8
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addVarchar(int columnIndex, String val) {
+    ColumnSchema column = schema.getColumnByIndex(columnIndex);
+    checkColumn(column, Type.VARCHAR);
+    checkNotFrozen();
+    int length = column.getTypeAttributes().getLength();
+    if (length < val.length()) {
+      val = val.substring(0, length);
+    }
+    byte[] bytes = Bytes.fromString(val);
+    addVarLengthData(columnIndex, bytes);
+  }
+
+  /**
+   * Add a VARCHAR for the specified column.
+   *
+   * Truncates val to the length of the column in characters.
+   *
+   * @param columnName Name of the column
+   * @param val value to add
+   * @throws IllegalArgumentException if the column doesn't exist, is the wrong type
+   *         or the string is not UTF-8
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addVarchar(String columnName, String val) {
+    addVarchar(schema.getColumnIndex(columnName), val);
+  }
+
+  /**
    * Get the specified column's string.
    * @param columnName name of the column to get data for
    * @return a string
@@ -655,7 +693,33 @@ public class PartialRow {
   public String getString(int columnIndex) {
     checkColumn(schema.getColumnByIndex(columnIndex), Type.STRING);
     checkValue(columnIndex);
-    return new String(getVarLengthData(columnIndex).array(), CharsetUtil.UTF_8);
+    return new String(getVarLengthData(columnIndex).array(), StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Get the specified column's VARCHAR.
+   * @param columnName Name of the column to get the data for
+   * @return a VARCHAR
+   * @throws IllegalArgumentException if the column is null, is unset,
+   *         or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public String getVarchar(String columnName) {
+    return getVarchar(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
+   * Get the specified column's VARCHAR.
+   * @param columnIndex Column index in the schema
+   * @return a VARCHAR
+   * @throws IllegalArgumentException if the column is null, is unset,
+   *         or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public String getVarchar(int columnIndex) {
+    checkColumn(schema.getColumnByIndex(columnIndex), Type.VARCHAR);
+    checkValue(columnIndex);
+    return new String(getVarLengthData(columnIndex).array(), StandardCharsets.UTF_8);
   }
 
   /**
@@ -905,6 +969,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -932,6 +997,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[] or java.lang.ByteBuffer
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -960,6 +1026,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[] or java.lang.ByteBuffer
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -993,6 +1060,7 @@ public class PartialRow {
         case FLOAT: addFloat(columnIndex, (Float) val); break;
         case DOUBLE: addDouble(columnIndex, (Double) val); break;
         case STRING: addString(columnIndex, (String) val); break;
+        case VARCHAR: addVarchar(columnIndex, (String) val); break;
         case BINARY:
           if (val instanceof byte[]) {
             addBinary(columnIndex, (byte[]) val);
@@ -1027,6 +1095,7 @@ public class PartialRow {
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
    *  Type.STRING -> java.lang.String
+   *  Type.VARCHAR -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
    *
@@ -1047,6 +1116,7 @@ public class PartialRow {
       case UNIXTIME_MICROS: return getTimestamp(columnIndex);
       case FLOAT: return getFloat(columnIndex);
       case DOUBLE: return getDouble(columnIndex);
+      case VARCHAR: return getVarchar(columnIndex);
       case STRING: return getString(columnIndex);
       case BINARY: return getBinaryCopy(columnIndex);
       case DECIMAL: return getDecimal(columnIndex);
@@ -1297,13 +1367,14 @@ public class PartialRow {
         sb.append(Bytes.getDecimal(rowAlloc, schema.getColumnOffset(idx),
             typeAttributes.getPrecision(), typeAttributes.getScale()));
         return;
+      case VARCHAR:
       case BINARY:
       case STRING:
         ByteBuffer value = getVarLengthData().get(idx).duplicate();
         value.reset(); // Make sure we start at the beginning.
         byte[] data = new byte[value.limit() - value.position()];
         value.get(data);
-        if (col.getType() == Type.STRING) {
+        if (col.getType() == Type.STRING || col.getType() == Type.VARCHAR) {
           sb.append('"');
           StringUtil.appendEscapedSQLString(Bytes.getString(data), sb);
           sb.append('"');
@@ -1357,6 +1428,9 @@ public class PartialRow {
       case BINARY:
         addBinary(index, AsyncKuduClient.EMPTY_ARRAY);
         break;
+      case VARCHAR:
+        addVarchar(index, "");
+        break;
       default:
         throw new RuntimeException("unreachable");
     }
@@ -1385,6 +1459,7 @@ public class PartialRow {
             getPositionInRowAllocAndSetBitSet(index), value.length);
         break;
       }
+      case VARCHAR:
       case STRING:
       case BINARY: {
         addVarLengthData(index, value);
@@ -1478,6 +1553,7 @@ public class PartialRow {
         Bytes.setBigDecimal(rowAlloc, existing.add(smallest), precision, offset);
         return true;
       }
+      case VARCHAR:
       case STRING:
       case BINARY: {
         ByteBuffer data = varLengthData.get(index);
@@ -1566,6 +1642,7 @@ public class PartialRow {
         int scale = typeAttributes.getScale();
         return Bytes.getDecimal(a.rowAlloc, offset, precision, scale)
             .equals(Bytes.getDecimal(b.rowAlloc, offset, precision, scale));
+      case VARCHAR:
       case STRING:
       case BINARY: {
         ByteBuffer aData = a.varLengthData.get(index).duplicate();
@@ -1653,6 +1730,7 @@ public class PartialRow {
         return val.add(smallestVal).equals(
                 Bytes.getDecimal(upper.rowAlloc, offset, precision, scale));
       }
+      case VARCHAR:
       case STRING:
       case BINARY: {
         // Check that b is 1 byte bigger than a, the extra byte is 0, and the other bytes are equal.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 45ebe20..a8e6176 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -139,6 +139,9 @@ public class ProtobufHelper {
     if (typeAttributes.hasScale()) {
       builder.setScale(typeAttributes.getScale());
     }
+    if (typeAttributes.hasLength()) {
+      builder.setLength(typeAttributes.getLength());
+    }
     return builder.build();
   }
 
@@ -173,6 +176,9 @@ public class ProtobufHelper {
     if(pb.hasScale()) {
       builder.scale(pb.getScale());
     }
+    if (pb.hasLength()) {
+      builder.length(pb.getLength());
+    }
     return builder.build();
   }
 
@@ -270,6 +276,7 @@ public class ProtobufHelper {
       case INT64:
       case UNIXTIME_MICROS:
         return Bytes.fromLong((Long) value);
+      case VARCHAR:
       case STRING:
         return ((String) value).getBytes(UTF_8);
       case BINARY:
@@ -306,6 +313,7 @@ public class ProtobufHelper {
         return buf.getFloat();
       case DOUBLE:
         return buf.getDouble();
+      case VARCHAR:
       case STRING:
         return value.toStringUtf8();
       case BINARY:
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index cf57160..c1558c4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -398,9 +398,14 @@ public class RowResult {
    * @throws IndexOutOfBoundsException if the column doesn't exist
    */
   public String getString(int columnIndex) {
+    checkType(columnIndex, Type.STRING);
+    return getVarLengthData(columnIndex);
+  }
+
+  private String getVarLengthData(int columnIndex) {
     checkValidColumn(columnIndex);
     checkNull(columnIndex);
-    checkType(columnIndex, Type.STRING);
+    checkType(columnIndex, Type.STRING, Type.VARCHAR);
     // C++ puts a Slice in rowData which is 16 bytes long for simplicity, but we only support ints.
     long offset = getLongOrOffset(columnIndex);
     long length = rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
@@ -412,6 +417,30 @@ public class RowResult {
   }
 
   /**
+   * Get the specified column's varchar.
+   * @param columnIndex Column index in the schema
+   * @return a string
+   * @throws IllegalArgumentException if the column is null
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public String getVarchar(int columnIndex) {
+    checkType(columnIndex, Type.VARCHAR);
+    return getVarLengthData(columnIndex);
+  }
+
+  /**
+   * Get the specified column's varchar.
+   * @param columnName name of the column to get data for
+   * @return a string
+   * @throws IllegalArgumentException if the column doesn't exist, is null,
+   * or if the type doesn't match the column's type
+   */
+  public String getVarchar(String columnName) {
+    return getVarchar(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
    * Get a copy of the specified column's binary data.
    * @param columnName name of the column to get data for
    * @return a byte[] with the binary data.
@@ -541,6 +570,7 @@ public class RowResult {
    *  Type.UNIXTIME_MICROS -> java.sql.Timestamp
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
+   *  Type.VARCHAR -> java.lang.String
    *  Type.STRING -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
@@ -568,6 +598,7 @@ public class RowResult {
    *  Type.UNIXTIME_MICROS -> java.sql.Timestamp
    *  Type.FLOAT -> java.lang.Float
    *  Type.DOUBLE -> java.lang.Double
+   *  Type.VARCHAR -> java.lang.String
    *  Type.STRING -> java.lang.String
    *  Type.BINARY -> byte[]
    *  Type.DECIMAL -> java.math.BigDecimal
@@ -589,6 +620,7 @@ public class RowResult {
       case UNIXTIME_MICROS: return getTimestamp(columnIndex);
       case FLOAT: return getFloat(columnIndex);
       case DOUBLE: return getDouble(columnIndex);
+      case VARCHAR: return getVarchar(columnIndex);
       case STRING: return getString(columnIndex);
       case BINARY: return getBinaryCopy(columnIndex);
       case DECIMAL: return getDecimal(columnIndex);
@@ -721,6 +753,9 @@ public class RowResult {
           case UNIXTIME_MICROS: {
             buf.append(TimestampUtil.timestampToString(getTimestamp(i)));
           } break;
+          case VARCHAR:
+            buf.append(getVarchar(i));
+            break;
           case STRING:
             buf.append(getString(i));
             break;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/CharUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/CharUtil.java
new file mode 100644
index 0000000..bd1639d
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/CharUtil.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CharUtil {
+  public static final int MIN_VARCHAR_LENGTH = 1;
+  public static final int MAX_VARCHAR_LENGTH = 65535;
+
+  /**
+   * Convenience method to create column type attributes for VARCHAR columns.
+   * @param length the length.
+   * @return the column type attributes.
+   */
+  public static ColumnTypeAttributes typeAttributes(int length) {
+    return new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+      .length(length)
+      .build();
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
index d8ad50b..3dffba4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
@@ -108,6 +108,10 @@ public class DataGenerator {
           row.addDouble(i, random.nextDouble()); break;
         case DECIMAL:
           row.addDecimal(i, randomDecimal(col.getTypeAttributes(), random)); break;
+        case VARCHAR:
+          row.addVarchar(i, randomString(Math.min(col.getTypeAttributes().getLength(),
+                                                  stringLength), random));
+          break;
         case STRING:
           row.addString(i, randomString(stringLength, random)); break;
         case BINARY:
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
index 6a97382..adac3be 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SchemaGenerator.java
@@ -126,6 +126,14 @@ public class SchemaGenerator {
       builder.typeAttributes(typeAttributes);
     }
 
+    if (type == Type.VARCHAR) {
+      int length = random.nextInt(
+        (CharUtil.MAX_VARCHAR_LENGTH - CharUtil.MIN_VARCHAR_LENGTH) + 1)
+        + CharUtil.MIN_VARCHAR_LENGTH;
+      typeAttributes = CharUtil.typeAttributes(length);
+      builder.typeAttributes(typeAttributes);
+    }
+
     // Sometimes set a column default value.
     if (random.nextFloat() <= defaultRate) {
       switch (type) {
@@ -154,6 +162,11 @@ public class SchemaGenerator {
         case DECIMAL:
           builder.defaultValue(randomDecimal(typeAttributes, random));
           break;
+        case VARCHAR:
+          builder.defaultValue(randomString(Math.min(DEFAULT_BINARY_LENGTH,
+                                                     typeAttributes.getLength()),
+                                            random));
+          break;
         case STRING:
           builder.defaultValue(randomString(DEFAULT_BINARY_LENGTH, random));
           break;
@@ -206,6 +219,7 @@ public class SchemaGenerator {
             Encoding.PLAIN_ENCODING,
             Encoding.BIT_SHUFFLE));
         break;
+      case VARCHAR:
       case STRING:
       case BINARY:
         validEncodings.retainAll(Arrays.asList(
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
index 6ca8114..3da514d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java
@@ -21,9 +21,11 @@ import static org.junit.Assert.assertNotEquals;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
 import org.apache.kudu.test.junit.RetryRule;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestColumnSchema {
@@ -31,6 +33,9 @@ public class TestColumnSchema {
   @Rule
   public RetryRule retryRule = new RetryRule();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testToString() {
     ColumnSchema col1 = new ColumnSchemaBuilder("col1", Type.STRING).build();
@@ -95,4 +100,19 @@ public class TestColumnSchema {
     ColumnSchema commentInt3 = new ColumnSchemaBuilder("col1", Type.INT32).comment("Test").build();
     assertNotEquals(commentInt1, commentInt3);
   }
+  @Test
+  public void testOutOfRangeVarchar() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("VARCHAR's length must be set and between 1 and 65535");
+    new ColumnSchemaBuilder("col1", Type.VARCHAR)
+      .typeAttributes(CharUtil.typeAttributes(70000)).build();
+  }
+
+  @Test
+  public void testVarcharWithoutLength() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("VARCHAR's length must be set and between 1 and 65535");
+    new ColumnSchemaBuilder("col1", Type.VARCHAR).build();
+  }
+
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 376351f..ec9670d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -37,6 +37,7 @@ import org.apache.kudu.Type;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import org.apache.kudu.client.PartitionSchema.RangeSchema;
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestKeyEncoding {
@@ -186,6 +187,8 @@ public class TestKeyEncoding {
             .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 0)),
         new ColumnSchemaBuilder("decimal128", Type.DECIMAL).key(true)
             .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 0)),
+        new ColumnSchemaBuilder("varchar", Type.VARCHAR).key(true)
+            .typeAttributes(CharUtil.typeAttributes(10)),
         new ColumnSchemaBuilder("string", Type.STRING).key(true),
         new ColumnSchemaBuilder("binary", Type.BINARY).key(true));
 
@@ -199,6 +202,7 @@ public class TestKeyEncoding {
     rowA.addDecimal("decimal32", BigDecimal.valueOf(5));
     rowA.addDecimal("decimal64", BigDecimal.valueOf(6));
     rowA.addDecimal("decimal128", BigDecimal.valueOf(7));
+    rowA.addVarchar("varchar", "");
     rowA.addString("string", "");
     rowA.addBinary("binary", "".getBytes(UTF_8));
 
@@ -212,6 +216,7 @@ public class TestKeyEncoding {
                           (byte) 0x80, 0, 0, 5,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
+                          0, 0,
                           0, 0
                       });
     assertEquals(rowA.stringifyRowKey(),
@@ -227,6 +232,7 @@ public class TestKeyEncoding {
     rowB.addDecimal("decimal32", BigDecimal.valueOf(5));
     rowB.addDecimal("decimal64", BigDecimal.valueOf(6));
     rowB.addDecimal("decimal128", BigDecimal.valueOf(7));
+    rowB.addVarchar("varchar", "abc\1\0defghij");
     rowB.addString("string", "abc\1\0def");
     rowB.addBinary("binary", "\0\1binary".getBytes(UTF_8));
 
@@ -240,6 +246,7 @@ public class TestKeyEncoding {
                           (byte) 0x80, 0, 0, 5,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
+                          'a', 'b', 'c', 1, 0, 1, 'd', 'e', 'f', 'g', 'h', 0, 0,
                           'a', 'b', 'c', 1, 0, 1, 'd', 'e', 'f', 0, 0,
                           0, 1, 'b', 'i', 'n', 'a', 'r', 'y',
                       });
@@ -254,6 +261,7 @@ public class TestKeyEncoding {
     rowC.addDecimal("decimal32", BigDecimal.valueOf(5));
     rowC.addDecimal("decimal64", BigDecimal.valueOf(6));
     rowC.addDecimal("decimal128", BigDecimal.valueOf(7));
+    rowC.addVarchar("varchar", "abc\n12345678");
     rowC.addString("string", "abc\n123");
     rowC.addBinary("binary", "\0\1\2\3\4\5".getBytes(UTF_8));
 
@@ -267,6 +275,7 @@ public class TestKeyEncoding {
                           (byte) 0x80, 0, 0, 5,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
+                          'a', 'b', 'c', '\n', '1', '2', '3', '4', '5', '6', 0, 0,
                           'a', 'b', 'c', '\n', '1', '2', '3', 0, 0,
                           0, 1, 2, 3, 4, 5,
                       });
@@ -281,6 +290,7 @@ public class TestKeyEncoding {
     rowD.addDecimal("decimal32", BigDecimal.valueOf(-5));
     rowD.addDecimal("decimal64", BigDecimal.valueOf(-6));
     rowD.addDecimal("decimal128", BigDecimal.valueOf(-7));
+    rowD.addVarchar("varchar", "\0abc\n\1\1\0 123\1\0");
     rowD.addString("string", "\0abc\n\1\1\0 123\1\0");
     rowD.addBinary("binary", "\0\1\2\3\4\5\0".getBytes(UTF_8));
 
@@ -294,6 +304,7 @@ public class TestKeyEncoding {
                           (byte) 127, -1, -1, -5,
                           (byte) 127, -1, -1, -1, -1, -1, -1, -6,
                           (byte) 127, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -7,
+                          0, 1, 'a', 'b', 'c', '\n', 1, 1, 0, 1, ' ', '1', 0, 0,
                           0, 1, 'a', 'b', 'c', '\n', 1, 1, 0, 1, ' ', '1', '2', '3', 1, 0, 1, 0, 0,
                           0, 1, 2, 3, 4, 5, 0,
                       });
@@ -382,6 +393,8 @@ public class TestKeyEncoding {
           .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 0)),
         new ColumnSchemaBuilder("decimal128", Type.DECIMAL).key(true)
           .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 0)),
+        new ColumnSchemaBuilder("varchar", Type.VARCHAR).key(true)
+          .typeAttributes(CharUtil.typeAttributes(10)),
         new ColumnSchemaBuilder("bool", Type.BOOL),       // not primary key type
         new ColumnSchemaBuilder("float", Type.FLOAT),     // not primary key type
         new ColumnSchemaBuilder("double", Type.DOUBLE));  // not primary key type
@@ -402,9 +415,10 @@ public class TestKeyEncoding {
     row.addDecimal(7, BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL32));
     row.addDecimal(8, BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL64));
     row.addDecimal(9, new BigDecimal(DecimalUtil.MAX_UNSCALED_DECIMAL128));
-    row.addBoolean(10, true);
-    row.addFloat(11, 8.8f);
-    row.addDouble(12, 9.9);
+    row.addVarchar(10, "varchar bar");
+    row.addBoolean(11, true);
+    row.addFloat(12, 7.8f);
+    row.addDouble(13, 9.9);
     session.apply(insert);
     session.close();
 
@@ -426,9 +440,10 @@ public class TestKeyEncoding {
           .compareTo(rr.getDecimal(8)) == 0);
       assertTrue(new BigDecimal(DecimalUtil.MAX_UNSCALED_DECIMAL128)
           .compareTo(rr.getDecimal(9)) == 0);
-      assertTrue(rr.getBoolean(10));
-      assertEquals(8.8f, rr.getFloat(11), .001f);
-      assertEquals(9.9, rr.getDouble(12), .001);
+      assertEquals("varchar ba", rr.getVarchar(10));
+      assertTrue(rr.getBoolean(11));
+      assertEquals(7.8f, rr.getFloat(12), .001f);
+      assertEquals(9.9, rr.getDouble(13), .001);
     }
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 288dd81..91703dc 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -24,6 +24,7 @@ import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
 import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.createManyStringsSchema;
+import static org.apache.kudu.test.ClientTestUtil.createManyVarcharsSchema;
 import static org.apache.kudu.test.ClientTestUtil.createSchemaWithBinaryColumns;
 import static org.apache.kudu.test.ClientTestUtil.createSchemaWithDecimalColumns;
 import static org.apache.kudu.test.ClientTestUtil.createSchemaWithTimestampColumns;
@@ -417,6 +418,62 @@ public class TestKuduClient {
   }
 
   /**
+   * Test inserting and retrieving VARCHAR columns.
+   */
+  @Test(timeout = 100000)
+  public void testVarchars() throws Exception {
+    Schema schema = createManyVarcharsSchema();
+    client.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
+
+    KuduSession session = client.newSession();
+    KuduTable table = client.openTable(TABLE_NAME);
+    for (int i = 0; i < 100; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addVarchar("key", String.format("key_%02d", i));
+      row.addVarchar("c2", "c2_" + i);
+      if (i % 2 == 1) {
+        row.addVarchar("c3", "c3_" + i);
+      }
+      row.addVarchar("c4", "c4_" + i);
+      // NOTE: we purposefully add the strings in a non-left-to-right
+      // order to verify that we still place them in the right position in
+      // the row.
+      row.addVarchar("c1", "c1_" + i);
+      session.apply(insert);
+      if (i % 50 == 0) {
+        session.flush();
+      }
+    }
+    session.flush();
+
+    List<String> rowStrings = scanTableToStrings(table);
+    assertEquals(100, rowStrings.size());
+    assertEquals(
+      "VARCHAR key(10)=key_03, VARCHAR c1(10)=c1_3, VARCHAR c2(10)=c2_3," +
+        " VARCHAR c3(10)=c3_3, VARCHAR c4(10)=c4_3",
+      rowStrings.get(3));
+    assertEquals(
+      "VARCHAR key(10)=key_04, VARCHAR c1(10)=c1_4, VARCHAR c2(10)=c2_4," +
+        " VARCHAR c3(10)=NULL, VARCHAR c4(10)=c4_4",
+      rowStrings.get(4));
+
+    KuduScanner scanner = client.newScannerBuilder(table).build();
+
+    assertTrue("Scanner should have returned row", scanner.hasMoreRows());
+
+    RowResultIterator rows = scanner.nextRows();
+    final RowResult next = rows.next();
+
+    // Do negative testing on string type.
+    try {
+      next.getInt("c2");
+      fail("IllegalArgumentException was not thrown when accessing " +
+             "a VARCHAR column with getInt");
+    } catch (IllegalArgumentException ignored) {}
+  }
+
+  /**
    * Test inserting and retrieving string columns.
    */
   @Test(timeout = 100000)
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
index a14ea94..760f9be 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
 import org.apache.kudu.test.junit.RetryRule;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestKuduPredicate {
@@ -83,6 +84,12 @@ public class TestKuduPredicate {
           .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 2))
           .build();
 
+  private static final ColumnSchema varcharCol =
+      new ColumnSchema.ColumnSchemaBuilder("varchar", Type.VARCHAR)
+          .typeAttributes(CharUtil.typeAttributes(10))
+          .nullable(true)
+          .build();
+
   @Rule
   public RetryRule retryRule = new RetryRule();
 
@@ -935,6 +942,13 @@ public class TestKuduPredicate {
                                 new byte[] { 0, 1, 2, 3, 4, 5, 6 },
                                 new byte[] { 10 }));
 
+    testMerge(KuduPredicate.newComparisonPredicate(varcharCol, GREATER_EQUAL, "bar"),
+              KuduPredicate.newComparisonPredicate(varcharCol, LESS, "foo"),
+              new KuduPredicate(RANGE,
+                                varcharCol,
+                                new byte[] {98, 97, 114},
+                                new byte[] {102, 111, 111}));
+
     byte[] bA = "a".getBytes(UTF_8);
     byte[] bB = "b".getBytes(UTF_8);
     byte[] bC = "c".getBytes(UTF_8);
@@ -966,6 +980,8 @@ public class TestKuduPredicate {
                         KuduPredicate.newComparisonPredicate(stringCol, LESS, "a\0"));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, LESS_EQUAL, new byte[] { (byte) 10 }),
                         KuduPredicate.newComparisonPredicate(binaryCol, LESS, new byte[] { (byte) 10, (byte) 0 }));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(varcharCol, LESS_EQUAL, "a"),
+                        KuduPredicate.newComparisonPredicate(varcharCol, LESS, "a\0"));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(byteCol, LESS_EQUAL, Byte.MAX_VALUE),
                         KuduPredicate.newIsNotNullPredicate(byteCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(shortCol, LESS_EQUAL, Short.MAX_VALUE),
@@ -1051,6 +1067,8 @@ public class TestKuduPredicate {
                         KuduPredicate.none(stringCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, LESS, new byte[] {}),
                         KuduPredicate.none(binaryCol));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(varcharCol, LESS, ""),
+                        KuduPredicate.none(varcharCol));
   }
 
   @Test
@@ -1080,6 +1098,8 @@ public class TestKuduPredicate {
                         KuduPredicate.newIsNotNullPredicate(stringCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, GREATER_EQUAL, new byte[] {}),
                         KuduPredicate.newIsNotNullPredicate(binaryCol));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(varcharCol, GREATER_EQUAL, ""),
+                        KuduPredicate.newIsNotNullPredicate(varcharCol));
 
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(byteCol, GREATER_EQUAL, Byte.MAX_VALUE),
                         KuduPredicate.newComparisonPredicate(byteCol, EQUAL, Byte.MAX_VALUE));
@@ -1126,6 +1146,9 @@ public class TestKuduPredicate {
     Assert.assertEquals(
         KuduPredicate.newComparisonPredicate(binaryCol, EQUAL, (Object) new byte[] { (byte) 10 }),
         KuduPredicate.newComparisonPredicate(binaryCol, EQUAL, new byte[] { (byte) 10 }));
+    Assert.assertEquals(
+        KuduPredicate.newComparisonPredicate(varcharCol, EQUAL, (Object) "a"),
+        KuduPredicate.newComparisonPredicate(varcharCol, EQUAL, "a"));
   }
 
   @Test
@@ -1163,6 +1186,12 @@ public class TestKuduPredicate {
                         KuduPredicate.newIsNotNullPredicate(stringCol).toString());
     Assert.assertEquals("`string` IS NULL",
                         KuduPredicate.newIsNullPredicate(stringCol).toString());
+    Assert.assertEquals("`varchar` = \"my varchar\"",
+                        KuduPredicate.newComparisonPredicate(varcharCol, EQUAL, "my varchar").toString());
+    Assert.assertEquals("`varchar` IS NOT NULL",
+                        KuduPredicate.newIsNotNullPredicate(varcharCol).toString());
+    Assert.assertEquals("`varchar` IS NULL",
+                        KuduPredicate.newIsNullPredicate(varcharCol).toString());
     // IS NULL predicate on non-nullable column = NONE predicate
     Assert.assertEquals("`int` NONE",
             KuduPredicate.newIsNullPredicate(intCol).toString());
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
index f4353dd..215236b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
@@ -85,6 +85,8 @@ public class TestPartialRow {
     assertEquals(53.35, (double) partialRow.getObject("double"), 0.0);
     assertTrue(partialRow.getObject("string") instanceof String);
     assertEquals("fun with ütf\0", partialRow.getObject("string"));
+    assertTrue(partialRow.getObject("varchar") instanceof String);
+    assertEquals("árvíztűrő ", partialRow.getObject("varchar"));
     assertTrue(partialRow.getObject("binary-array") instanceof byte[]);
     assertArrayEquals(new byte[] { 0, 1, 2, 3, 4 }, partialRow.getBinaryCopy("binary-array"));
     assertTrue(partialRow.getObject("binary-bytebuffer") instanceof byte[]);
@@ -98,7 +100,7 @@ public class TestPartialRow {
   public void testAddObject() {
     Schema schema = getSchemaWithAllTypes();
     // Ensure we aren't missing any types
-    assertEquals(13, schema.getColumnCount());
+    assertEquals(14, schema.getColumnCount());
 
     PartialRow row = schema.newPartialRow();
     row.addObject("int8", (byte) 42);
@@ -110,6 +112,7 @@ public class TestPartialRow {
     row.addObject("float", 52.35F);
     row.addObject("double", 53.35);
     row.addObject("string", "fun with ütf\0");
+    row.addObject("varchar", "árvíztűrő tükörfúrógép");
     row.addObject("binary-array", new byte[] { 0, 1, 2, 3, 4 });
     ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] { 5, 6, 7, 8, 9 });
     row.addObject("binary-bytebuffer", binaryBuffer);
@@ -334,6 +337,12 @@ public class TestPartialRow {
             "string string=\"fun with ütf\\0\", binary binary-bytebuffer=[2, 3, 4], " +
             "decimal(5, 3) decimal=12.345)",
         row.toString());
+
+    row.addVarchar("varchar", "árvíztűrő tükörfúrógép");
+    assertEquals("(int8 int8=42, int32 int32=42, double double=52.35, " +
+        "string string=\"fun with ütf\\0\", binary binary-bytebuffer=[2, 3, 4], " +
+        "decimal(5, 3) decimal=12.345, varchar(10) varchar=\"árvíztűrő \")",
+      row.toString());
   }
 
   @Test
@@ -408,6 +417,12 @@ public class TestPartialRow {
     partialRow.addBinary(binaryIndex, new byte[] { 0, 1, 2, 3, 4 });
     assertTrue(partialRow.incrementColumn(binaryIndex));
     assertArrayEquals(new byte[] { 0, 1, 2, 3, 4, 0 }, partialRow.getBinaryCopy(binaryIndex));
+
+    // Varchar
+    int varcharIndex = getColumnIndex(partialRow, "varchar");
+    partialRow.addVarchar(varcharIndex, "hello");
+    assertTrue(partialRow.incrementColumn(varcharIndex));
+    assertEquals("hello\0", partialRow.getVarchar(varcharIndex));
   }
 
   @Test
@@ -425,6 +440,7 @@ public class TestPartialRow {
     assertEquals(-Float.MAX_VALUE, partialRow.getFloat("float"), 0.0f);
     assertEquals(-Double.MAX_VALUE, partialRow.getDouble("double"), 0.0);
     assertEquals("", partialRow.getString("string"));
+    assertEquals("", partialRow.getVarchar("varchar"));
     assertArrayEquals(new byte[0], partialRow.getBinaryCopy("binary-array"));
     assertArrayEquals(new byte[0], partialRow.getBinaryCopy("binary-bytebuffer"));
     assertEquals(BigDecimal.valueOf(-99999, 3), partialRow.getDecimal("decimal"));
@@ -450,6 +466,7 @@ public class TestPartialRow {
       case INT32: return partialRow.getInt(columnName);
       case INT64: return partialRow.getLong(columnName);
       case UNIXTIME_MICROS: return partialRow.getTimestamp(columnName);
+      case VARCHAR: return partialRow.getVarchar(columnName);
       case STRING: return partialRow.getString(columnName);
       case BINARY: return partialRow.getBinary(columnName);
       case FLOAT: return partialRow.getFloat(columnName);
@@ -471,6 +488,7 @@ public class TestPartialRow {
       case INT32: return partialRow.getInt(columnIndex);
       case INT64: return partialRow.getLong(columnIndex);
       case UNIXTIME_MICROS: return partialRow.getTimestamp(columnIndex);
+      case VARCHAR: return partialRow.getVarchar(columnIndex);
       case STRING: return partialRow.getString(columnIndex);
       case BINARY: return partialRow.getBinary(columnIndex);
       case FLOAT: return partialRow.getFloat(columnIndex);
@@ -489,6 +507,7 @@ public class TestPartialRow {
       case INT32: partialRow.addInt(columnName, 44); break;
       case INT64: partialRow.addLong(columnName, 45); break;
       case UNIXTIME_MICROS: partialRow.addTimestamp(columnName, new Timestamp(1234567890)); break;
+      case VARCHAR: partialRow.addVarchar(columnName, "fun with ütf\0"); break;
       case STRING: partialRow.addString(columnName, "fun with ütf\0"); break;
       case BINARY: partialRow.addBinary(columnName, new byte[] { 0, 1, 2, 3, 4 }); break;
       case FLOAT: partialRow.addFloat(columnName, 52.35F); break;
@@ -507,6 +526,7 @@ public class TestPartialRow {
       case INT32: partialRow.addInt(columnIndex, 44); break;
       case INT64: partialRow.addLong(columnIndex, 45); break;
       case UNIXTIME_MICROS: partialRow.addTimestamp(columnIndex, new Timestamp(1234567890)); break;
+      case VARCHAR: partialRow.addVarchar(columnIndex, "fun with ütf\0"); break;
       case STRING: partialRow.addString(columnIndex, "fun with ütf\0"); break;
       case BINARY: partialRow.addBinary(columnIndex, new byte[] { 0, 1, 2, 3, 4 }); break;
       case FLOAT: partialRow.addFloat(columnIndex, 52.35F); break;
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
index fc8375b..16e1944 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
@@ -75,6 +75,7 @@ public class TestRowResult {
     row.setNull(10);
     row.addTimestamp(11, new Timestamp(11));
     row.addDecimal(12, BigDecimal.valueOf(12345, 3));
+    row.addVarchar(13, "varcharval");
 
     KuduClient client = harness.getClient();
     KuduSession session = client.newSession();
@@ -143,6 +144,10 @@ public class TestRowResult {
       assertEquals(BigDecimal.valueOf(12345, 3), rr.getObject(12));
       assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(allTypesSchema.getColumnByIndex(12).getName()));
 
+      assertEquals("varcharval", rr.getVarchar(13));
+      assertEquals("varcharval", rr.getObject(13));
+      assertEquals("varcharval", rr.getVarchar(allTypesSchema.getColumnByIndex(13).getName()));
+
       // We test with the column name once since it's the same method for all types, unlike above.
       assertEquals(Type.INT8, rr.getColumnType(allTypesSchema.getColumnByIndex(0).getName()));
       assertEquals(Type.INT8, rr.getColumnType(0));
@@ -156,6 +161,7 @@ public class TestRowResult {
       assertEquals(Type.BINARY, rr.getColumnType(8));
       assertEquals(Type.UNIXTIME_MICROS, rr.getColumnType(11));
       assertEquals(Type.DECIMAL, rr.getColumnType(12));
+      assertEquals(Type.VARCHAR, rr.getColumnType(13));
     }
   }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
index 5426125..e64ff44 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
@@ -35,6 +35,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduPredicate.ComparisonOp;
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestScanPredicate {
@@ -53,7 +54,16 @@ public class TestScanPredicate {
 
   private Schema createTableSchema(Type type) {
     ColumnSchema key = new ColumnSchema.ColumnSchemaBuilder("key", Type.INT64).key(true).build();
-    ColumnSchema val = new ColumnSchema.ColumnSchemaBuilder("value", type).nullable(true).build();
+    ColumnSchema val;
+    switch (type) {
+      case VARCHAR:
+        val = new ColumnSchema.ColumnSchemaBuilder("value", type)
+          .typeAttributes(CharUtil.typeAttributes(10)).nullable(true).build();
+        break;
+      default:
+        val = new ColumnSchema.ColumnSchemaBuilder("value", type).nullable(true).build();
+        break;
+    }
     return new Schema(ImmutableList.of(key, val));
   }
 
@@ -631,7 +641,16 @@ public class TestScanPredicate {
 
   @Test
   public void testStringPredicates() throws Exception {
-    Schema schema = createTableSchema(Type.STRING);
+    testVarlengthPredicates(Type.STRING);
+  }
+
+  @Test
+  public void testVarcharPredicates() throws Exception {
+    testVarlengthPredicates(Type.VARCHAR);
+  }
+
+  private void testVarlengthPredicates(Type type) throws Exception {
+    Schema schema = createTableSchema(type);
     client.createTable("string-table", schema, createTableOptions());
     KuduTable table = client.openTable("string-table");
 
@@ -643,7 +662,16 @@ public class TestScanPredicate {
     for (String value : values) {
       Insert insert = table.newInsert();
       insert.getRow().addLong("key", i++);
-      insert.getRow().addString("value", value);
+      switch (type) {
+        case VARCHAR:
+          insert.getRow().addVarchar("value", value);
+          break;
+        case STRING:
+          insert.getRow().addString("value", value);
+          break;
+        default:
+          throw new IllegalArgumentException("CHAR/VARCHAR/STRING expected");
+      }
       session.apply(insert);
     }
     Insert nullInsert = table.newInsert();
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
index 357eb8b..d5a0938 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/DistributedDataGenerator.scala
@@ -220,6 +220,8 @@ private class GeneratedRowIterator(
           row.addDecimal(
             i,
             new BigDecimal(BigInteger.valueOf(value), col.getTypeAttributes.getScale))
+        case Type.VARCHAR =>
+          row.addVarchar(i, String.valueOf(value))
         case Type.STRING =>
           row.addString(i, String.valueOf(value))
         case Type.BINARY =>
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
index 6df627d..0863bae 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/RowConverter.scala
@@ -17,6 +17,7 @@
 package org.apache.kudu.spark.kudu
 
 import org.apache.kudu.Schema
+import org.apache.kudu.Type
 import org.apache.kudu.client.PartialRow
 import org.apache.kudu.client.RowResult
 import org.apache.spark.sql.Row
@@ -65,7 +66,14 @@ class RowConverter(kuduSchema: Schema, schema: StructType, ignoreNull: Boolean)
       } else {
         schema.fields(sparkIdx).dataType match {
           case DataTypes.StringType =>
-            partialRow.addString(kuduIdx, row.getString(sparkIdx))
+            kuduSchema.getColumnByIndex(kuduIdx).getType match {
+              case Type.STRING =>
+                partialRow.addString(kuduIdx, row.getString(sparkIdx))
+              case Type.VARCHAR =>
+                partialRow.addVarchar(kuduIdx, row.getString(sparkIdx))
+              case t =>
+                throw new IllegalArgumentException(s"Invalid Kudu column type $t")
+            }
           case DataTypes.BinaryType =>
             partialRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
           case DataTypes.BooleanType =>
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
index 636b343..bac9193 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
@@ -52,6 +52,7 @@ object SparkUtil {
       case Type.UNIXTIME_MICROS => TimestampType
       case Type.FLOAT => FloatType
       case Type.DOUBLE => DoubleType
+      case Type.VARCHAR => StringType
       case Type.STRING => StringType
       case Type.BINARY => BinaryType
       case Type.DECIMAL => DecimalType(a.getPrecision, a.getScale)
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 521c83a..2f7ddb1 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -529,7 +529,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       ))
 
     val dfDefaultSchema = sqlContext.read.options(kuduOptions).format("kudu").load
-    assertEquals(14, dfDefaultSchema.schema.fields.length)
+    assertEquals(15, dfDefaultSchema.schema.fields.length)
 
     val dfWithUserSchema =
       sqlContext.read.options(kuduOptions).schema(userSchema).format("kudu").load
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index 76b6b2b..b322532 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -82,7 +82,8 @@ class KuduContextTest extends KuduTestSuite with Matchers {
           "c10_byte",
           "c11_decimal32",
           "c12_decimal64",
-          "c13_decimal128"
+          "c13_decimal128",
+          "c14_varchar"
         )
       )
       .map(r => r.toSeq)
@@ -106,6 +107,7 @@ class KuduContextTest extends KuduTestSuite with Matchers {
       assert(r.apply(11).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
       assert(r.apply(12).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
       assert(r.apply(13).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
+      assert(r.apply(14).asInstanceOf[String] == rows.apply(index)._3)
     })
   }
 
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 61e1069..8dbe53f 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -30,6 +30,7 @@ import org.apache.kudu.client.KuduTable
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.test.KuduTestHarness
+import org.apache.kudu.util.CharUtil
 import org.apache.kudu.util.DecimalUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.DataFrame
@@ -84,6 +85,10 @@ trait KuduTestSuite extends JUnitSuite {
             .precision(DecimalUtil.MAX_DECIMAL128_PRECISION)
             .build()
         )
+        .build(),
+      new ColumnSchemaBuilder("c14_varchar", Type.VARCHAR)
+        .typeAttributes(CharUtil.typeAttributes(CharUtil.MAX_VARCHAR_LENGTH))
+        .nullable(true)
         .build()
     )
     new Schema(columns.asJava)
@@ -181,9 +186,11 @@ trait KuduTestSuite extends JUnitSuite {
       // Sprinkling some nulls so that queries see them.
       val s = if (i % 2 == 0) {
         row.addString(2, i.toString)
+        row.addVarchar(14, i.toString)
         i.toString
       } else {
         row.setNull(2)
+        row.setNull(14)
         null
       }
 
@@ -216,6 +223,7 @@ trait KuduTestSuite extends JUnitSuite {
       row.addDecimal(11, BigDecimal.valueOf(i))
       row.addDecimal(12, BigDecimal.valueOf(i))
       row.addDecimal(13, BigDecimal.valueOf(i))
+      row.addVarchar(14, i.toString)
 
       // Sprinkling some nulls so that queries see them.
       val s = if (i % 2 == 0) {
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
index ba6842b..c4fce18 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -42,8 +42,8 @@ import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
 import org.apache.kudu.client.Upsert;
+import org.apache.kudu.util.CharUtil;
 import org.apache.kudu.util.DecimalUtil;
-import org.apache.kudu.util.StringUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -213,7 +213,9 @@ public abstract class ClientTestUtil {
             new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(),
             new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build(),
             new ColumnSchema.ColumnSchemaBuilder("decimal", Type.DECIMAL)
-                .typeAttributes(DecimalUtil.typeAttributes(5, 3)).build());
+              .typeAttributes(DecimalUtil.typeAttributes(5, 3)).build(),
+            new ColumnSchema.ColumnSchemaBuilder("varchar", Type.VARCHAR)
+              .typeAttributes(CharUtil.typeAttributes(10)).build());
 
     return new Schema(columns);
   }
@@ -221,7 +223,7 @@ public abstract class ClientTestUtil {
   public static PartialRow getPartialRowWithAllTypes() {
     Schema schema = getSchemaWithAllTypes();
     // Ensure we aren't missing any types
-    assertEquals(13, schema.getColumnCount());
+    assertEquals(14, schema.getColumnCount());
 
     PartialRow row = schema.newPartialRow();
     row.addByte("int8", (byte) 42);
@@ -233,6 +235,7 @@ public abstract class ClientTestUtil {
     row.addFloat("float", 52.35F);
     row.addDouble("double", 53.35);
     row.addString("string", "fun with ütf\0");
+    row.addVarchar("varchar", "árvíztűrő tükörfúrógép");
     row.addBinary("binary-array", new byte[] { 0, 1, 2, 3, 4 });
     ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] { 5, 6, 7, 8, 9 });
     row.addBinary("binary-bytebuffer", binaryBuffer);
@@ -413,6 +416,21 @@ public abstract class ClientTestUtil {
     return table;
   }
 
+  public static Schema createManyVarcharsSchema() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c3", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c4", Type.VARCHAR)
+                  .typeAttributes(CharUtil.typeAttributes(10)).nullable(true).build());
+    return new Schema(columns);
+  }
+
   public static Schema createManyStringsSchema() {
     ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
     columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());