You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/02/09 23:17:32 UTC
spark git commit: [SPARK-4905][STREAMING] FlumeStreamSuite fix.
Repository: spark
Updated Branches:
refs/heads/master 6fe70d843 -> 0765af9b2
[SPARK-4905][STREAMING] FlumeStreamSuite fix.
Use the String constructor instead of a CharsetDecoder to decode event bodies, to see whether this fixes the empty strings appearing in the Flume test output.
Author: Hari Shreedharan <hs...@apache.org>
Closes #4371 from harishreedharan/Flume-stream-attempted-fix and squashes the following commits:
550d363 [Hari Shreedharan] Fix imports.
8695950 [Hari Shreedharan] Use Charsets.UTF_8 instead of "UTF-8" in String constructors.
af3ba14 [Hari Shreedharan] [SPARK-4905][STREAMING] FlumeStreamSuite fix.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0765af9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0765af9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0765af9b
Branch: refs/heads/master
Commit: 0765af9b21e9204c410c7a849c7201bc3eda8cc3
Parents: 6fe70d8
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Feb 9 14:17:14 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Feb 9 14:17:14 2015 -0800
----------------------------------------------------------------------
.../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0765af9b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index f333e38..322de7b 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
-import java.nio.charset.Charset
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
+import com.google.common.base.Charsets
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro
@@ -108,7 +108,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
val inputEvents = input.map { item =>
val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
+ event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8)))
event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
event
}
@@ -138,14 +138,13 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
status should be (avro.Status.OK)
}
- val decoder = Charset.forName("UTF-8").newDecoder()
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {
event =>
event.getHeaders.get("test") should be("header")
}
- val output = outputEvents.map(event => decoder.decode(event.getBody()).toString)
+ val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
output should be (input)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org