Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/12 01:07:19 UTC

spark git commit: [SPARK-8286] Rewrite UTF8String in Java and move it into unsafe package.

Repository: spark
Updated Branches:
  refs/heads/master 9cbdf31ec -> 7d669a56f


[SPARK-8286] Rewrite UTF8String in Java and move it into unsafe package.

Unit test is still in Scala.

Author: Reynold Xin <rx...@databricks.com>

Closes #6738 from rxin/utf8string-java and squashes the following commits:

562dc6e [Reynold Xin] Flag...
98e600b [Reynold Xin] Another try with encoding setting ..
cfa6bdf [Reynold Xin] Merge branch 'master' into utf8string-java
a3b124d [Reynold Xin] Try different UTF-8 encoded characters.
1ff7c82 [Reynold Xin] Enable UTF-8 encoding.
82d58cc [Reynold Xin] Reset run-tests.
2cb3c69 [Reynold Xin] Use utf-8 encoding in set bytes.
53f8ef4 [Reynold Xin] Hack Jenkins to run one test.
9a48e8d [Reynold Xin] Fixed runtime compilation error.
911c450 [Reynold Xin] Moved unit test also to Java.
4eff7bd [Reynold Xin] Improved unit test coverage.
8e89a3c [Reynold Xin] Fixed tests.
77c64bd [Reynold Xin] Fixed string type codegen.
ffedb62 [Reynold Xin] Code review feedback.
0967ce6 [Reynold Xin] Fixed import ordering.
45a123d [Reynold Xin] [SPARK-8286] Rewrite UTF8String in Java and move it into unsafe package.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d669a56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d669a56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d669a56

Branch: refs/heads/master
Commit: 7d669a56ffc7a4f5827830ef3c27d45cc0e8774f
Parents: 9cbdf31
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Jun 11 16:07:15 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jun 11 16:07:15 2015 -0700

----------------------------------------------------------------------
 project/SparkBuild.scala                        |   4 +-
 .../sql/catalyst/expressions/UnsafeRow.java     |   2 +-
 .../sql/catalyst/CatalystTypeConverters.scala   |   3 +-
 .../spark/sql/catalyst/ScalaReflection.scala    |   1 +
 .../spark/sql/catalyst/expressions/Cast.scala   |   9 +-
 .../expressions/SpecificMutableRow.scala        |   4 +-
 .../expressions/UnsafeRowConverter.scala        |   1 +
 .../expressions/codegen/CodeGenerator.scala     |   2 +
 .../sql/catalyst/expressions/literals.scala     |   3 +-
 .../spark/sql/catalyst/expressions/rows.scala   |   7 +-
 .../catalyst/expressions/stringOperations.scala |   3 +-
 .../org/apache/spark/sql/types/StringType.scala |   1 +
 .../org/apache/spark/sql/types/UTF8String.scala | 221 -------------------
 .../catalyst/expressions/ComplexTypeSuite.scala |   1 +
 .../UnsafeFixedWidthAggregationMapSuite.scala   |  10 +-
 .../spark/sql/types/UTF8StringSuite.scala       |  70 ------
 .../apache/spark/sql/columnar/ColumnStats.scala |   1 +
 .../apache/spark/sql/columnar/ColumnType.scala  |   3 +-
 .../sql/execution/SparkSqlSerializer2.scala     |   3 +-
 .../spark/sql/execution/debug/package.scala     |   1 +
 .../apache/spark/sql/execution/pythonUdfs.scala |   9 +-
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |   1 +
 .../apache/spark/sql/json/JacksonParser.scala   |  11 +-
 .../org/apache/spark/sql/json/JsonRDD.scala     |   8 +-
 .../spark/sql/parquet/ParquetConverter.scala    |   7 +-
 .../spark/sql/parquet/ParquetFilters.scala      |   1 +
 .../spark/sql/parquet/ParquetTableSupport.scala |   1 +
 .../spark/sql/sources/DataSourceStrategy.scala  |   3 +-
 .../spark/sql/columnar/ColumnTypeSuite.scala    |   8 +-
 .../spark/sql/columnar/ColumnarTestUtils.scala  |   7 +-
 .../apache/spark/sql/hive/HiveInspectors.scala  |  13 +-
 .../apache/spark/unsafe/types/UTF8String.java   | 212 ++++++++++++++++++
 .../apache/spark/unsafe/bitset/BitSetSuite.java |   1 -
 .../spark/unsafe/types/UTF8StringSuite.java     |  93 ++++++++
 34 files changed, 390 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
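
For readers skimming the diff below: the core API change is that the old Scala
companion-object constructors UTF8String(String) and UTF8String(Array[Byte]) are
replaced by explicit factory methods on the new Java class. A minimal sketch of the
new call sites (illustrative only, not part of this commit):

    import org.apache.spark.unsafe.types.UTF8String;

    public class Example {
      public static void main(String[] args) {
        // Explicit, null-safe factories (null in => null out) replace the
        // old Scala apply() constructors.
        UTF8String s = UTF8String.fromString("hello");
        UTF8String b = UTF8String.fromBytes(new byte[] {'h', 'i'});
        System.out.println(s.length());   // 5 (code points)
        System.out.println(b.toString()); // hi
      }
    }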


http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa75a64..41b7eba 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -149,7 +149,9 @@ object SparkBuild extends PomBuild {
     javacOptions in (Compile, doc) ++= {
       val Array(major, minor, _) = System.getProperty("java.version").split("\\.", 3)
       if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
-    }
+    },
+
+    javacOptions in Compile ++= Seq("-encoding", "UTF-8")
   )
 
   def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index ec97fe6..143acc9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -30,7 +30,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.BaseMutableRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.UTF8String;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index beb82db..7e4b11a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Functions to convert Scala types to Catalyst types and vice versa.
@@ -257,7 +258,7 @@ object CatalystTypeConverters {
 
   private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
     override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
-      case str: String => UTF8String(str)
+      case str: String => UTF8String.fromString(str)
       case utf8: UTF8String => utf8
     }
     override def toScala(catalystValue: Any): String = catalystValue match {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 6998cc8..90698cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst
 
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 037efd7..4c7123f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -24,6 +24,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /** Cast the child expression to the target data type. */
 case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with Logging {
@@ -111,11 +112,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
 
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
-    case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
-    case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
+    case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
+    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d)))
     case TimestampType => buildCast[Long](_,
-      t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
-    case _ => buildCast[Any](_, o => UTF8String(o.toString))
+      t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t))))
+    case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
   }
 
   // BinaryConverter

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 2c88451..98eda61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A parent class for mutable container objects that are reused when the values are changed,
@@ -240,7 +241,8 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
     }
   }
 
-  override def setString(ordinal: Int, value: String): Unit = update(ordinal, UTF8String(value))
+  override def setString(ordinal: Int, value: String): Unit =
+    update(ordinal, UTF8String.fromString(value))
 
   override def getString(ordinal: Int): String = apply(ordinal).toString
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index 5b2c857..5350123 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.PlatformDependent
 import org.apache.spark.unsafe.array.ByteArrayMethods
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Converts Rows into UnsafeRow format. This class is NOT thread-safe.

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index ecf8e0d..536e477 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -26,6 +26,8 @@ import org.codehaus.janino.ClassBodyEvaluator
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
 
 // These classes are here to avoid issues with serialization and integration with quasiquotes.
 class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index ef50c50..a33007b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 object Literal {
   def apply(v: Any): Literal = v match {
@@ -32,7 +33,7 @@ object Literal {
     case f: Float => Literal(f, FloatType)
     case b: Byte => Literal(b, ByteType)
     case s: Short => Literal(s, ShortType)
-    case s: String => Literal(UTF8String(s), StringType)
+    case s: String => Literal(UTF8String.fromString(s), StringType)
     case b: Boolean => Literal(b, BooleanType)
     case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 5fd892c..5d2d820 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.types.{UTF8String, DataType, StructType, AtomicType}
+import org.apache.spark.sql.types.{DataType, StructType, AtomicType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * An extended interface to [[Row]] that allows the values for each column to be updated.  Setting
@@ -197,7 +198,9 @@ class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
   override def setFloat(ordinal: Int, value: Float): Unit = { values(ordinal) = value }
   override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value }
   override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value }
-  override def setString(ordinal: Int, value: String) { values(ordinal) = UTF8String(value)}
+  override def setString(ordinal: Int, value: String) {
+    values(ordinal) = UTF8String.fromString(value)
+  }
   override def setNullAt(i: Int): Unit = { values(i) = null }
 
   override def setShort(ordinal: Int, value: Short): Unit = { values(ordinal) = value }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index 3450383..4f4c195 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 trait StringRegexExpression extends ExpectsInputTypes {
   self: BinaryExpression =>
@@ -277,7 +278,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
           ba.slice(st, end)
         case s: UTF8String =>
           val (st, end) = slicePos(start, length, () => s.length())
-          s.slice(st, end)
+          s.substring(st, end)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
index 134ab0a..1e9476a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.typeTag
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
deleted file mode 100644
index f5d8fcc..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.types
-
-import java.util.Arrays
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * A UTF-8 String, as internal representation of StringType in SparkSQL
- *
- * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison,
- * search, see http://en.wikipedia.org/wiki/UTF-8 for details.
- *
- * Note: This is not designed for general use cases, should not be used outside SQL.
- */
-@DeveloperApi
-final class UTF8String extends Ordered[UTF8String] with Serializable {
-
-  private[this] var bytes: Array[Byte] = _
-
-  /**
-   * Update the UTF8String with String.
-   */
-  def set(str: String): UTF8String = {
-    bytes = str.getBytes("utf-8")
-    this
-  }
-
-  /**
-   * Update the UTF8String with Array[Byte], which should be encoded in UTF-8
-   */
-  def set(bytes: Array[Byte]): UTF8String = {
-    this.bytes = bytes
-    this
-  }
-
-  /**
-   * Return the number of bytes for a code point with the first byte as `b`
-   * @param b The first byte of a code point
-   */
-  @inline
-  private[this] def numOfBytes(b: Byte): Int = {
-    val offset = (b & 0xFF) - 192
-    if (offset >= 0) UTF8String.bytesOfCodePointInUTF8(offset) else 1
-  }
-
-  /**
-   * Return the number of code points in it.
-   *
-   * This is only used by Substring() when `start` is negative.
-   */
-  def length(): Int = {
-    var len = 0
-    var i: Int = 0
-    while (i < bytes.length) {
-      i += numOfBytes(bytes(i))
-      len += 1
-    }
-    len
-  }
-
-  def getBytes: Array[Byte] = {
-    bytes
-  }
-
-  /**
-   * Return a substring of this,
-   * @param start the position of first code point
-   * @param until the position after last code point
-   */
-  def slice(start: Int, until: Int): UTF8String = {
-    if (until <= start || start >= bytes.length || bytes == null) {
-      new UTF8String
-    }
-
-    var c = 0
-    var i: Int = 0
-    while (c < start && i < bytes.length) {
-      i += numOfBytes(bytes(i))
-      c += 1
-    }
-    var j = i
-    while (c < until && j < bytes.length) {
-      j += numOfBytes(bytes(j))
-      c += 1
-    }
-    UTF8String(Arrays.copyOfRange(bytes, i, j))
-  }
-
-  def contains(sub: UTF8String): Boolean = {
-    val b = sub.getBytes
-    if (b.length == 0) {
-      return true
-    }
-    var i: Int = 0
-    while (i <= bytes.length - b.length) {
-      // In worst case, it's O(N*K), but should works fine with SQL
-      if (bytes(i) == b(0) && Arrays.equals(Arrays.copyOfRange(bytes, i, i + b.length), b)) {
-        return true
-      }
-      i += 1
-    }
-    false
-  }
-
-  def startsWith(prefix: UTF8String): Boolean = {
-    val b = prefix.getBytes
-    if (b.length > bytes.length) {
-      return false
-    }
-    Arrays.equals(Arrays.copyOfRange(bytes, 0, b.length), b)
-  }
-
-  def endsWith(suffix: UTF8String): Boolean = {
-    val b = suffix.getBytes
-    if (b.length > bytes.length) {
-      return false
-    }
-    Arrays.equals(Arrays.copyOfRange(bytes, bytes.length - b.length, bytes.length), b)
-  }
-
-  def toUpperCase(): UTF8String = {
-    // upper case depends on locale, fallback to String.
-    UTF8String(toString().toUpperCase)
-  }
-
-  def toLowerCase(): UTF8String = {
-    // lower case depends on locale, fallback to String.
-    UTF8String(toString().toLowerCase)
-  }
-
-  override def toString(): String = {
-    new String(bytes, "utf-8")
-  }
-
-  override def clone(): UTF8String = new UTF8String().set(this.bytes)
-
-  override def compare(other: UTF8String): Int = {
-    var i: Int = 0
-    val b = other.getBytes
-    while (i < bytes.length && i < b.length) {
-      val res = bytes(i).compareTo(b(i))
-      if (res != 0) return res
-      i += 1
-    }
-    bytes.length - b.length
-  }
-
-  override def compareTo(other: UTF8String): Int = {
-    compare(other)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case s: UTF8String =>
-      Arrays.equals(bytes, s.getBytes)
-    case s: String =>
-      // This is only used for Catalyst unit tests
-      // fail fast
-      bytes.length >= s.length && length() == s.length && toString() == s
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = {
-    Arrays.hashCode(bytes)
-  }
-}
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-object UTF8String {
-  // number of tailing bytes in a UTF8 sequence for a code point
-  // see http://en.wikipedia.org/wiki/UTF-8, 192-256 of Byte 1
-  private[types] val bytesOfCodePointInUTF8: Array[Int] = Array(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
-    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
-    3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
-    4, 4, 4, 4, 4, 4, 4, 4,
-    5, 5, 5, 5,
-    6, 6, 6, 6)
-
-  /**
-   * Create a UTF-8 String from String
-   */
-  def apply(s: String): UTF8String = {
-    if (s != null) {
-      new UTF8String().set(s)
-    } else {
-      null
-    }
-  }
-
-  /**
-   * Create a UTF-8 String from Array[Byte], which should be encoded in UTF-8
-   */
-  def apply(bytes: Array[Byte]): UTF8String = {
-    if (bytes != null) {
-      new UTF8String().set(bytes)
-    } else {
-      null
-    }
-  }
-}
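
An aside on the bytesOfCodePointInUTF8 table above (carried over unchanged into the
new Java class later in this diff): the lead byte of a UTF-8 sequence determines how
many bytes the code point occupies, and the table maps lead bytes 0xC0-0xFF (offsets
0-63) to lengths 2-6. A small worked example (illustrative, not part of this commit):

    // The lead byte of "世" in UTF-8 is 0xE4 = 0b11100100, a 3-byte sequence.
    int b = 0xE4 & 0xFF;   // 228
    int offset = b - 192;  // 36
    // Table layout: offsets 0..31 -> 2, 32..47 -> 3, 48..55 -> 4,
    // 56..59 -> 5, 60..63 -> 6; so bytesOfCodePointInUTF8[36] == 3 and
    // length() advances 3 bytes for this code point. Bytes below 0xC0
    // (offset < 0) count as 1, which covers ASCII lead bytes.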

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
index f151dd2..bcc594c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 
 class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
index 88a36aa..72bbc4e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.expressions
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, TaskMemoryManager, MemoryAllocator}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, TaskMemoryManager, MemoryAllocator}
+import org.apache.spark.unsafe.types.UTF8String
+
 
 class UnsafeFixedWidthAggregationMapSuite
   extends SparkFunSuite
@@ -82,7 +84,7 @@ class UnsafeFixedWidthAggregationMapSuite
       1024, // initial capacity
       false // disable perf metrics
     )
-    val groupKey = new GenericRow(Array[Any](UTF8String("cats")))
+    val groupKey = new GenericRow(Array[Any](UTF8String.fromString("cats")))
 
     // Looking up a key stores a zero-entry in the map (like Python Counters or DefaultDicts)
     map.getAggregationBuffer(groupKey)
@@ -111,7 +113,7 @@ class UnsafeFixedWidthAggregationMapSuite
     val rand = new Random(42)
     val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet
     groupKeys.foreach { keyString =>
-      map.getAggregationBuffer(new GenericRow(Array[Any](UTF8String(keyString))))
+      map.getAggregationBuffer(new GenericRow(Array[Any](UTF8String.fromString(keyString))))
     }
     val seenKeys: Set[String] = map.iterator().asScala.map { entry =>
       entry.key.getString(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala
deleted file mode 100644
index 81d7ab0..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.types
-
-import org.apache.spark.SparkFunSuite
-
-// scalastyle:off
-class UTF8StringSuite extends SparkFunSuite {
-  test("basic") {
-    def check(str: String, len: Int) {
-
-      assert(UTF8String(str).length == len)
-      assert(UTF8String(str.getBytes("utf8")).length() == len)
-
-      assert(UTF8String(str) == str)
-      assert(UTF8String(str.getBytes("utf8")) == str)
-      assert(UTF8String(str).toString == str)
-      assert(UTF8String(str.getBytes("utf8")).toString == str)
-      assert(UTF8String(str.getBytes("utf8")) == UTF8String(str))
-
-      assert(UTF8String(str).hashCode() == UTF8String(str.getBytes("utf8")).hashCode())
-    }
-
-    check("hello", 5)
-    check("世 界", 3)
-  }
-
-  test("contains") {
-    assert(UTF8String("hello").contains(UTF8String("ello")))
-    assert(!UTF8String("hello").contains(UTF8String("vello")))
-    assert(UTF8String("大千世界").contains(UTF8String("千世")))
-    assert(!UTF8String("大千世界").contains(UTF8String("世千")))
-  }
-
-  test("prefix") {
-    assert(UTF8String("hello").startsWith(UTF8String("hell")))
-    assert(!UTF8String("hello").startsWith(UTF8String("ell")))
-    assert(UTF8String("大千世界").startsWith(UTF8String("大千")))
-    assert(!UTF8String("大千世界").startsWith(UTF8String("千")))
-  }
-
-  test("suffix") {
-    assert(UTF8String("hello").endsWith(UTF8String("ello")))
-    assert(!UTF8String("hello").endsWith(UTF8String("ellov")))
-    assert(UTF8String("大千世界").endsWith(UTF8String("世界")))
-    assert(!UTF8String("大千世界").endsWith(UTF8String("世")))
-  }
-
-  test("slice") {
-    assert(UTF8String("hello").slice(1, 3) == UTF8String("el"))
-    assert(UTF8String("大千世界").slice(0, 1) == UTF8String("大"))
-    assert(UTF8String("大千世界").slice(1, 3) == UTF8String("千世"))
-    assert(UTF8String("大千世界").slice(3, 5) == UTF8String("界"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 83881a3..11c79c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
   val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index c9c4d63..8e21020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.MutableRow
 import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * An abstract class that represents type of a column. Used to append/extract Java objects into/from
@@ -320,7 +321,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
     val length = buffer.getInt()
     val stringBytes = new Array[Byte](length)
     buffer.get(stringBytes, 0, length)
-    UTF8String(stringBytes)
+    UTF8String.fromBytes(stringBytes)
   }
 
   override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 60f3b2d..202e448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -28,6 +28,7 @@ import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * The serialization stream for [[SparkSqlSerializer2]]. It assumes that the object passed in
@@ -434,7 +435,7 @@ private[sql] object SparkSqlSerializer2 {
                 val length = in.readInt()
                 val bytes = new Array[Byte](length)
                 in.readFully(bytes)
-                mutableRow.update(i, UTF8String(bytes))
+                mutableRow.update(i, UTF8String.fromBytes(bytes))
               }
 
             case BinaryType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 720b529..83c1f65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.mutable.HashSet
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index b1333ec..2b45a83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
+import org.apache.spark.{Accumulator, Logging => SparkLogging}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
 import org.apache.spark.broadcast.Broadcast
@@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.{Accumulator, Logging => SparkLogging}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A serialized version of a Python lambda function.  Suitable for use in a [[PythonRDD]].
@@ -203,8 +204,10 @@ object EvaluatePython {
     case (c: Long, IntegerType) => c.toInt
     case (c: Int, LongType) => c.toLong
     case (c: Double, FloatType) => c.toFloat
-    case (c: String, StringType) => UTF8String(c)
-    case (c, StringType) if !c.isInstanceOf[String] => UTF8String(c.toString)
+    case (c: String, StringType) => UTF8String.fromString(c)
+    case (c, StringType) =>
+      // If we get here, c is not a string. Call toString on it.
+      UTF8String.fromString(c.toString)
 
     case (c, _) => c
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 9028d5e..e75e668 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Data corresponding to one partition of a JDBCRDD.

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 4e07cf3..f16075c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
 
 private[sql] object JacksonParser {
   def apply(
@@ -54,7 +56,7 @@ private[sql] object JacksonParser {
         convertField(factory, parser, schema)
 
       case (VALUE_STRING, StringType) =>
-        UTF8String(parser.getText)
+        UTF8String.fromString(parser.getText)
 
       case (VALUE_STRING, _) if parser.getTextLength < 1 =>
         // guard the non string type
@@ -74,7 +76,7 @@ private[sql] object JacksonParser {
         val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
         generator.copyCurrentStructure(parser)
         generator.close()
-        UTF8String(writer.toByteArray)
+        UTF8String.fromBytes(writer.toByteArray)
 
       case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
         parser.getFloatValue
@@ -152,7 +154,8 @@ private[sql] object JacksonParser {
       valueType: DataType): Map[UTF8String, Any] = {
     val builder = Map.newBuilder[UTF8String, Any]
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      builder += UTF8String(parser.getCurrentName) -> convertField(factory, parser, valueType)
+      builder +=
+        UTF8String.fromString(parser.getCurrentName) -> convertField(factory, parser, valueType)
     }
 
     builder.result()
@@ -180,7 +183,7 @@ private[sql] object JacksonParser {
       val row = new GenericMutableRow(schema.length)
       for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
         require(schema(corruptIndex).dataType == StringType)
-        row.update(corruptIndex, UTF8String(record))
+        row.update(corruptIndex, UTF8String.fromString(record))
       }
 
       Seq(row)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index fb0d137..e4acf1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
 
 private[sql] object JsonRDD extends Logging {
 
@@ -317,7 +319,7 @@ private[sql] object JsonRDD extends Logging {
           parsed
         } catch {
           case e: JsonProcessingException =>
-            Map(columnNameOfCorruptRecords -> UTF8String(record)) :: Nil
+            Map(columnNameOfCorruptRecords -> UTF8String.fromString(record)) :: Nil
         }
       }
     })
@@ -409,7 +411,7 @@ private[sql] object JsonRDD extends Logging {
       null
     } else {
       desiredType match {
-        case StringType => UTF8String(toString(value))
+        case StringType => UTF8String.fromString(toString(value))
         case _ if value == null || value == "" => null // guard the non string type
         case IntegerType => value.asInstanceOf[IntegerType.InternalType]
         case LongType => toLong(value)
@@ -423,7 +425,7 @@ private[sql] object JsonRDD extends Logging {
           val map = value.asInstanceOf[Map[String, Any]]
           map.map {
             case (k, v) =>
-              (UTF8String(k), enforceCorrectType(v, valueType))
+              (UTF8String.fromString(k), enforceCorrectType(v, valueType))
           }.map(identity)
         case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
         case DateType => toDate(value)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index ddc5097..ab9f878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.parquet.CatalystConverter.FieldType
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.parquet.timestamp.NanoTime
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Collection of converters of Parquet types (group and primitive types) that
@@ -222,7 +223,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
     updateField(fieldIndex, value.getBytes)
 
   protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
-    updateField(fieldIndex, UTF8String(value))
+    updateField(fieldIndex, UTF8String.fromBytes(value))
 
   protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
     updateField(fieldIndex, readTimestamp(value))
@@ -423,7 +424,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
     current.update(fieldIndex, value.getBytes)
 
   override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
-    current.update(fieldIndex, UTF8String(value))
+    current.update(fieldIndex, UTF8String.fromBytes(value))
 
   override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
     current.setLong(fieldIndex, readTimestamp(value))
@@ -719,7 +720,7 @@ private[parquet] class CatalystNativeArrayConverter(
 
   override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit = {
     checkGrowBuffer()
-    buffer(elements) = UTF8String(value).asInstanceOf[NativeType]
+    buffer(elements) = UTF8String.fromBytes(value).asInstanceOf[NativeType]
     elements += 1
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 88ae88e..4d659f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index e03dbde..c62c592 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A `parquet.io.api.RecordMaterializer` for Rows.

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index c6a4dab..edda3f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
 import org.apache.spark.util.Utils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 8421e67..6daddfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -22,12 +22,14 @@ import java.nio.ByteBuffer
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Kryo, Serializer}
 
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.sql.types._
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.unsafe.types.UTF8String
+
 
 class ColumnTypeSuite extends SparkFunSuite with Logging {
   val DEFAULT_BUFFER_SIZE = 512
@@ -66,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
     checkActualSize(FLOAT, Float.MaxValue, 4)
     checkActualSize(FIXED_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
     checkActualSize(BOOLEAN, true, 1)
-    checkActualSize(STRING, UTF8String("hello"), 4 + "hello".getBytes("utf-8").length)
+    checkActualSize(STRING, UTF8String.fromString("hello"), 4 + "hello".getBytes("utf-8").length)
     checkActualSize(DATE, 0, 4)
     checkActualSize(TIMESTAMP, 0L, 8)
 
@@ -118,7 +120,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
       val length = buffer.getInt()
       val bytes = new Array[Byte](length)
       buffer.get(bytes)
-      UTF8String(bytes)
+      UTF8String.fromBytes(bytes)
     })
 
   testColumnType[BinaryType.type, Array[Byte]](

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index c5d3859..1bc7eb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -22,7 +22,10 @@ import scala.util.Random
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, UTF8String}
+import org.apache.spark.sql.types.{AtomicType, DataType, Decimal}
+import org.apache.spark.sql.types.{DataType, Decimal, AtomicType}
+import org.apache.spark.unsafe.types.UTF8String
+
 
 object ColumnarTestUtils {
   def makeNullRow(length: Int): GenericMutableRow = {
@@ -46,7 +49,7 @@ object ColumnarTestUtils {
       case FLOAT => Random.nextFloat()
       case DOUBLE => Random.nextDouble()
       case FIXED_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale)
-      case STRING => UTF8String(Random.nextString(Random.nextInt(32)))
+      case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32)))
       case BOOLEAN => Random.nextBoolean()
       case BINARY => randomBytes(Random.nextInt(32))
       case DATE => Random.nextInt()

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 1f14cba..fd01a87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -242,9 +243,9 @@ private[hive] trait HiveInspectors {
   def unwrap(data: Any, oi: ObjectInspector): Any = oi match {
     case coi: ConstantObjectInspector if coi.getWritableConstantValue == null => null
     case poi: WritableConstantStringObjectInspector =>
-      UTF8String(poi.getWritableConstantValue.toString)
+      UTF8String.fromString(poi.getWritableConstantValue.toString)
     case poi: WritableConstantHiveVarcharObjectInspector =>
-      UTF8String(poi.getWritableConstantValue.getHiveVarchar.getValue)
+      UTF8String.fromString(poi.getWritableConstantValue.getHiveVarchar.getValue)
     case poi: WritableConstantHiveDecimalObjectInspector =>
       HiveShim.toCatalystDecimal(
         PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector,
@@ -288,13 +289,13 @@ private[hive] trait HiveInspectors {
     case pi: PrimitiveObjectInspector => pi match {
       // We think HiveVarchar is also a String
       case hvoi: HiveVarcharObjectInspector if hvoi.preferWritable() =>
-        UTF8String(hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue)
+        UTF8String.fromString(hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue)
       case hvoi: HiveVarcharObjectInspector =>
-        UTF8String(hvoi.getPrimitiveJavaObject(data).getValue)
+        UTF8String.fromString(hvoi.getPrimitiveJavaObject(data).getValue)
       case x: StringObjectInspector if x.preferWritable() =>
-        UTF8String(x.getPrimitiveWritableObject(data).toString)
+        UTF8String.fromString(x.getPrimitiveWritableObject(data).toString)
       case x: StringObjectInspector =>
-        UTF8String(x.getPrimitiveJavaObject(data))
+        UTF8String.fromString(x.getPrimitiveJavaObject(data))
       case x: IntObjectInspector if x.preferWritable() => x.get(data)
       case x: BooleanObjectInspector if x.preferWritable() => x.get(data)
       case x: FloatObjectInspector if x.preferWritable() => x.get(data)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
----------------------------------------------------------------------
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
new file mode 100644
index 0000000..a351680
--- /dev/null
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+import org.apache.spark.unsafe.PlatformDependent;
+
+/**
+ * A UTF-8 String for internal Spark use.
+ * <p>
+ * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison,
+ * search, see http://en.wikipedia.org/wiki/UTF-8 for details.
+ * <p>
+ * Note: This is not designed for general use cases, should not be used outside SQL.
+ */
+public final class UTF8String implements Comparable<UTF8String>, Serializable {
+
+  @Nullable
+  private byte[] bytes;
+
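+  // Width table for UTF-8: entry (firstByte - 192) gives the total number of bytes
+  // in the code point beginning with that byte (192-223 -> 2, 224-239 -> 3,
+  // 240-247 -> 4, and so on for the historical 5- and 6-byte forms).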
+  private static final int[] bytesOfCodePointInUTF8 = {2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+    3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
+    4, 4, 4, 4, 4, 4, 4, 4,
+    5, 5, 5, 5,
+    6, 6, 6, 6};
+
+  public static UTF8String fromBytes(byte[] bytes) {
+    return (bytes != null) ? new UTF8String().set(bytes) : null;
+  }
+
+  public static UTF8String fromString(String str) {
+    return (str != null) ? new UTF8String().set(str) : null;
+  }
+
+  /**
+   * Updates the UTF8String with the given String, encoding it in UTF-8.
+   */
+  public UTF8String set(final String str) {
+    try {
+      bytes = str.getBytes("utf-8");
+    } catch (UnsupportedEncodingException e) {
+      // Turn the exception into unchecked so we can find out about it at runtime, but
+      // don't need to add lots of boilerplate code everywhere.
+      PlatformDependent.throwException(e);
+    }
+    return this;
+  }
+
+  /**
+   * Updates the UTF8String with the given byte[], which must already be encoded in UTF-8.
+   */
+  public UTF8String set(final byte[] bytes) {
+    this.bytes = bytes;
+    return this;
+  }
+
+  /**
+   * Returns the number of bytes for the code point whose first byte is `b`.
+   * @param b the first byte of a code point
+   */
+  public int numBytes(final byte b) {
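+    // Interpret the byte as unsigned; values of 192 and above start a multi-byte
+    // sequence and index into the width table, anything lower counts as one byte.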
+    final int offset = (b & 0xFF) - 192;
+    return (offset >= 0) ? bytesOfCodePointInUTF8[offset] : 1;
+  }
+
+  /**
+   * Returns the number of code points in the string.
+   *
+   * This is only used by Substring() when `start` is negative.
+   */
+  public int length() {
+    int len = 0;
+    for (int i = 0; i < bytes.length; i += numBytes(bytes[i])) {
+      len += 1;
+    }
+    return len;
+  }
+
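+  // Note: returns the internal array without copying; callers must not mutate it.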
+  public byte[] getBytes() {
+    return bytes;
+  }
+
+  /**
+   * Returns a substring of this string, indexed by code points.
+   * @param start the position of the first code point
+   * @param until the position after the last code point (exclusive)
+   */
+  public UTF8String substring(final int start, final int until) {
+    if (until <= start || start >= bytes.length) {
+      return UTF8String.fromBytes(new byte[0]);
+    }
+
+    int i = 0;
+    int c = 0;
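+    // Walk forward to the byte offset of code point number `start`.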
+    for (; i < bytes.length && c < start; i += numBytes(bytes[i])) {
+      c += 1;
+    }
+
+    int j = i;
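+    // Continue from there to the byte offset just past code point `until - 1`.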
+    for (; j < bytes.length && c < until; j += numBytes(bytes[j])) {
+      c += 1;
+    }
+
+    return UTF8String.fromBytes(Arrays.copyOfRange(bytes, i, j));
+  }
+
+  public boolean contains(final UTF8String substring) {
+    final byte[] b = substring.getBytes();
+    if (b.length == 0) {
+      return true;
+    }
+
+    for (int i = 0; i <= bytes.length - b.length; i++) {
+      // TODO: Avoid copying.
+      if (bytes[i] == b[0] && Arrays.equals(Arrays.copyOfRange(bytes, i, i + b.length), b)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public boolean startsWith(final UTF8String prefix) {
+    final byte[] b = prefix.getBytes();
+    // TODO: Avoid copying.
+    return b.length <= bytes.length && Arrays.equals(Arrays.copyOfRange(bytes, 0, b.length), b);
+  }
+
+  public boolean endsWith(final UTF8String suffix) {
+    final byte[] b = suffix.getBytes();
+    return b.length <= bytes.length &&
+      Arrays.equals(Arrays.copyOfRange(bytes, bytes.length - b.length, bytes.length), b);
+  }
+
+  public UTF8String toUpperCase() {
+    return UTF8String.fromString(toString().toUpperCase());
+  }
+
+  public UTF8String toLowerCase() {
+    return UTF8String.fromString(toString().toLowerCase());
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return new String(bytes, "utf-8");
+    } catch (UnsupportedEncodingException e) {
+      // Turn the exception into unchecked so we can find out about it at runtime, but
+      // don't need to add lots of boilerplate code everywhere.
+      PlatformDependent.throwException(e);
+      return "unknown";  // we will never reach here.
+    }
+  }
+
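+  // Note: the clone shares the underlying byte array with the original rather
+  // than copying it.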
+  @Override
+  public UTF8String clone() {
+    return new UTF8String().set(bytes);
+  }
+
+  @Override
+  public int compareTo(final UTF8String other) {
+    final byte[] b = other.getBytes();
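+    // Compare bytes as unsigned values so multi-byte characters sort in code
+    // point order; signed byte comparison would order them incorrectly.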
+    for (int i = 0; i < bytes.length && i < b.length; i++) {
+      int res = (bytes[i] & 0xFF) - (b[i] & 0xFF);
+      if (res != 0) {
+        return res;
+      }
+    }
+    return bytes.length - b.length;
+  }
+
+  public int compare(final UTF8String other) {
+    return compareTo(other);
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other instanceof UTF8String) {
+      return Arrays.equals(bytes, ((UTF8String) other).getBytes());
+    } else if (other instanceof String) {
+      // Used only in unit tests.
+      String s = (String) other;
+      return bytes.length >= s.length() && length() == s.length() && toString().equals(s);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(bytes);
+  }
+}
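
For context, a minimal usage sketch of the new API (the Demo class below is
illustrative and not part of this patch; the expected outputs follow from the
semantics in the diff above):

import org.apache.spark.unsafe.types.UTF8String;

public class UTF8StringDemo {
  public static void main(String[] args) {
    UTF8String s = UTF8String.fromString("大千世界");

    // length() counts code points, not bytes: each CJK character here
    // occupies three UTF-8 bytes.
    System.out.println(s.length());           // 4
    System.out.println(s.getBytes().length);  // 12

    // substring() also indexes by code point, not by byte offset.
    System.out.println(s.substring(1, 3));    // 千世

    // Search operations work directly on the raw UTF-8 bytes.
    System.out.println(s.contains(UTF8String.fromString("千世")));   // true
    System.out.println(s.startsWith(UTF8String.fromString("大")));   // true
  }
}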

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
----------------------------------------------------------------------
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
index 18393db..a93fc0e 100644
--- a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
@@ -18,7 +18,6 @@
 package org.apache.spark.unsafe.bitset;
 
 import junit.framework.Assert;
-import org.apache.spark.unsafe.bitset.BitSet;
 import org.junit.Test;
 
 import org.apache.spark.unsafe.memory.MemoryBlock;

http://git-wip-us.apache.org/repos/asf/spark/blob/7d669a56/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
----------------------------------------------------------------------
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
new file mode 100644
index 0000000..80c179a
--- /dev/null
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.UnsupportedEncodingException;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class UTF8StringSuite {
+
+  private void checkBasic(String str, int len) throws UnsupportedEncodingException {
+    Assert.assertEquals(UTF8String.fromString(str).length(), len);
+    Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")).length(), len);
+
+    Assert.assertEquals(UTF8String.fromString(str), str);
+    Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")), str);
+    Assert.assertEquals(UTF8String.fromString(str).toString(), str);
+    Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")).toString(), str);
+    Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")), UTF8String.fromString(str));
+
+    Assert.assertEquals(UTF8String.fromString(str).hashCode(),
+      UTF8String.fromBytes(str.getBytes("utf8")).hashCode());
+  }
+
+  @Test
+  public void basicTest() throws UnsupportedEncodingException {
+    checkBasic("hello", 5);
+    checkBasic("世 界", 3);
+  }
+
+  @Test
+  public void contains() {
+    Assert.assertTrue(UTF8String.fromString("hello").contains(UTF8String.fromString("ello")));
+    Assert.assertFalse(UTF8String.fromString("hello").contains(UTF8String.fromString("vello")));
+    Assert.assertFalse(UTF8String.fromString("hello").contains(UTF8String.fromString("hellooo")));
+    Assert.assertTrue(UTF8String.fromString("大千世界").contains(UTF8String.fromString("千世")));
+    Assert.assertFalse(UTF8String.fromString("大千世界").contains(UTF8String.fromString("世千")));
+    Assert.assertFalse(
+      UTF8String.fromString("大千世界").contains(UTF8String.fromString("大千世界好")));
+  }
+
+  @Test
+  public void startsWith() {
+    Assert.assertTrue(UTF8String.fromString("hello").startsWith(UTF8String.fromString("hell")));
+    Assert.assertFalse(UTF8String.fromString("hello").startsWith(UTF8String.fromString("ell")));
+    Assert.assertFalse(UTF8String.fromString("hello").startsWith(UTF8String.fromString("hellooo")));
+    Assert.assertTrue(UTF8String.fromString("数据砖头").startsWith(UTF8String.fromString("数据")));
+    Assert.assertFalse(UTF8String.fromString("大千世界").startsWith(UTF8String.fromString("千")));
+    Assert.assertFalse(
+      UTF8String.fromString("大千世界").startsWith(UTF8String.fromString("大千世界好")));
+  }
+
+  @Test
+  public void endsWith() {
+    Assert.assertTrue(UTF8String.fromString("hello").endsWith(UTF8String.fromString("ello")));
+    Assert.assertFalse(UTF8String.fromString("hello").endsWith(UTF8String.fromString("ellov")));
+    Assert.assertFalse(UTF8String.fromString("hello").endsWith(UTF8String.fromString("hhhello")));
+    Assert.assertTrue(UTF8String.fromString("大千世界").endsWith(UTF8String.fromString("世界")));
+    Assert.assertFalse(UTF8String.fromString("大千世界").endsWith(UTF8String.fromString("世")));
+    Assert.assertFalse(
+      UTF8String.fromString("数据砖头").endsWith(UTF8String.fromString("我的数据砖头")));
+  }
+
+  @Test
+  public void substring() {
+    Assert.assertEquals(
+      UTF8String.fromString("hello").substring(0, 0), UTF8String.fromString(""));
+    Assert.assertEquals(
+      UTF8String.fromString("hello").substring(1, 3), UTF8String.fromString("el"));
+    Assert.assertEquals(
+      UTF8String.fromString("数据砖头").substring(0, 1), UTF8String.fromString("数"));
+    Assert.assertEquals(
+      UTF8String.fromString("数据砖头").substring(1, 3), UTF8String.fromString("据砖"));
+    Assert.assertEquals(
+      UTF8String.fromString("数据砖头").substring(3, 5), UTF8String.fromString("头"));
+  }
+}
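
One behavioral difference from java.lang.String worth noting in the substring
test above: an `until` past the end of the string is clamped rather than
raising an exception. A runnable sketch (illustrative, not part of the patch):

import org.apache.spark.unsafe.types.UTF8String;

public class SubstringClampingDemo {
  public static void main(String[] args) {
    UTF8String s = UTF8String.fromString("数据砖头");  // four code points
    // The out-of-range upper bound is clamped to the end of the string.
    System.out.println(s.substring(3, 5));  // 头
    // By contrast, "数据砖头".substring(3, 5) on java.lang.String throws
    // StringIndexOutOfBoundsException.
  }
}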


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org