You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/10/28 22:27:02 UTC

git commit: [SPARK-3922] Refactor spark-core to use Utils.UTF_8

Repository: spark
Updated Branches:
  refs/heads/master 47a40f60d -> abcafcfba


[SPARK-3922] Refactor spark-core to use Utils.UTF_8

A global UTF-8 constant helps avoid encoding problems when converting between Strings and bytes. There are several options:

1. Add `val UTF_8 = Charset.forName("UTF-8")` to Utils.scala
2. java.nio.charset.StandardCharsets.UTF_8 (requires JDK 7)
3. io.netty.util.CharsetUtil.UTF_8
4. com.google.common.base.Charsets.UTF_8
5. org.apache.commons.lang.CharEncoding.UTF_8
6. org.apache.commons.lang3.CharEncoding.UTF_8

I prefer option 1) because people can find it easily.

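This is a PR for option 1) and only fixes Spark Core. (Note: the diff in this message imports `com.google.common.base.Charsets.UTF_8`, i.e. option 4, rather than adding a constant to Utils.scala.)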

Author: zsxwing <zs...@gmail.com>

Closes #2781 from zsxwing/SPARK-3922 and squashes the following commits:

f974edd [zsxwing] Merge branch 'master' into SPARK-3922
2d27423 [zsxwing] Refactor spark-core to use Utils.UTF_8


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abcafcfb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abcafcfb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abcafcfb

Branch: refs/heads/master
Commit: abcafcfba38d7c8dba68a5510475c5c49ae54d92
Parents: 47a40f6
Author: zsxwing <zs...@gmail.com>
Authored: Tue Oct 28 14:26:57 2014 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Oct 28 14:26:57 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkSaslClient.scala   |  7 ++++---
 .../main/scala/org/apache/spark/SparkSaslServer.scala   | 10 ++++++----
 .../scala/org/apache/spark/api/python/PythonRDD.scala   |  9 ++++-----
 .../api/python/WriteInputFormatTestDataGenerator.scala  |  5 +++--
 .../org/apache/spark/deploy/worker/DriverRunner.scala   |  4 ++--
 .../org/apache/spark/deploy/worker/ExecutorRunner.scala |  4 ++--
 .../network/netty/client/BlockFetchingClient.scala      |  4 ++--
 .../netty/client/BlockFetchingClientHandler.scala       |  5 +++--
 .../apache/spark/network/netty/server/BlockServer.scala |  4 ++--
 .../netty/server/BlockServerChannelInitializer.scala    |  6 +++---
 .../apache/spark/network/nio/ConnectionManager.scala    |  4 +++-
 .../scala/org/apache/spark/network/nio/Message.scala    |  4 +++-
 .../netty/client/BlockFetchingClientHandlerSuite.scala  |  3 ++-
 .../network/netty/server/BlockHeaderEncoderSuite.scala  |  8 ++++----
 .../scala/org/apache/spark/util/FileAppenderSuite.scala | 12 ++++++------
 .../test/scala/org/apache/spark/util/UtilsSuite.scala   | 12 ++++++------
 16 files changed, 55 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
index 65003b6..a954fcc 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark
 
-import java.io.IOException
 import javax.security.auth.callback.Callback
 import javax.security.auth.callback.CallbackHandler
 import javax.security.auth.callback.NameCallback
@@ -31,6 +30,8 @@ import javax.security.sasl.SaslException
 
 import scala.collection.JavaConversions.mapAsJavaMap
 
+import com.google.common.base.Charsets.UTF_8
+
 /**
  * Implements SASL Client logic for Spark
  */
@@ -111,10 +112,10 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager)  extends Logg
     CallbackHandler {
 
     private val userName: String =
-      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
+      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8))
     private val secretKey = securityMgr.getSecretKey()
     private val userPassword: Array[Char] = SparkSaslServer.encodePassword(
-        if (secretKey != null) secretKey.getBytes("utf-8") else "".getBytes("utf-8"))
+        if (secretKey != null) secretKey.getBytes(UTF_8) else "".getBytes(UTF_8))
 
     /**
      * Implementation used to respond to SASL request from the server.

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
index f6b0a91..7c2afb3 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
@@ -28,6 +28,8 @@ import javax.security.sasl.Sasl
 import javax.security.sasl.SaslException
 import javax.security.sasl.SaslServer
 import scala.collection.JavaConversions.mapAsJavaMap
+
+import com.google.common.base.Charsets.UTF_8
 import org.apache.commons.net.util.Base64
 
 /**
@@ -89,7 +91,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
     extends CallbackHandler {
 
     private val userName: String =
-      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
+      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8))
 
     override def handle(callbacks: Array[Callback]) {
       logDebug("In the sasl server callback handler")
@@ -101,7 +103,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
         case pc: PasswordCallback => {
           logDebug("handle: SASL server callback: setting userPassword")
           val password: Array[Char] =
-            SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes("utf-8"))
+            SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes(UTF_8))
           pc.setPassword(password)
         }
         case rc: RealmCallback => {
@@ -159,7 +161,7 @@ private[spark] object SparkSaslServer {
    * @return Base64-encoded string
    */
   def encodeIdentifier(identifier: Array[Byte]): String = {
-    new String(Base64.encodeBase64(identifier), "utf-8")
+    new String(Base64.encodeBase64(identifier), UTF_8)
   }
 
   /**
@@ -168,7 +170,7 @@ private[spark] object SparkSaslServer {
    * @return password as a char array.
    */
   def encodePassword(password: Array[Byte]): Array[Char] = {
-    new String(Base64.encodeBase64(password), "utf-8").toCharArray()
+    new String(Base64.encodeBase64(password), UTF_8).toCharArray()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 163dca6..61b125e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
@@ -27,6 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
+import com.google.common.base.Charsets.UTF_8
 import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.hadoop.conf.Configuration
@@ -134,7 +134,7 @@ private[spark] class PythonRDD(
               val exLength = stream.readInt()
               val obj = new Array[Byte](exLength)
               stream.readFully(obj)
-              throw new PythonException(new String(obj, "utf-8"),
+              throw new PythonException(new String(obj, UTF_8),
                 writerThread.exception.getOrElse(null))
             case SpecialLengths.END_OF_DATA_SECTION =>
               // We've finished the data section of the output, but we can still
@@ -318,7 +318,6 @@ private object SpecialLengths {
 }
 
 private[spark] object PythonRDD extends Logging {
-  val UTF8 = Charset.forName("UTF-8")
 
   // remember the broadcasts sent to each worker
   private val workerBroadcasts = new mutable.WeakHashMap[Socket, mutable.Set[Long]]()
@@ -586,7 +585,7 @@ private[spark] object PythonRDD extends Logging {
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
-    val bytes = str.getBytes(UTF8)
+    val bytes = str.getBytes(UTF_8)
     dataOut.writeInt(bytes.length)
     dataOut.write(bytes)
   }
@@ -849,7 +848,7 @@ private[spark] object PythonRDD extends Logging {
 
 private
 class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
-  override def call(arr: Array[Byte]) : String = new String(arr, PythonRDD.UTF8)
+  override def call(arr: Array[Byte]) : String = new String(arr, UTF_8)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index d11db97..e9ca916 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.api.python
 
 import java.io.{DataOutput, DataInput}
-import java.nio.charset.Charset
+
+import com.google.common.base.Charsets.UTF_8
 
 import org.apache.hadoop.io._
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
@@ -136,7 +137,7 @@ object WriteInputFormatTestDataGenerator {
     sc.parallelize(intKeys).saveAsSequenceFile(intPath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(UTF_8)) }
       ).saveAsSequenceFile(bytesPath)
     val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
     sc.parallelize(bools).saveAsSequenceFile(boolPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 9f99117..3bf0b94 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 
 import akka.actor.ActorRef
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileUtil, Path}
@@ -178,7 +178,7 @@ private[spark] class DriverRunner(
       val stderr = new File(baseDir, "stderr")
       val header = "Launch Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
-      Files.append(header, stderr, Charsets.UTF_8)
+      Files.append(header, stderr, UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
     runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 71d7385..030a651 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.worker
 import java.io._
 
 import akka.actor.ActorRef
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
 import org.apache.spark.{SparkConf, Logging}
@@ -151,7 +151,7 @@ private[spark] class ExecutorRunner(
       stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
 
       val stderr = new File(executorDir, "stderr")
-      Files.write(header, stderr, Charsets.UTF_8)
+      Files.write(header, stderr, UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
       state = ExecutorState.RUNNING

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
index 5aea7ba..3ab13b9 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
@@ -19,13 +19,13 @@ package org.apache.spark.network.netty.client
 
 import java.util.concurrent.TimeoutException
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.bootstrap.Bootstrap
 import io.netty.buffer.PooledByteBufAllocator
 import io.netty.channel.socket.SocketChannel
 import io.netty.channel.{ChannelFutureListener, ChannelFuture, ChannelInitializer, ChannelOption}
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.string.StringEncoder
-import io.netty.util.CharsetUtil
 
 import org.apache.spark.Logging
 
@@ -61,7 +61,7 @@ class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String,
     b.handler(new ChannelInitializer[SocketChannel] {
       override def initChannel(ch: SocketChannel): Unit = {
         ch.pipeline
-          .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
+          .addLast("encoder", new StringEncoder(UTF_8))
           // maxFrameLength = 2G, lengthFieldOffset = 0, lengthFieldLength = 4
           .addLast("framedLengthDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4))
           .addLast("handler", handler)

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
index 83265b1..d9d3f7b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.netty.client
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.buffer.ByteBuf
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
 
@@ -67,7 +68,7 @@ class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] wi
     val blockIdLen = in.readInt()
     val blockIdBytes = new Array[Byte](math.abs(blockIdLen))
     in.readBytes(blockIdBytes)
-    val blockId = new String(blockIdBytes)
+    val blockId = new String(blockIdBytes, UTF_8)
     val blockSize = totalLen - math.abs(blockIdLen) - 4
 
     def server = ctx.channel.remoteAddress.toString
@@ -76,7 +77,7 @@ class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] wi
     if (blockIdLen < 0) {
       val errorMessageBytes = new Array[Byte](blockSize)
       in.readBytes(errorMessageBytes)
-      val errorMsg = new String(errorMessageBytes)
+      val errorMsg = new String(errorMessageBytes, UTF_8)
       logTrace(s"Received block $blockId ($blockSize B) with error $errorMsg from $server")
 
       val listener = outstandingRequests.get(blockId)

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
index 7b2f9a8..9194c7c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network.netty.server
 
 import java.net.InetSocketAddress
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.bootstrap.ServerBootstrap
 import io.netty.buffer.PooledByteBufAllocator
 import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
@@ -30,7 +31,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
 import io.netty.channel.socket.oio.OioServerSocketChannel
 import io.netty.handler.codec.LineBasedFrameDecoder
 import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.network.netty.NettyConfig
@@ -131,7 +131,7 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
       override def initChannel(ch: SocketChannel): Unit = {
         ch.pipeline
           .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-          .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+          .addLast("stringDecoder", new StringDecoder(UTF_8))
           .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
           .addLast("handler", new BlockServerHandler(dataProvider))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
index cc70bd0..188154d 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.network.netty.server
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.channel.ChannelInitializer
 import io.netty.channel.socket.SocketChannel
 import io.netty.handler.codec.LineBasedFrameDecoder
 import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
-import org.apache.spark.storage.BlockDataProvider
 
+import org.apache.spark.storage.BlockDataProvider
 
 /** Channel initializer that sets up the pipeline for the BlockServer. */
 private[netty]
@@ -33,7 +33,7 @@ class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
   override def initChannel(ch: SocketChannel): Unit = {
     ch.pipeline
       .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-      .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+      .addLast("stringDecoder", new StringDecoder(UTF_8))
       .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
       .addLast("handler", new BlockServerHandler(dataProvider))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index bda4bf5..8408b75 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -31,6 +31,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.language.postfixOps
 
+import com.google.common.base.Charsets.UTF_8
+
 import org.apache.spark._
 import org.apache.spark.util.Utils
 
@@ -923,7 +925,7 @@ private[nio] class ConnectionManager(
             val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
             val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
             errorMsgByteBuf.get(errorMsgBytes)
-            val errorMsg = new String(errorMsgBytes, "utf-8")
+            val errorMsg = new String(errorMsgBytes, UTF_8)
             val e = new IOException(
               s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
             if (!promise.tryFailure(e)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/network/nio/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
index 3ad0459..fb4a979 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 
+import com.google.common.base.Charsets.UTF_8
+
 import org.apache.spark.util.Utils
 
 private[nio] abstract class Message(val typ: Long, val id: Int) {
@@ -92,7 +94,7 @@ private[nio] object Message {
    */
   def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
     val exceptionString = Utils.exceptionString(exception)
-    val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8"))
+    val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes(UTF_8))
     val errorMessage = createBufferMessage(serializedExceptionString, ackId)
     errorMessage.hasError = true
     errorMessage

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
index 903ab09..f629322 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network.netty.client
 
 import java.nio.ByteBuffer
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.buffer.Unpooled
 import io.netty.channel.embedded.EmbeddedChannel
 
@@ -42,7 +43,7 @@ class BlockFetchingClientHandlerSuite extends FunSuite with PrivateMethodTester
           parsedBlockId = bid
           val bytes = new Array[Byte](refCntBuf.byteBuffer().remaining)
           refCntBuf.byteBuffer().get(bytes)
-          parsedBlockData = new String(bytes)
+          parsedBlockData = new String(bytes, UTF_8)
         }
       }
     )

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
index 3ee281c..3f8d0cf 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.network.netty.server
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.buffer.ByteBuf
 import io.netty.channel.embedded.EmbeddedChannel
 
 import org.scalatest.FunSuite
 
-
 class BlockHeaderEncoderSuite extends FunSuite {
 
   test("encode normal block data") {
@@ -35,7 +35,7 @@ class BlockHeaderEncoderSuite extends FunSuite {
 
     val blockIdBytes = new Array[Byte](blockId.length)
     out.readBytes(blockIdBytes)
-    assert(new String(blockIdBytes) === blockId)
+    assert(new String(blockIdBytes, UTF_8) === blockId)
     assert(out.readableBytes() === 0)
 
     channel.close()
@@ -52,11 +52,11 @@ class BlockHeaderEncoderSuite extends FunSuite {
 
     val blockIdBytes = new Array[Byte](blockId.length)
     out.readBytes(blockIdBytes)
-    assert(new String(blockIdBytes) === blockId)
+    assert(new String(blockIdBytes, UTF_8) === blockId)
 
     val errorMsgBytes = new Array[Byte](errorMsg.length)
     out.readBytes(errorMsgBytes)
-    assert(new String(errorMsgBytes) === errorMsg)
+    assert(new String(errorMsgBytes, UTF_8) === errorMsg)
     assert(out.readableBytes() === 0)
 
     channel.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index d2bee44..4dc5b61 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.util
 
 import java.io._
-import java.nio.charset.Charset
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
 import org.apache.spark.{Logging, SparkConf}
@@ -44,11 +44,11 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
 
   test("basic file appender") {
     val testString = (1 to 1000).mkString(", ")
-    val inputStream = new ByteArrayInputStream(testString.getBytes(Charset.forName("UTF-8")))
+    val inputStream = new ByteArrayInputStream(testString.getBytes(UTF_8))
     val appender = new FileAppender(inputStream, testFile)
     inputStream.close()
     appender.awaitTermination()
-    assert(Files.toString(testFile, Charset.forName("UTF-8")) === testString)
+    assert(Files.toString(testFile, UTF_8) === testString)
   }
 
   test("rolling file appender - time-based rolling") {
@@ -96,7 +96,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
     val allGeneratedFiles = new HashSet[String]()
     val items = (1 to 10).map { _.toString * 10000 }
     for (i <- 0 until items.size) {
-      testOutputStream.write(items(i).getBytes(Charset.forName("UTF-8")))
+      testOutputStream.write(items(i).getBytes(UTF_8))
       testOutputStream.flush()
       allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
         testFile.getParentFile.toString, testFile.getName).map(_.toString)
@@ -199,7 +199,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
     // send data to appender through the input stream, and wait for the data to be written
     val expectedText = textToAppend.mkString("")
     for (i <- 0 until textToAppend.size) {
-      outputStream.write(textToAppend(i).getBytes(Charset.forName("UTF-8")))
+      outputStream.write(textToAppend(i).getBytes(UTF_8))
       outputStream.flush()
       Thread.sleep(sleepTimeBetweenTexts)
     }
@@ -214,7 +214,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
     logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
     assert(generatedFiles.size > 1)
     val allText = generatedFiles.map { file =>
-      Files.toString(file, Charset.forName("UTF-8"))
+      Files.toString(file, UTF_8)
     }.mkString("")
     assert(allText === expectedText)
     generatedFiles

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index ea7ef05..65579bb 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,7 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
@@ -118,7 +118,7 @@ class UtilsSuite extends FunSuite {
     tmpDir2.deleteOnExit()
     val f1Path = tmpDir2 + "/f1"
     val f1 = new FileOutputStream(f1Path)
-    f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
+    f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8))
     f1.close()
 
     // Read first few bytes
@@ -146,9 +146,9 @@ class UtilsSuite extends FunSuite {
     val tmpDir = Utils.createTempDir()
     tmpDir.deleteOnExit()
     val files = (1 to 3).map(i => new File(tmpDir, i.toString))
-    Files.write("0123456789", files(0), Charsets.UTF_8)
-    Files.write("abcdefghij", files(1), Charsets.UTF_8)
-    Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8)
+    Files.write("0123456789", files(0), UTF_8)
+    Files.write("abcdefghij", files(1), UTF_8)
+    Files.write("ABCDEFGHIJ", files(2), UTF_8)
 
     // Read first few bytes in the 1st file
     assert(Utils.offsetBytes(files, 0, 5) === "01234")
@@ -339,7 +339,7 @@ class UtilsSuite extends FunSuite {
     try {
       System.setProperty("spark.test.fileNameLoadB", "2")
       Files.write("spark.test.fileNameLoadA true\n" +
-        "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
+        "spark.test.fileNameLoadB 1\n", outFile, UTF_8)
       val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
       properties
         .filter { case (k, v) => k.startsWith("spark.")}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org