You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/14 05:03:54 UTC
[2/2] spark git commit: [SPARK-13823][CORE][STREAMING][SQL] Always
specify Charset in String <-> byte[] conversions (and remaining Coverity
items)
[SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items)
## What changes were proposed in this pull request?
- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on platform default encoding, to use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes specifying the encoding with `StandardCharsets.UTF_8`, not the Guava constant or "UTF-8" (which means handling `UnsupportedEncodingException`)
- (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c )
## How was this patch tested?
Jenkins tests
Author: Sean Owen <so...@cloudera.com>
Closes #11657 from srowen/SPARK-13823.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18408528
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18408528
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18408528
Branch: refs/heads/master
Commit: 184085284185011d7cc6d054b54d2d38eaf1dd77
Parents: 473263f
Author: Sean Owen <so...@cloudera.com>
Authored: Sun Mar 13 21:03:49 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Mar 13 21:03:49 2016 -0700
----------------------------------------------------------------------
.../spark/network/client/StreamInterceptor.java | 3 +-
.../apache/spark/network/protocol/Encoders.java | 8 ++---
.../spark/network/sasl/SparkSaslServer.java | 10 +++---
.../apache/spark/network/util/JavaUtils.java | 6 ++--
.../shuffle/ExternalShuffleBlockResolver.java | 12 +++----
.../ExternalShuffleBlockResolverSuite.java | 36 ++++++++++++--------
.../shuffle/ExternalShuffleCleanupSuite.java | 21 +++++++-----
.../apache/spark/unsafe/types/UTF8String.java | 10 ++----
.../org/apache/spark/api/python/PythonRDD.scala | 8 ++---
.../spark/api/python/PythonWorkerFactory.scala | 3 +-
.../WriteInputFormatTestDataGenerator.scala | 4 +--
.../scala/org/apache/spark/api/r/SerDe.scala | 3 +-
.../spark/deploy/FaultToleranceTest.scala | 4 ++-
.../spark/deploy/SparkSubmitArguments.scala | 3 +-
.../deploy/rest/RestSubmissionClient.scala | 4 +--
.../spark/deploy/worker/DriverRunner.scala | 4 +--
.../spark/deploy/worker/ExecutorRunner.scala | 4 +--
.../spark/scheduler/EventLoggingListener.scala | 4 +--
.../serializer/GenericAvroSerializer.scala | 3 +-
.../scala/org/apache/spark/util/Utils.scala | 5 +--
.../java/org/apache/spark/JavaAPISuite.java | 4 +--
.../sort/ShuffleInMemorySorterSuite.java | 3 +-
.../unsafe/sort/UnsafeExternalSorterSuite.java | 2 --
.../unsafe/sort/UnsafeInMemorySorterSuite.java | 3 +-
.../org/apache/spark/SparkContextSuite.scala | 17 ++++-----
.../spark/api/python/PythonRDDSuite.scala | 11 +++---
.../apache/spark/deploy/SparkSubmitSuite.scala | 6 ++--
.../deploy/history/FsHistoryProviderSuite.scala | 9 ++---
.../deploy/history/HistoryServerSuite.scala | 6 ++--
.../deploy/rest/StandaloneRestSubmitSuite.scala | 4 +--
.../netty/NettyBlockTransferSecuritySuite.scala | 7 ++--
.../apache/spark/util/FileAppenderSuite.scala | 12 +++----
.../org/apache/spark/util/UtilsSuite.scala | 16 ++++-----
docs/streaming-custom-receivers.md | 6 ++--
.../examples/streaming/JavaCustomReceiver.java | 4 ++-
.../examples/streaming/CustomReceiver.scala | 4 ++-
.../streaming/flume/sink/SparkSinkSuite.scala | 4 ++-
.../spark/streaming/flume/FlumeTestUtils.scala | 4 +--
.../streaming/flume/PollingFlumeTestUtils.scala | 5 +--
.../spark/streaming/kafka/KafkaUtils.scala | 4 +--
.../streaming/kinesis/KinesisTestUtils.scala | 3 +-
.../kinesis/KPLBasedKinesisTestUtils.scala | 3 +-
.../spark/streaming/mqtt/MQTTInputDStream.scala | 4 ++-
.../spark/streaming/mqtt/MQTTTestUtils.scala | 4 +--
.../apache/spark/graphx/GraphLoaderSuite.scala | 3 +-
.../spark/launcher/AbstractCommandBuilder.java | 5 +--
.../apache/spark/launcher/OutputRedirector.java | 3 +-
.../SparkSubmitCommandBuilderSuite.java | 6 +---
.../spark/mllib/api/python/PythonMLLibAPI.scala | 3 +-
.../source/libsvm/JavaLibSVMRelationSuite.java | 4 +--
.../ml/source/libsvm/LibSVMRelationSuite.scala | 6 ++--
.../apache/spark/mllib/util/MLUtilsSuite.scala | 8 ++---
.../spark/sql/catalyst/parser/ParseUtils.java | 4 ++-
.../sql/catalyst/expressions/literals.scala | 3 +-
.../spark/sql/catalyst/util/package.scala | 3 +-
.../expressions/LiteralExpressionSuite.scala | 4 ++-
.../expressions/MathFunctionsSuite.scala | 6 ++--
.../expressions/MiscFunctionsSuite.scala | 19 +++++++----
.../expressions/UnsafeRowConverterSuite.scala | 14 ++++----
.../codegen/GeneratedProjectionSuite.scala | 5 ++-
.../execution/vectorized/ColumnVectorUtils.java | 3 +-
.../sql/execution/vectorized/ColumnarBatch.java | 4 +++
.../execution/datasources/csv/CSVOptions.scala | 4 +--
.../execution/datasources/csv/CSVParser.scala | 3 +-
.../datasources/csv/DefaultSource.scala | 4 +--
.../execution/streaming/FileStreamSource.scala | 4 +--
.../spark/sql/sources/JavaSaveLoadSuite.java | 2 --
.../spark/sql/DataFrameFunctionsSuite.scala | 6 ++--
.../org/apache/spark/sql/DataFrameSuite.scala | 5 +--
.../apache/spark/sql/MathExpressionsSuite.scala | 6 ++--
.../CompressionSchemeBenchmark.scala | 3 +-
.../vectorized/ColumnarBatchBenchmark.scala | 4 ++-
.../vectorized/ColumnarBatchSuite.scala | 21 +++++++-----
.../sql/streaming/FileStreamSourceSuite.scala | 6 ++--
.../org/apache/spark/sql/test/SQLTestData.scala | 12 ++++---
.../spark/sql/hive/thriftserver/CliSuite.scala | 3 +-
.../thriftserver/HiveThriftServer2Suites.scala | 4 +--
.../org/apache/spark/sql/hive/HiveContext.scala | 3 +-
.../hive/execution/ScriptTransformation.scala | 3 +-
.../sql/hive/JavaMetastoreDataSourcesSuite.java | 2 --
.../spark/sql/hive/orc/OrcQuerySuite.scala | 3 +-
.../streaming/dstream/SocketInputDStream.scala | 4 ++-
.../apache/spark/streaming/JavaAPISuite.java | 7 ++--
.../spark/streaming/JavaReceiverAPISuite.java | 4 ++-
.../spark/streaming/CheckpointSuite.scala | 4 +--
.../spark/streaming/InputStreamsSuite.scala | 10 +++---
.../spark/streaming/MasterFailureTest.scala | 4 +--
.../org/apache/spark/deploy/yarn/Client.scala | 4 +--
.../deploy/yarn/BaseYarnClusterSuite.scala | 8 ++---
.../spark/deploy/yarn/YarnClusterSuite.scala | 12 +++----
.../yarn/YarnShuffleIntegrationSuite.scala | 4 +--
.../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 3 +-
92 files changed, 321 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
index 88ba3cc..b0e85ba 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
@@ -34,8 +34,7 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {
private final String streamId;
private final long byteCount;
private final StreamCallback callback;
-
- private volatile long bytesRead;
+ private long bytesRead;
StreamInterceptor(
TransportResponseHandler handler,
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
index 9162d0b..be21752 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
@@ -17,8 +17,8 @@
package org.apache.spark.network.protocol;
+import java.nio.charset.StandardCharsets;
-import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
/** Provides a canonical set of Encoders for simple types. */
@@ -27,11 +27,11 @@ public class Encoders {
/** Strings are encoded with their length followed by UTF-8 bytes. */
public static class Strings {
public static int encodedLength(String s) {
- return 4 + s.getBytes(Charsets.UTF_8).length;
+ return 4 + s.getBytes(StandardCharsets.UTF_8).length;
}
public static void encode(ByteBuf buf, String s) {
- byte[] bytes = s.getBytes(Charsets.UTF_8);
+ byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
}
@@ -40,7 +40,7 @@ public class Encoders {
int length = buf.readInt();
byte[] bytes = new byte[length];
buf.readBytes(bytes);
- return new String(bytes, Charsets.UTF_8);
+ return new String(bytes, StandardCharsets.UTF_8);
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
index 431cb67..b802a5a 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
@@ -28,9 +28,9 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
@@ -187,14 +187,14 @@ public class SparkSaslServer implements SaslEncryptionBackend {
/* Encode a byte[] identifier as a Base64-encoded string. */
public static String encodeIdentifier(String identifier) {
Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled");
- return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8)))
- .toString(Charsets.UTF_8);
+ return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(StandardCharsets.UTF_8)))
+ .toString(StandardCharsets.UTF_8);
}
/** Encode a password as a base64-encoded char[] array. */
public static char[] encodePassword(String password) {
Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled");
- return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8)))
- .toString(Charsets.UTF_8).toCharArray();
+ return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(StandardCharsets.UTF_8)))
+ .toString(StandardCharsets.UTF_8).toCharArray();
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index ccc5273..8d83ae0 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -21,11 +21,11 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.Unpooled;
@@ -68,7 +68,7 @@ public class JavaUtils {
* converted back to the same string through {@link #bytesToString(ByteBuffer)}.
*/
public static ByteBuffer stringToBytes(String s) {
- return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
+ return Unpooled.wrappedBuffer(s.getBytes(StandardCharsets.UTF_8)).nioBuffer();
}
/**
@@ -76,7 +76,7 @@ public class JavaUtils {
* converted back to the same byte buffer through {@link #stringToBytes(String)}.
*/
public static String bytesToString(ByteBuffer b) {
- return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
+ return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
}
/*
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index fe933ed..460110d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -18,6 +18,7 @@
package org.apache.spark.network.shuffle;
import java.io.*;
+import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -27,7 +28,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import org.fusesource.leveldbjni.JniDBFactory;
@@ -152,7 +152,7 @@ public class ExternalShuffleBlockResolver {
try {
if (db != null) {
byte[] key = dbAppExecKey(fullId);
- byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8);
+ byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
db.put(key, value);
}
} catch (Exception e) {
@@ -350,7 +350,7 @@ public class ExternalShuffleBlockResolver {
// we stick a common prefix on all the keys so we can find them in the DB
String appExecJson = mapper.writeValueAsString(appExecId);
String key = (APP_KEY_PREFIX + ";" + appExecJson);
- return key.getBytes(Charsets.UTF_8);
+ return key.getBytes(StandardCharsets.UTF_8);
}
private static AppExecId parseDbAppExecKey(String s) throws IOException {
@@ -368,10 +368,10 @@ public class ExternalShuffleBlockResolver {
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
if (db != null) {
DBIterator itr = db.iterator();
- itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8));
+ itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
- String key = new String(e.getKey(), Charsets.UTF_8);
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_KEY_PREFIX)) {
break;
}
@@ -418,7 +418,7 @@ public class ExternalShuffleBlockResolver {
public static class StoreVersion {
- static final byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
+ static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
public final int major;
public final int minor;
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 60a1b8b..d9b5f02 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.shuffle;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.CharStreams;
@@ -34,15 +35,16 @@ import org.junit.Test;
import static org.junit.Assert.*;
public class ExternalShuffleBlockResolverSuite {
- static String sortBlock0 = "Hello!";
- static String sortBlock1 = "World!";
+ private static final String sortBlock0 = "Hello!";
+ private static final String sortBlock1 = "World!";
- static String hashBlock0 = "Elementary";
- static String hashBlock1 = "Tabular";
+ private static final String hashBlock0 = "Elementary";
+ private static final String hashBlock1 = "Tabular";
- static TestShuffleDataContext dataContext;
+ private static TestShuffleDataContext dataContext;
- static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+ private static final TransportConf conf =
+ new TransportConf("shuffle", new SystemPropertyConfigProvider());
@BeforeClass
public static void beforeAll() throws IOException {
@@ -50,10 +52,12 @@ public class ExternalShuffleBlockResolverSuite {
dataContext.create();
// Write some sort and hash data.
- dataContext.insertSortShuffleData(0, 0,
- new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
- dataContext.insertHashShuffleData(1, 0,
- new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
+ dataContext.insertSortShuffleData(0, 0, new byte[][] {
+ sortBlock0.getBytes(StandardCharsets.UTF_8),
+ sortBlock1.getBytes(StandardCharsets.UTF_8)});
+ dataContext.insertHashShuffleData(1, 0, new byte[][] {
+ hashBlock0.getBytes(StandardCharsets.UTF_8),
+ hashBlock1.getBytes(StandardCharsets.UTF_8)});
}
@AfterClass
@@ -100,13 +104,15 @@ public class ExternalShuffleBlockResolverSuite {
InputStream block0Stream =
resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+ String block0 = CharStreams.toString(
+ new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(sortBlock0, block0);
InputStream block1Stream =
resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+ String block1 = CharStreams.toString(
+ new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
assertEquals(sortBlock1, block1);
}
@@ -119,13 +125,15 @@ public class ExternalShuffleBlockResolverSuite {
InputStream block0Stream =
resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+ String block0 = CharStreams.toString(
+ new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(hashBlock0, block0);
InputStream block1Stream =
resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+ String block1 = CharStreams.toString(
+ new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
assertEquals(hashBlock1, block1);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 532d7ab..43d0201 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,8 +35,8 @@ import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
- Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
- TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+ private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+ private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
@Test
public void noCleanupAndCleanup() throws IOException {
@@ -123,27 +124,29 @@ public class ExternalShuffleCleanupSuite {
assertCleanedUp(dataContext1);
}
- private void assertStillThere(TestShuffleDataContext dataContext) {
+ private static void assertStillThere(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
}
}
- private void assertCleanedUp(TestShuffleDataContext dataContext) {
+ private static void assertCleanedUp(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
}
}
- private TestShuffleDataContext createSomeData() throws IOException {
+ private static TestShuffleDataContext createSomeData() throws IOException {
Random rand = new Random(123);
TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
dataContext.create();
- dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
- new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
- dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
- new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+ dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
+ "ABC".getBytes(StandardCharsets.UTF_8),
+ "DEF".getBytes(StandardCharsets.UTF_8)});
+ dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] {
+ "GHI".getBytes(StandardCharsets.UTF_8),
+ "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)});
return dataContext;
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 427a831..e16166a 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -21,6 +21,7 @@ import javax.annotation.Nonnull;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
@@ -825,14 +826,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
@Override
public String toString() {
- try {
- return new String(getBytes(), "utf-8");
- } catch (UnsupportedEncodingException e) {
- // Turn the exception into unchecked so we can find out about it at runtime, but
- // don't need to add lots of boilerplate code everywhere.
- throwException(e);
- return "unknown"; // we will never reach here.
- }
+ return new String(getBytes(), StandardCharsets.UTF_8);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 05d1c31..8f30677 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
import java.io._
import java.net._
+import java.nio.charset.StandardCharsets
import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap}
import scala.collection.JavaConverters._
@@ -26,7 +27,6 @@ import scala.collection.mutable
import scala.language.existentials
import scala.util.control.NonFatal
-import com.google.common.base.Charsets.UTF_8
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
@@ -165,7 +165,7 @@ private[spark] class PythonRunner(
val exLength = stream.readInt()
val obj = new Array[Byte](exLength)
stream.readFully(obj)
- throw new PythonException(new String(obj, UTF_8),
+ throw new PythonException(new String(obj, StandardCharsets.UTF_8),
writerThread.exception.getOrElse(null))
case SpecialLengths.END_OF_DATA_SECTION =>
// We've finished the data section of the output, but we can still
@@ -624,7 +624,7 @@ private[spark] object PythonRDD extends Logging {
}
def writeUTF(str: String, dataOut: DataOutputStream) {
- val bytes = str.getBytes(UTF_8)
+ val bytes = str.getBytes(StandardCharsets.UTF_8)
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
@@ -817,7 +817,7 @@ private[spark] object PythonRDD extends Logging {
private
class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
- override def call(arr: Array[Byte]) : String = new String(arr, UTF_8)
+ override def call(arr: Array[Byte]) : String = new String(arr, StandardCharsets.UTF_8)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index a2a2f89..433764b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets
import java.util.Arrays
import scala.collection.mutable
@@ -121,7 +122,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
// Tell the worker our port
- val out = new OutputStreamWriter(worker.getOutputStream)
+ val out = new OutputStreamWriter(worker.getOutputStream, StandardCharsets.UTF_8)
out.write(serverSocket.getLocalPort + "\n")
out.flush()
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index 9549784..34cb7c6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -19,10 +19,10 @@ package org.apache.spark.api.python
import java.{util => ju}
import java.io.{DataInput, DataOutput}
+import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
-import com.google.common.base.Charsets.UTF_8
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
@@ -134,7 +134,7 @@ object WriteInputFormatTestDataGenerator {
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
- sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(UTF_8)) }
+ sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(StandardCharsets.UTF_8)) }
).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index af815f8..c7fb192 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.r
import java.io.{DataInputStream, DataOutputStream}
+import java.nio.charset.StandardCharsets
import java.sql.{Date, Time, Timestamp}
import scala.collection.JavaConverters._
@@ -109,7 +110,7 @@ private[spark] object SerDe {
val bytes = new Array[Byte](len)
in.readFully(bytes)
assert(bytes(len - 1) == 0)
- val str = new String(bytes.dropRight(1), "UTF-8")
+ val str = new String(bytes.dropRight(1), StandardCharsets.UTF_8)
str
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 434aadd..305994a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
import java.io._
import java.net.URL
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ListBuffer
@@ -348,7 +349,8 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile
def readState() {
try {
- val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream)
+ val masterStream = new InputStreamReader(
+ new URL("http://%s:8080/json".format(ip)).openStream, StandardCharsets.UTF_8)
val json = JsonMethods.parse(masterStream)
val workers = json \ "workers"
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 175756b..a62096d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy
import java.io.{ByteArrayOutputStream, PrintStream}
import java.lang.reflect.InvocationTargetException
import java.net.URI
+import java.nio.charset.StandardCharsets
import java.util.{List => JList}
import java.util.jar.JarFile
@@ -608,7 +609,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
stream.flush()
// Get the output and discard any unnecessary lines from it.
- Source.fromString(new String(out.toByteArray())).getLines
+ Source.fromString(new String(out.toByteArray(), StandardCharsets.UTF_8)).getLines
.filter { line =>
!line.startsWith("log4j") && !line.startsWith("usage")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 006e2e1..d3e092a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest
import java.io.{DataOutputStream, FileNotFoundException}
import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeoutException
import javax.servlet.http.HttpServletResponse
@@ -28,7 +29,6 @@ import scala.concurrent.duration._
import scala.io.Source
import com.fasterxml.jackson.core.JsonProcessingException
-import com.google.common.base.Charsets
import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.util.Utils
@@ -211,7 +211,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
try {
val out = new DataOutputStream(conn.getOutputStream)
Utils.tryWithSafeFinally {
- out.write(json.getBytes(Charsets.UTF_8))
+ out.write(json.getBytes(StandardCharsets.UTF_8))
} {
out.close()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 6049db6..7f4fe26 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -18,10 +18,10 @@
package org.apache.spark.deploy.worker
import java.io._
+import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
-import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
@@ -174,7 +174,7 @@ private[deploy] class DriverRunner(
val stderr = new File(baseDir, "stderr")
val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
- Files.append(header, stderr, UTF_8)
+ Files.append(header, stderr, StandardCharsets.UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index c6687a4..208a1bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -18,10 +18,10 @@
package org.apache.spark.deploy.worker
import java.io._
+import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
-import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -168,7 +168,7 @@ private[deploy] class ExecutorRunner(
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
- Files.write(header, stderr, UTF_8)
+ Files.write(header, stderr, StandardCharsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 8354e2a..2d76d08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -19,11 +19,11 @@ package org.apache.spark.scheduler
import java.io._
import java.net.URI
+import java.nio.charset.StandardCharsets
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import com.google.common.base.Charsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission
@@ -254,7 +254,7 @@ private[spark] object EventLoggingListener extends Logging {
def initEventLog(logStream: OutputStream): Unit = {
val metadata = SparkListenerLogStart(SPARK_VERSION)
val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n"
- logStream.write(metadataJson.getBytes(Charsets.UTF_8))
+ logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8))
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 3d5b710..1a8e545 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.serializer
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import scala.collection.mutable
@@ -86,7 +87,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
schemaBytes.arrayOffset() + schemaBytes.position(),
schemaBytes.remaining())
val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
- new Schema.Parser().parse(new String(bytes, "UTF-8"))
+ new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8))
})
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b4c4951..b5a98ce 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.Channels
+import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
@@ -1904,7 +1905,7 @@ private[spark] object Utils extends Logging {
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile(), s"Properties file $file is not a normal file")
- val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
+ val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
try {
val properties = new Properties()
properties.load(inReader)
@@ -2344,7 +2345,7 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.Ou
def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
- val reader = new BufferedReader(new InputStreamReader(input))
+ val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while (line != null) {
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index e6a4ab7..a7e74c0 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -21,6 +21,7 @@ import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -45,7 +46,6 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.base.Throwables;
-import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@@ -1058,7 +1058,7 @@ public class JavaAPISuite implements Serializable {
rdd.saveAsTextFile(outputDir);
// Read the plain text file and check it's OK
File outputFile = new File(outputDir, "part-00000");
- String content = Files.toString(outputFile, Charsets.UTF_8);
+ String content = Files.toString(outputFile, StandardCharsets.UTF_8);
Assert.assertEquals("1\n2\n3\n4\n", content);
// Also try reading it in as a text file RDD
List<String> expected = Arrays.asList("1", "2", "3", "4");
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index b4fa33f..a350270 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.sort;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
@@ -41,7 +42,7 @@ public class ShuffleInMemorySorterSuite {
private static String getStringFromDataPage(Object baseObject, long baseOffset, int strLength) {
final byte[] strBytes = new byte[strLength];
Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, strLength);
- return new String(strBytes);
+ return new String(strBytes, StandardCharsets.UTF_8);
}
@Test
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index b757ddc..a79ed58 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -80,7 +80,6 @@ public class UnsafeExternalSorterSuite {
}
};
- SparkConf sparkConf;
File tempDir;
@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@@ -99,7 +98,6 @@ public class UnsafeExternalSorterSuite {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- sparkConf = new SparkConf();
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
spillFilesCreated.clear();
taskContext = mock(TaskContext.class);
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index ff41768..90849ab 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection.unsafe.sort;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.junit.Assert;
@@ -41,7 +42,7 @@ public class UnsafeInMemorySorterSuite {
private static String getStringFromDataPage(Object baseObject, long baseOffset, int length) {
final byte[] strBytes = new byte[length];
Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, length);
- return new String(strBytes);
+ return new String(strBytes, StandardCharsets.UTF_8);
}
@Test
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 556afd0..841fd02 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,12 +18,12 @@
package org.apache.spark
import java.io.File
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration
-import com.google.common.base.Charsets._
import com.google.common.io.Files
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
@@ -115,8 +115,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
val absolutePath2 = file2.getAbsolutePath
try {
- Files.write("somewords1", file1, UTF_8)
- Files.write("somewords2", file2, UTF_8)
+ Files.write("somewords1", file1, StandardCharsets.UTF_8)
+ Files.write("somewords2", file2, StandardCharsets.UTF_8)
val length1 = file1.length()
val length2 = file2.length()
@@ -243,11 +243,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
try {
// Create 5 text files.
- Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8)
- Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8)
- Files.write("someline1 in file3", file3, UTF_8)
- Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8)
- Files.write("someline1 in file2\nsomeline2 in file5", file5, UTF_8)
+ Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
+ StandardCharsets.UTF_8)
+ Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8)
+ Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
+ Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8)
+ Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8)
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
index 41f2a5c..05b4e67 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.python
import java.io.{ByteArrayOutputStream, DataOutputStream}
+import java.nio.charset.StandardCharsets
import org.apache.spark.SparkFunSuite
@@ -35,10 +36,12 @@ class PythonRDDSuite extends SparkFunSuite {
// The correctness will be tested in Python
PythonRDD.writeIteratorToStream(Iterator("a", null), buffer)
PythonRDD.writeIteratorToStream(Iterator(null, "a"), buffer)
- PythonRDD.writeIteratorToStream(Iterator("a".getBytes, null), buffer)
- PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes), buffer)
+ PythonRDD.writeIteratorToStream(Iterator("a".getBytes(StandardCharsets.UTF_8), null), buffer)
+ PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes(StandardCharsets.UTF_8)), buffer)
PythonRDD.writeIteratorToStream(Iterator((null, null), ("a", null), (null, "b")), buffer)
- PythonRDD.writeIteratorToStream(
- Iterator((null, null), ("a".getBytes, null), (null, "b".getBytes)), buffer)
+ PythonRDD.writeIteratorToStream(Iterator(
+ (null, null),
+ ("a".getBytes(StandardCharsets.UTF_8), null),
+ (null, "b".getBytes(StandardCharsets.UTF_8))), buffer)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 41ac60e..91fef77 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -18,10 +18,10 @@
package org.apache.spark.deploy
import java.io._
+import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
-import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
@@ -593,7 +593,7 @@ class SparkSubmitSuite
val tmpDir = Utils.createTempDir()
val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
- val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
+ val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf), StandardCharsets.UTF_8)
for ((key, value) <- defaults) writer.write(s"$key $value\n")
writer.close()
@@ -661,7 +661,7 @@ object UserClasspathFirstTest {
val ccl = Thread.currentThread().getContextClassLoader()
val resource = ccl.getResourceAsStream("test.resource")
val bytes = ByteStreams.toByteArray(resource)
- val contents = new String(bytes, 0, bytes.length, UTF_8)
+ val contents = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
if (contents != "USER") {
throw new SparkException("Should have read user resource, but instead read: " + contents)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 8e8007f..5fd599e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -20,13 +20,13 @@ package org.apache.spark.deploy.history
import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
FileOutputStream, OutputStreamWriter}
import java.net.URI
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.concurrent.duration._
import scala.language.postfixOps
-import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
@@ -320,8 +320,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
var entry = inputStream.getNextEntry
entry should not be null
while (entry != null) {
- val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
- val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
+ val actual = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8)
+ val expected =
+ Files.toString(logs.find(_.getName == entry.getName).get, StandardCharsets.UTF_8)
actual should be (expected)
totalEntries += 1
entry = inputStream.getNextEntry
@@ -415,7 +416,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
if (isNewFormat) {
EventLoggingListener.initEventLog(new FileOutputStream(file))
}
- val writer = new OutputStreamWriter(bstream, "UTF-8")
+ val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
Utils.tryWithSafeFinally {
events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
} {
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index e5cd2ed..5822261 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.history
import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
import java.util.zip.ZipInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
@@ -25,7 +26,6 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import com.codahale.metrics.Counter
-import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -216,8 +216,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
val expectedFile = {
new File(logDir, entry.getName)
}
- val expected = Files.toString(expectedFile, Charsets.UTF_8)
- val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
+ val expected = Files.toString(expectedFile, StandardCharsets.UTF_8)
+ val actual = new String(ByteStreams.toByteArray(zipStream), StandardCharsets.UTF_8)
actual should be (expected)
filesCompared += 1
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index ee889bf..a7bb9aa 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.rest
import java.io.DataOutputStream
import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
import javax.servlet.http.HttpServletResponse
import scala.collection.mutable
-import com.google.common.base.Charsets
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfterEach
@@ -498,7 +498,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
if (body.nonEmpty) {
conn.setDoOutput(true)
val out = new DataOutputStream(conn.getOutputStream)
- out.write(body.getBytes(Charsets.UTF_8))
+ out.write(body.getBytes(StandardCharsets.UTF_8))
out.close()
}
conn
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 47dbcb8..02806a1 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.network.netty
import java.io.InputStreamReader
import java.nio._
-import java.nio.charset.Charset
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
@@ -103,7 +103,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
val blockManager = mock[BlockDataManager]
val blockId = ShuffleBlockId(0, 1, 2)
val blockString = "Hello, world!"
- val blockBuffer = new NioManagedBuffer(ByteBuffer.wrap(blockString.getBytes))
+ val blockBuffer = new NioManagedBuffer(ByteBuffer.wrap(
+ blockString.getBytes(StandardCharsets.UTF_8)))
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
val securityManager0 = new SecurityManager(conf0)
@@ -117,7 +118,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
val result = fetchBlock(exec0, exec1, "1", blockId) match {
case Success(buf) =>
val actualString = CharStreams.toString(
- new InputStreamReader(buf.createInputStream(), Charset.forName("UTF-8")))
+ new InputStreamReader(buf.createInputStream(), StandardCharsets.UTF_8))
actualString should equal(blockString)
buf.release()
Success()
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index b367cc8..d30eafd 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,12 +18,12 @@
package org.apache.spark.util
import java.io._
+import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.HashSet
import scala.reflect._
-import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.log4j.{Appender, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
@@ -48,11 +48,11 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
test("basic file appender") {
val testString = (1 to 1000).mkString(", ")
- val inputStream = new ByteArrayInputStream(testString.getBytes(UTF_8))
+ val inputStream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8))
val appender = new FileAppender(inputStream, testFile)
inputStream.close()
appender.awaitTermination()
- assert(Files.toString(testFile, UTF_8) === testString)
+ assert(Files.toString(testFile, StandardCharsets.UTF_8) === testString)
}
test("rolling file appender - time-based rolling") {
@@ -100,7 +100,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
val allGeneratedFiles = new HashSet[String]()
val items = (1 to 10).map { _.toString * 10000 }
for (i <- 0 until items.size) {
- testOutputStream.write(items(i).getBytes(UTF_8))
+ testOutputStream.write(items(i).getBytes(StandardCharsets.UTF_8))
testOutputStream.flush()
allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
testFile.getParentFile.toString, testFile.getName).map(_.toString)
@@ -267,7 +267,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// send data to appender through the input stream, and wait for the data to be written
val expectedText = textToAppend.mkString("")
for (i <- 0 until textToAppend.size) {
- outputStream.write(textToAppend(i).getBytes(UTF_8))
+ outputStream.write(textToAppend(i).getBytes(StandardCharsets.UTF_8))
outputStream.flush()
Thread.sleep(sleepTimeBetweenTexts)
}
@@ -282,7 +282,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
assert(generatedFiles.size > 1)
val allText = generatedFiles.map { file =>
- Files.toString(file, UTF_8)
+ Files.toString(file, StandardCharsets.UTF_8)
}.mkString("")
assert(allText === expectedText)
generatedFiles
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 412c0ac..093d1bd 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStr
import java.lang.{Double => JDouble, Float => JFloat}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.charset.StandardCharsets
import java.text.DecimalFormatSymbols
import java.util.Locale
import java.util.concurrent.TimeUnit
@@ -28,7 +29,6 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable.ListBuffer
import scala.util.Random
-import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
@@ -268,7 +268,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
val tmpDir2 = Utils.createTempDir()
val f1Path = tmpDir2 + "/f1"
val f1 = new FileOutputStream(f1Path)
- f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8))
+ f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
f1.close()
// Read first few bytes
@@ -295,9 +295,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
test("reading offset bytes across multiple files") {
val tmpDir = Utils.createTempDir()
val files = (1 to 3).map(i => new File(tmpDir, i.toString))
- Files.write("0123456789", files(0), UTF_8)
- Files.write("abcdefghij", files(1), UTF_8)
- Files.write("ABCDEFGHIJ", files(2), UTF_8)
+ Files.write("0123456789", files(0), StandardCharsets.UTF_8)
+ Files.write("abcdefghij", files(1), StandardCharsets.UTF_8)
+ Files.write("ABCDEFGHIJ", files(2), StandardCharsets.UTF_8)
// Read first few bytes in the 1st file
assert(Utils.offsetBytes(files, 0, 5) === "01234")
@@ -529,7 +529,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
- "spark.test.fileNameLoadB 1\n", outFile, UTF_8)
+ "spark.test.fileNameLoadB 1\n", outFile, StandardCharsets.UTF_8)
val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
properties
.filter { case (k, v) => k.startsWith("spark.")}
@@ -559,7 +559,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
val targetDir = new File(tempDir, "target-dir")
- Files.write("some text", sourceFile, UTF_8)
+ Files.write("some text", sourceFile, StandardCharsets.UTF_8)
val path =
if (Utils.isWindows) {
@@ -801,7 +801,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
|trap "" SIGTERM
|sleep 10
""".stripMargin
- Files.write(cmd.getBytes(), file)
+ Files.write(cmd.getBytes(StandardCharsets.UTF_8), file)
file.getAbsoluteFile.setExecutable(true)
val process = new ProcessBuilder(file.getAbsolutePath).start()
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/docs/streaming-custom-receivers.md
----------------------------------------------------------------------
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 8454774..732c83d 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -72,7 +72,8 @@ class CustomReceiver(host: String, port: Int)
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
- val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+ val reader = new BufferedReader(
+ new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
@@ -135,7 +136,8 @@ public class JavaCustomReceiver extends Receiver<String> {
// connect to the server
socket = new Socket(host, port);
- BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// Until stopped or connection broken continue reading
while (!isStopped() && (userInput = reader.readLine()) != null) {
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 5de5634..4544ad2 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -36,6 +36,7 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;
@@ -130,7 +131,8 @@ public class JavaCustomReceiver extends Receiver<String> {
try {
// connect to the server
socket = new Socket(host, port);
- reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ reader = new BufferedReader(
+ new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// Until stopped or connection broken continue reading
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
index 5ce5778..d67da27 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -20,6 +20,7 @@ package org.apache.spark.examples.streaming
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
+import java.nio.charset.StandardCharsets
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
@@ -83,7 +84,8 @@ class CustomReceiver(host: String, port: Int)
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
- val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+ val reader = new BufferedReader(
+ new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index 7f6cecf..e8ca1e7 100644
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.flume.sink
import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
@@ -184,7 +185,8 @@ class SparkSinkSuite extends FunSuite {
private def putEvents(ch: MemoryChannel, count: Int): Unit = {
val tx = ch.getTransaction
tx.begin()
- (1 to count).foreach(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
+ (1 to count).foreach(x =>
+ ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
tx.commit()
tx.close()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
index 3f87ce4..945cfa7 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -19,12 +19,12 @@ package org.apache.spark.streaming.flume
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import java.util.{List => JList}
import java.util.Collections
import scala.collection.JavaConverters._
-import com.google.common.base.Charsets.UTF_8
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.commons.lang3.RandomUtils
@@ -65,7 +65,7 @@ private[flume] class FlumeTestUtils {
val inputEvents = input.asScala.map { item =>
val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
+ event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
event.setHeaders(Collections.singletonMap("test", "header"))
event
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 9515d07..1a96df6 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -17,12 +17,12 @@
package org.apache.spark.streaming.flume
+import java.nio.charset.StandardCharsets
import java.util.{Collections, List => JList, Map => JMap}
import java.util.concurrent._
import scala.collection.mutable.ArrayBuffer
-import com.google.common.base.Charsets.UTF_8
import org.apache.flume.event.EventBuilder
import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
@@ -193,7 +193,8 @@ private[flume] class PollingFlumeTestUtils {
val tx = channel.getTransaction
tx.begin()
for (j <- 0 until eventsPerBatch) {
- channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8),
+ channel.put(EventBuilder.withBody(
+ s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
Collections.singletonMap(s"test-$t", "header")))
t += 1
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 0cb875c..72d9053 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -19,12 +19,12 @@ package org.apache.spark.streaming.kafka
import java.io.OutputStream
import java.lang.{Integer => JInt, Long => JLong}
+import java.nio.charset.StandardCharsets
import java.util.{List => JList, Map => JMap, Set => JSet}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import com.google.common.base.Charsets.UTF_8
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
@@ -787,7 +787,7 @@ private object KafkaUtilsPythonHelper {
def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
if (obj == this) {
out.write(Opcodes.GLOBAL)
- out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(UTF_8))
+ out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
} else {
pickler.save(this)
val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0ace453..026387e 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -242,7 +243,7 @@ private[kinesis] class SimpleDataGenerator(
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
+ val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
.withData(data)
.withPartitionKey(str)
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index fdb270e..0b455e5 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -51,7 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
+ val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
val future = producer.addUserRecord(streamName, str, data)
val kinesisCallBack = new FutureCallback[UserRecordResult]() {
override def onFailure(t: Throwable): Unit = {} // do nothing
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 079bd8a..cbad6f7 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.mqtt
+import java.nio.charset.StandardCharsets
+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
@@ -75,7 +77,7 @@ class MQTTReceiver(
// Handles Mqtt message
override def messageArrived(topic: String, message: MqttMessage) {
- store(new String(message.getPayload(), "utf-8"))
+ store(new String(message.getPayload(), StandardCharsets.UTF_8))
}
override def deliveryComplete(token: IMqttDeliveryToken) {
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
index 26c6dc4..3680c13 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
@@ -18,10 +18,10 @@
package org.apache.spark.streaming.mqtt
import java.net.{ServerSocket, URI}
+import java.nio.charset.StandardCharsets
import scala.language.postfixOps
-import com.google.common.base.Charsets.UTF_8
import org.apache.activemq.broker.{BrokerService, TransportConnector}
import org.apache.commons.lang3.RandomUtils
import org.eclipse.paho.client.mqttv3._
@@ -85,7 +85,7 @@ private[mqtt] class MQTTTestUtils extends Logging {
client.connect()
if (client.isConnected) {
val msgTopic = client.getTopic(topic)
- val message = new MqttMessage(data.getBytes(UTF_8))
+ val message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8))
message.setQos(1)
message.setRetained(true)
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala
index bff9f32..e55b05f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import java.io.File
import java.io.FileOutputStream
import java.io.OutputStreamWriter
+import java.nio.charset.StandardCharsets
import org.apache.spark.SparkFunSuite
import org.apache.spark.util.Utils
@@ -30,7 +31,7 @@ class GraphLoaderSuite extends SparkFunSuite with LocalSparkContext {
withSpark { sc =>
val tmpDir = Utils.createTempDir()
val graphFile = new File(tmpDir.getAbsolutePath, "graph.txt")
- val writer = new OutputStreamWriter(new FileOutputStream(graphFile))
+ val writer = new OutputStreamWriter(new FileOutputStream(graphFile), StandardCharsets.UTF_8)
for (i <- (1 until 101)) writer.write(s"$i 0\n")
writer.close()
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 4641032..20387e0 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -23,6 +23,7 @@ import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -102,7 +103,7 @@ abstract class AbstractCommandBuilder {
File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
if (javaOpts.isFile()) {
BufferedReader br = new BufferedReader(new InputStreamReader(
- new FileInputStream(javaOpts), "UTF-8"));
+ new FileInputStream(javaOpts), StandardCharsets.UTF_8));
try {
String line;
while ((line = br.readLine()) != null) {
@@ -301,7 +302,7 @@ abstract class AbstractCommandBuilder {
FileInputStream fd = null;
try {
fd = new FileInputStream(propsFile);
- props.load(new InputStreamReader(fd, "UTF-8"));
+ props.load(new InputStreamReader(fd, StandardCharsets.UTF_8));
for (Map.Entry<Object, Object> e : props.entrySet()) {
e.setValue(e.getValue().toString().trim());
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
index 6e71201..c7959ae 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -42,7 +43,7 @@ class OutputRedirector {
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this.active = true;
- this.reader = new BufferedReader(new InputStreamReader(in));
+ this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
this.thread = tf.newThread(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/spark/blob/18408528/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index d367318..a85afb5 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -199,11 +199,7 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
for (String arg : cmd) {
if (arg.startsWith("-XX:MaxPermSize=")) {
- if (isDriver) {
- assertEquals("-XX:MaxPermSize=256m", arg);
- } else {
- assertEquals("-XX:MaxPermSize=256m", arg);
- }
+ assertEquals("-XX:MaxPermSize=256m", arg);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org