You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2014/10/12 02:22:56 UTC

[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/2773

    [SPARK-3912][Streaming] Fixed flakyFlumeStreamSuite

    @harishreedharan @pwendell 
    See JIRA for diagnosis of the problem 
    https://issues.apache.org/jira/browse/SPARK-3912
    
    The solution was to reimplement it. 
    1. Find a free port (by binding and releasing a server-scoket), and then use that port
    2. Remove thread.sleep()s, instead repeatedly try to create a sender and send data and check whether data was sent. Use eventually() to minimize waiting time.
    3. Check whether all the data was received, without caring about batches.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark flume-test-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2773.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2773
    
----
commit 93cd7f62bf31c9015f30eb25439d368bac3c57c5
Author: Tathagata Das <ta...@gmail.com>
Date:   2014-10-12T00:04:01Z

    Reimplimented FlumeStreamSuite to be more robust.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58770566
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21650/consoleFull) for   PR 2773 at commit [`93cd7f6`](https://github.com/apache/spark/commit/93cd7f62bf31c9015f30eb25439d368bac3c57c5).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58775579
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21659/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58956850
  
    +1. This looks good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2773#discussion_r18795789
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -17,103 +17,141 @@
     
     package org.apache.spark.streaming.flume
     
    -import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    -
    -import java.net.InetSocketAddress
    +import java.net.{InetSocketAddress, ServerSocket}
     import java.nio.ByteBuffer
     import java.nio.charset.Charset
     
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
     import org.apache.avro.ipc.NettyTransceiver
     import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.source.avro
     import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
    +import org.jboss.netty.channel.ChannelPipeline
    +import org.jboss.netty.channel.socket.SocketChannel
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +import org.jboss.netty.handler.codec.compression._
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
     
    +import org.apache.spark.{Logging, SparkConf}
     import org.apache.spark.storage.StorageLevel
    -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
    -import org.apache.spark.streaming.util.ManualClock
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
    +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
     import org.apache.spark.util.Utils
     
    -import org.jboss.netty.channel.ChannelPipeline
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    -import org.jboss.netty.channel.socket.SocketChannel
    -import org.jboss.netty.handler.codec.compression._
    +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
    +
    +  var ssc: StreamingContext = null
    +  var transceiver: NettyTransceiver = null
     
    -class FlumeStreamSuite extends TestSuiteBase {
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +    }
    +    if (transceiver != null) {
    +      transceiver.close()
    +    }
    +  }
     
       test("flume input stream") {
    -    runFlumeStreamTest(false)
    +    testFlumeStream(testCompression = false)
       }
     
       test("flume input compressed stream") {
    -    runFlumeStreamTest(true)
    +    testFlumeStream(testCompression = true)
    +  }
    +
    +  /** Run test on flume stream */
    +  private def testFlumeStream(testCompression: Boolean): Unit = {
    +    val input = (1 to 100).map { _.toString }
    +    val testPort = findFreePort()
    +    val outputBuffer = startContext(testPort, testCompression)
    +    writeAndVerify(input, testPort, outputBuffer, testCompression)
    +  }
    +
    +  /** Find a free port */
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
       }
    -  
    -  def runFlumeStreamTest(enableDecompression: Boolean) {
    -    // Set up the streaming context and input streams
    -    val ssc = new StreamingContext(conf, batchDuration)
    -    val (flumeStream, testPort) =
    -      Utils.startServiceOnPort(9997, (trialPort: Int) => {
    -        val dstream = FlumeUtils.createStream(
    -          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
    -        (dstream, trialPort)
    -      })
     
    +  /** Setup and start the streaming context */
    +  private def startContext(
    +      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
    +    ssc = new StreamingContext(conf, Milliseconds(200))
    +    val flumeStream = FlumeUtils.createStream(
    +      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
         val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
           with SynchronizedBuffer[Seq[SparkFlumeEvent]]
         val outputStream = new TestOutputStream(flumeStream, outputBuffer)
         outputStream.register()
         ssc.start()
    +    outputBuffer
    +  }
     
    -    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    val input = Seq(1, 2, 3, 4, 5)
    -    Thread.sleep(1000)
    -    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
    -    var client: AvroSourceProtocol = null
    -
    -    if (enableDecompression) {
    -      client = SpecificRequestor.getClient(
    -          classOf[AvroSourceProtocol], 
    -          new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
    -          new CompressionChannelFactory(6)))
    -    } else {
    -      client = SpecificRequestor.getClient(
    -        classOf[AvroSourceProtocol], transceiver)
    -    }
    +  /** Send data to the flume receiver and verify whether the data was received */
    +  private def writeAndVerify(
    +      input: Seq[String],
    +      testPort: Int,
    +      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
    +      enableCompression: Boolean
    +    ) {
    +    val testAddress = new InetSocketAddress("localhost", testPort)
     
    -    for (i <- 0 until input.size) {
    +    val inputEvents = input.map { item =>
           val event = new AvroFlumeEvent
    -      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
    +      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
           event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
    -      client.append(event)
    -      Thread.sleep(500)
    -      clock.addToTime(batchDuration.milliseconds)
    +      event
         }
     
    -    Thread.sleep(1000)
    -
    -    val startTime = System.currentTimeMillis()
    -    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
    -      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
    -      Thread.sleep(100)
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    +      // if last attempted transceiver had succeeded, close it
    +      if (transceiver != null) {
    +        transceiver.close()
    +        transceiver = null
    +      }
    +
    +      // Create transceiver
    +      transceiver = {
    +        if (enableCompression) {
    +          new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
    +        } else {
    +          new NettyTransceiver(testAddress)
    +        }
    +      }
    +
    +      // Create Avro client with the transceiver
    +      val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
    +      client should not be null
    +
    +      // Send data
    +      val status = client.appendBatch(inputEvents.toList)
    +      status should be (avro.Status.OK)
         }
    -    Thread.sleep(1000)
    -    val timeTaken = System.currentTimeMillis() - startTime
    -    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
    -    logInfo("Stopping context")
    -    ssc.stop()
    -
    -    val decoder = Charset.forName("UTF-8").newDecoder()
    -
    -    assert(outputBuffer.size === input.length)
    -    for (i <- 0 until outputBuffer.size) {
    -      assert(outputBuffer(i).size === 1)
    -      val str = decoder.decode(outputBuffer(i).head.event.getBody)
    -      assert(str.toString === input(i).toString)
    -      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
    +    
    +    val decoder = Charset.forName("UTF-8").newDecoder()    
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    --- End diff --
    
    This is absolutely necessary! Since we have no control over how long the spark streaming will take to receive and run the jobs and return the data, this eventually tests repeatedly whether the current state of the output buffer has all the records that were sent or not. 
    
    This was handler earlier using manual clock increments and sleeps to allow the job to complete, which was (unnecessarily) increasing processing times.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2773#discussion_r18792880
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -17,103 +17,141 @@
     
     package org.apache.spark.streaming.flume
     
    -import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    -
    -import java.net.InetSocketAddress
    +import java.net.{InetSocketAddress, ServerSocket}
     import java.nio.ByteBuffer
     import java.nio.charset.Charset
     
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
     import org.apache.avro.ipc.NettyTransceiver
     import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.source.avro
     import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
    +import org.jboss.netty.channel.ChannelPipeline
    +import org.jboss.netty.channel.socket.SocketChannel
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +import org.jboss.netty.handler.codec.compression._
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
     
    +import org.apache.spark.{Logging, SparkConf}
     import org.apache.spark.storage.StorageLevel
    -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
    -import org.apache.spark.streaming.util.ManualClock
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
    +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
     import org.apache.spark.util.Utils
     
    -import org.jboss.netty.channel.ChannelPipeline
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    -import org.jboss.netty.channel.socket.SocketChannel
    -import org.jboss.netty.handler.codec.compression._
    +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
    +
    +  var ssc: StreamingContext = null
    +  var transceiver: NettyTransceiver = null
     
    -class FlumeStreamSuite extends TestSuiteBase {
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +    }
    +    if (transceiver != null) {
    +      transceiver.close()
    +    }
    +  }
     
       test("flume input stream") {
    -    runFlumeStreamTest(false)
    +    testFlumeStream(testCompression = false)
       }
     
       test("flume input compressed stream") {
    -    runFlumeStreamTest(true)
    +    testFlumeStream(testCompression = true)
    +  }
    +
    +  /** Run test on flume stream */
    +  private def testFlumeStream(testCompression: Boolean): Unit = {
    +    val input = (1 to 100).map { _.toString }
    +    val testPort = findFreePort()
    +    val outputBuffer = startContext(testPort, testCompression)
    +    writeAndVerify(input, testPort, outputBuffer, testCompression)
    +  }
    +
    +  /** Find a free port */
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
       }
    -  
    -  def runFlumeStreamTest(enableDecompression: Boolean) {
    -    // Set up the streaming context and input streams
    -    val ssc = new StreamingContext(conf, batchDuration)
    -    val (flumeStream, testPort) =
    -      Utils.startServiceOnPort(9997, (trialPort: Int) => {
    -        val dstream = FlumeUtils.createStream(
    -          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
    -        (dstream, trialPort)
    -      })
     
    +  /** Setup and start the streaming context */
    +  private def startContext(
    +      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
    +    ssc = new StreamingContext(conf, Milliseconds(200))
    +    val flumeStream = FlumeUtils.createStream(
    +      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
         val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
           with SynchronizedBuffer[Seq[SparkFlumeEvent]]
         val outputStream = new TestOutputStream(flumeStream, outputBuffer)
         outputStream.register()
         ssc.start()
    +    outputBuffer
    +  }
     
    -    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    val input = Seq(1, 2, 3, 4, 5)
    -    Thread.sleep(1000)
    -    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
    -    var client: AvroSourceProtocol = null
    -
    -    if (enableDecompression) {
    -      client = SpecificRequestor.getClient(
    -          classOf[AvroSourceProtocol], 
    -          new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
    -          new CompressionChannelFactory(6)))
    -    } else {
    -      client = SpecificRequestor.getClient(
    -        classOf[AvroSourceProtocol], transceiver)
    -    }
    +  /** Send data to the flume receiver and verify whether the data was received */
    +  private def writeAndVerify(
    +      input: Seq[String],
    +      testPort: Int,
    +      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
    +      enableCompression: Boolean
    +    ) {
    +    val testAddress = new InetSocketAddress("localhost", testPort)
     
    -    for (i <- 0 until input.size) {
    +    val inputEvents = input.map { item =>
           val event = new AvroFlumeEvent
    -      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
    +      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
           event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
    -      client.append(event)
    -      Thread.sleep(500)
    -      clock.addToTime(batchDuration.milliseconds)
    +      event
         }
     
    -    Thread.sleep(1000)
    -
    -    val startTime = System.currentTimeMillis()
    -    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
    -      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
    -      Thread.sleep(100)
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    --- End diff --
    
    Why do we need to retry this test multiple times? The usual case where the test fails is mainly because of bind issues, correct? Since findFreePort (sort of) takes care of that..this does not seem to help. 
    
    There is a small race condition that can be taken care of, using eventually though - where the free port is taken before the bind, in which case we can use a new free port, by calling findFreePort inside the eventually.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58769438
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21650/consoleFull) for   PR 2773 at commit [`93cd7f6`](https://github.com/apache/spark/commit/93cd7f62bf31c9015f30eb25439d368bac3c57c5).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58775578
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21659/consoleFull) for   PR 2773 at commit [`93cd7f6`](https://github.com/apache/spark/commit/93cd7f62bf31c9015f30eb25439d368bac3c57c5).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58774444
  
    Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58770568
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21650/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2773#discussion_r18794109
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -17,103 +17,141 @@
     
     package org.apache.spark.streaming.flume
     
    -import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    -
    -import java.net.InetSocketAddress
    +import java.net.{InetSocketAddress, ServerSocket}
     import java.nio.ByteBuffer
     import java.nio.charset.Charset
     
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
     import org.apache.avro.ipc.NettyTransceiver
     import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.source.avro
     import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
    +import org.jboss.netty.channel.ChannelPipeline
    +import org.jboss.netty.channel.socket.SocketChannel
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +import org.jboss.netty.handler.codec.compression._
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
     
    +import org.apache.spark.{Logging, SparkConf}
     import org.apache.spark.storage.StorageLevel
    -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
    -import org.apache.spark.streaming.util.ManualClock
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
    +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
     import org.apache.spark.util.Utils
     
    -import org.jboss.netty.channel.ChannelPipeline
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    -import org.jboss.netty.channel.socket.SocketChannel
    -import org.jboss.netty.handler.codec.compression._
    +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
    +
    +  var ssc: StreamingContext = null
    +  var transceiver: NettyTransceiver = null
     
    -class FlumeStreamSuite extends TestSuiteBase {
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +    }
    +    if (transceiver != null) {
    +      transceiver.close()
    +    }
    +  }
     
       test("flume input stream") {
    -    runFlumeStreamTest(false)
    +    testFlumeStream(testCompression = false)
       }
     
       test("flume input compressed stream") {
    -    runFlumeStreamTest(true)
    +    testFlumeStream(testCompression = true)
    +  }
    +
    +  /** Run test on flume stream */
    +  private def testFlumeStream(testCompression: Boolean): Unit = {
    +    val input = (1 to 100).map { _.toString }
    +    val testPort = findFreePort()
    +    val outputBuffer = startContext(testPort, testCompression)
    +    writeAndVerify(input, testPort, outputBuffer, testCompression)
    +  }
    +
    +  /** Find a free port */
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
       }
    -  
    -  def runFlumeStreamTest(enableDecompression: Boolean) {
    -    // Set up the streaming context and input streams
    -    val ssc = new StreamingContext(conf, batchDuration)
    -    val (flumeStream, testPort) =
    -      Utils.startServiceOnPort(9997, (trialPort: Int) => {
    -        val dstream = FlumeUtils.createStream(
    -          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
    -        (dstream, trialPort)
    -      })
     
    +  /** Setup and start the streaming context */
    +  private def startContext(
    +      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
    +    ssc = new StreamingContext(conf, Milliseconds(200))
    +    val flumeStream = FlumeUtils.createStream(
    +      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
         val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
           with SynchronizedBuffer[Seq[SparkFlumeEvent]]
         val outputStream = new TestOutputStream(flumeStream, outputBuffer)
         outputStream.register()
         ssc.start()
    +    outputBuffer
    +  }
     
    -    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    val input = Seq(1, 2, 3, 4, 5)
    -    Thread.sleep(1000)
    -    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
    -    var client: AvroSourceProtocol = null
    -
    -    if (enableDecompression) {
    -      client = SpecificRequestor.getClient(
    -          classOf[AvroSourceProtocol], 
    -          new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
    -          new CompressionChannelFactory(6)))
    -    } else {
    -      client = SpecificRequestor.getClient(
    -        classOf[AvroSourceProtocol], transceiver)
    -    }
    +  /** Send data to the flume receiver and verify whether the data was received */
    +  private def writeAndVerify(
    +      input: Seq[String],
    +      testPort: Int,
    +      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
    +      enableCompression: Boolean
    +    ) {
    +    val testAddress = new InetSocketAddress("localhost", testPort)
     
    -    for (i <- 0 until input.size) {
    +    val inputEvents = input.map { item =>
           val event = new AvroFlumeEvent
    -      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
    +      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
           event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
    -      client.append(event)
    -      Thread.sleep(500)
    -      clock.addToTime(batchDuration.milliseconds)
    +      event
         }
     
    -    Thread.sleep(1000)
    -
    -    val startTime = System.currentTimeMillis()
    -    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
    -      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
    -      Thread.sleep(100)
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    +      // if last attempted transceiver had succeeded, close it
    +      if (transceiver != null) {
    +        transceiver.close()
    +        transceiver = null
    +      }
    +
    +      // Create transceiver
    +      transceiver = {
    +        if (enableCompression) {
    +          new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
    +        } else {
    +          new NettyTransceiver(testAddress)
    +        }
    +      }
    +
    +      // Create Avro client with the transceiver
    +      val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
    +      client should not be null
    +
    +      // Send data
    +      val status = client.appendBatch(inputEvents.toList)
    +      status should be (avro.Status.OK)
         }
    -    Thread.sleep(1000)
    -    val timeTaken = System.currentTimeMillis() - startTime
    -    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
    -    logInfo("Stopping context")
    -    ssc.stop()
    -
    -    val decoder = Charset.forName("UTF-8").newDecoder()
    -
    -    assert(outputBuffer.size === input.length)
    -    for (i <- 0 until outputBuffer.size) {
    -      assert(outputBuffer(i).size === 1)
    -      val str = decoder.decode(outputBuffer(i).head.event.getBody)
    -      assert(str.toString === input(i).toString)
    -      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
    +    
    +    val decoder = Charset.forName("UTF-8").newDecoder()    
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2773


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2773#discussion_r18795860
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -17,103 +17,141 @@
     
     package org.apache.spark.streaming.flume
     
    -import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    -
    -import java.net.InetSocketAddress
    +import java.net.{InetSocketAddress, ServerSocket}
     import java.nio.ByteBuffer
     import java.nio.charset.Charset
     
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
     import org.apache.avro.ipc.NettyTransceiver
     import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.source.avro
     import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
    +import org.jboss.netty.channel.ChannelPipeline
    +import org.jboss.netty.channel.socket.SocketChannel
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +import org.jboss.netty.handler.codec.compression._
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
     
    +import org.apache.spark.{Logging, SparkConf}
     import org.apache.spark.storage.StorageLevel
    -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
    -import org.apache.spark.streaming.util.ManualClock
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
    +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
     import org.apache.spark.util.Utils
     
    -import org.jboss.netty.channel.ChannelPipeline
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    -import org.jboss.netty.channel.socket.SocketChannel
    -import org.jboss.netty.handler.codec.compression._
    +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
    +
    +  var ssc: StreamingContext = null
    +  var transceiver: NettyTransceiver = null
     
    -class FlumeStreamSuite extends TestSuiteBase {
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +    }
    +    if (transceiver != null) {
    +      transceiver.close()
    +    }
    +  }
     
       test("flume input stream") {
    -    runFlumeStreamTest(false)
    +    testFlumeStream(testCompression = false)
       }
     
       test("flume input compressed stream") {
    -    runFlumeStreamTest(true)
    +    testFlumeStream(testCompression = true)
    +  }
    +
    +  /** Run test on flume stream */
    +  private def testFlumeStream(testCompression: Boolean): Unit = {
    +    val input = (1 to 100).map { _.toString }
    +    val testPort = findFreePort()
    +    val outputBuffer = startContext(testPort, testCompression)
    +    writeAndVerify(input, testPort, outputBuffer, testCompression)
    +  }
    +
    +  /** Find a free port */
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
       }
    -  
    -  def runFlumeStreamTest(enableDecompression: Boolean) {
    -    // Set up the streaming context and input streams
    -    val ssc = new StreamingContext(conf, batchDuration)
    -    val (flumeStream, testPort) =
    -      Utils.startServiceOnPort(9997, (trialPort: Int) => {
    -        val dstream = FlumeUtils.createStream(
    -          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
    -        (dstream, trialPort)
    -      })
     
    +  /** Setup and start the streaming context */
    +  private def startContext(
    +      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
    +    ssc = new StreamingContext(conf, Milliseconds(200))
    +    val flumeStream = FlumeUtils.createStream(
    +      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
         val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
           with SynchronizedBuffer[Seq[SparkFlumeEvent]]
         val outputStream = new TestOutputStream(flumeStream, outputBuffer)
         outputStream.register()
         ssc.start()
    +    outputBuffer
    +  }
     
    -    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    val input = Seq(1, 2, 3, 4, 5)
    -    Thread.sleep(1000)
    -    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
    -    var client: AvroSourceProtocol = null
    -
    -    if (enableDecompression) {
    -      client = SpecificRequestor.getClient(
    -          classOf[AvroSourceProtocol], 
    -          new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
    -          new CompressionChannelFactory(6)))
    -    } else {
    -      client = SpecificRequestor.getClient(
    -        classOf[AvroSourceProtocol], transceiver)
    -    }
    +  /** Send data to the flume receiver and verify whether the data was received */
    +  private def writeAndVerify(
    +      input: Seq[String],
    +      testPort: Int,
    +      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
    +      enableCompression: Boolean
    +    ) {
    +    val testAddress = new InetSocketAddress("localhost", testPort)
     
    -    for (i <- 0 until input.size) {
    +    val inputEvents = input.map { item =>
           val event = new AvroFlumeEvent
    -      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
    +      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
           event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
    -      client.append(event)
    -      Thread.sleep(500)
    -      clock.addToTime(batchDuration.milliseconds)
    +      event
         }
     
    -    Thread.sleep(1000)
    -
    -    val startTime = System.currentTimeMillis()
    -    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
    -      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
    -      Thread.sleep(100)
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    --- End diff --
    
    OK, sounds good!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2773#discussion_r18795594
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -17,103 +17,141 @@
     
     package org.apache.spark.streaming.flume
     
    -import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    -
    -import java.net.InetSocketAddress
    +import java.net.{InetSocketAddress, ServerSocket}
     import java.nio.ByteBuffer
     import java.nio.charset.Charset
     
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
     import org.apache.avro.ipc.NettyTransceiver
     import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.source.avro
     import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
    +import org.jboss.netty.channel.ChannelPipeline
    +import org.jboss.netty.channel.socket.SocketChannel
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +import org.jboss.netty.handler.codec.compression._
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
     
    +import org.apache.spark.{Logging, SparkConf}
     import org.apache.spark.storage.StorageLevel
    -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
    -import org.apache.spark.streaming.util.ManualClock
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
    +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
     import org.apache.spark.util.Utils
     
    -import org.jboss.netty.channel.ChannelPipeline
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    -import org.jboss.netty.channel.socket.SocketChannel
    -import org.jboss.netty.handler.codec.compression._
    +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
    +
    +  var ssc: StreamingContext = null
    +  var transceiver: NettyTransceiver = null
     
    -class FlumeStreamSuite extends TestSuiteBase {
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +    }
    +    if (transceiver != null) {
    +      transceiver.close()
    +    }
    +  }
     
       test("flume input stream") {
    -    runFlumeStreamTest(false)
    +    testFlumeStream(testCompression = false)
       }
     
       test("flume input compressed stream") {
    -    runFlumeStreamTest(true)
    +    testFlumeStream(testCompression = true)
    +  }
    +
    +  /** Run test on flume stream */
    +  private def testFlumeStream(testCompression: Boolean): Unit = {
    +    val input = (1 to 100).map { _.toString }
    +    val testPort = findFreePort()
    +    val outputBuffer = startContext(testPort, testCompression)
    +    writeAndVerify(input, testPort, outputBuffer, testCompression)
    +  }
    +
    +  /** Find a free port */
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
       }
    -  
    -  def runFlumeStreamTest(enableDecompression: Boolean) {
    -    // Set up the streaming context and input streams
    -    val ssc = new StreamingContext(conf, batchDuration)
    -    val (flumeStream, testPort) =
    -      Utils.startServiceOnPort(9997, (trialPort: Int) => {
    -        val dstream = FlumeUtils.createStream(
    -          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
    -        (dstream, trialPort)
    -      })
     
    +  /** Setup and start the streaming context */
    +  private def startContext(
    +      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
    +    ssc = new StreamingContext(conf, Milliseconds(200))
    +    val flumeStream = FlumeUtils.createStream(
    +      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
         val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
           with SynchronizedBuffer[Seq[SparkFlumeEvent]]
         val outputStream = new TestOutputStream(flumeStream, outputBuffer)
         outputStream.register()
         ssc.start()
    +    outputBuffer
    +  }
     
    -    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    val input = Seq(1, 2, 3, 4, 5)
    -    Thread.sleep(1000)
    -    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
    -    var client: AvroSourceProtocol = null
    -
    -    if (enableDecompression) {
    -      client = SpecificRequestor.getClient(
    -          classOf[AvroSourceProtocol], 
    -          new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
    -          new CompressionChannelFactory(6)))
    -    } else {
    -      client = SpecificRequestor.getClient(
    -        classOf[AvroSourceProtocol], transceiver)
    -    }
    +  /** Send data to the flume receiver and verify whether the data was received */
    +  private def writeAndVerify(
    +      input: Seq[String],
    +      testPort: Int,
    +      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
    +      enableCompression: Boolean
    +    ) {
    +    val testAddress = new InetSocketAddress("localhost", testPort)
     
    -    for (i <- 0 until input.size) {
    +    val inputEvents = input.map { item =>
           val event = new AvroFlumeEvent
    -      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
    +      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
           event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
    -      client.append(event)
    -      Thread.sleep(500)
    -      clock.addToTime(batchDuration.milliseconds)
    +      event
         }
     
    -    Thread.sleep(1000)
    -
    -    val startTime = System.currentTimeMillis()
    -    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
    -      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
    -      Thread.sleep(100)
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    --- End diff --
    
    Because I found that a lot of time the test was failing because of the uncertainty on when the Flume receiver is ready to receive the new connection. Even after the connection gets accepted, sending data does not return Status.OK. I am not sure what is the reason behind this but this seems like a fairly robust way to send all the data once in a single shot.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2773#discussion_r18795869
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -17,103 +17,141 @@
     
     package org.apache.spark.streaming.flume
     
    -import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    -
    -import java.net.InetSocketAddress
    +import java.net.{InetSocketAddress, ServerSocket}
     import java.nio.ByteBuffer
     import java.nio.charset.Charset
     
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
     import org.apache.avro.ipc.NettyTransceiver
     import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.source.avro
     import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
    +import org.jboss.netty.channel.ChannelPipeline
    +import org.jboss.netty.channel.socket.SocketChannel
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +import org.jboss.netty.handler.codec.compression._
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
     
    +import org.apache.spark.{Logging, SparkConf}
     import org.apache.spark.storage.StorageLevel
    -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
    -import org.apache.spark.streaming.util.ManualClock
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
    +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
     import org.apache.spark.util.Utils
     
    -import org.jboss.netty.channel.ChannelPipeline
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    -import org.jboss.netty.channel.socket.SocketChannel
    -import org.jboss.netty.handler.codec.compression._
    +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
    +
    +  var ssc: StreamingContext = null
    +  var transceiver: NettyTransceiver = null
     
    -class FlumeStreamSuite extends TestSuiteBase {
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +    }
    +    if (transceiver != null) {
    +      transceiver.close()
    +    }
    +  }
     
       test("flume input stream") {
    -    runFlumeStreamTest(false)
    +    testFlumeStream(testCompression = false)
       }
     
       test("flume input compressed stream") {
    -    runFlumeStreamTest(true)
    +    testFlumeStream(testCompression = true)
    +  }
    +
    +  /** Run test on flume stream */
    +  private def testFlumeStream(testCompression: Boolean): Unit = {
    +    val input = (1 to 100).map { _.toString }
    +    val testPort = findFreePort()
    +    val outputBuffer = startContext(testPort, testCompression)
    +    writeAndVerify(input, testPort, outputBuffer, testCompression)
    +  }
    +
    +  /** Find a free port */
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
       }
    -  
    -  def runFlumeStreamTest(enableDecompression: Boolean) {
    -    // Set up the streaming context and input streams
    -    val ssc = new StreamingContext(conf, batchDuration)
    -    val (flumeStream, testPort) =
    -      Utils.startServiceOnPort(9997, (trialPort: Int) => {
    -        val dstream = FlumeUtils.createStream(
    -          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
    -        (dstream, trialPort)
    -      })
     
    +  /** Setup and start the streaming context */
    +  private def startContext(
    +      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
    +    ssc = new StreamingContext(conf, Milliseconds(200))
    +    val flumeStream = FlumeUtils.createStream(
    +      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
         val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
           with SynchronizedBuffer[Seq[SparkFlumeEvent]]
         val outputStream = new TestOutputStream(flumeStream, outputBuffer)
         outputStream.register()
         ssc.start()
    +    outputBuffer
    +  }
     
    -    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    val input = Seq(1, 2, 3, 4, 5)
    -    Thread.sleep(1000)
    -    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
    -    var client: AvroSourceProtocol = null
    -
    -    if (enableDecompression) {
    -      client = SpecificRequestor.getClient(
    -          classOf[AvroSourceProtocol], 
    -          new NettyTransceiver(new InetSocketAddress("localhost", testPort), 
    -          new CompressionChannelFactory(6)))
    -    } else {
    -      client = SpecificRequestor.getClient(
    -        classOf[AvroSourceProtocol], transceiver)
    -    }
    +  /** Send data to the flume receiver and verify whether the data was received */
    +  private def writeAndVerify(
    +      input: Seq[String],
    +      testPort: Int,
    +      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
    +      enableCompression: Boolean
    +    ) {
    +    val testAddress = new InetSocketAddress("localhost", testPort)
     
    -    for (i <- 0 until input.size) {
    +    val inputEvents = input.map { item =>
           val event = new AvroFlumeEvent
    -      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
    +      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
           event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
    -      client.append(event)
    -      Thread.sleep(500)
    -      clock.addToTime(batchDuration.milliseconds)
    +      event
         }
     
    -    Thread.sleep(1000)
    -
    -    val startTime = System.currentTimeMillis()
    -    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
    -      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
    -      Thread.sleep(100)
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    +      // if last attempted transceiver had succeeded, close it
    +      if (transceiver != null) {
    +        transceiver.close()
    +        transceiver = null
    +      }
    +
    +      // Create transceiver
    +      transceiver = {
    +        if (enableCompression) {
    +          new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
    +        } else {
    +          new NettyTransceiver(testAddress)
    +        }
    +      }
    +
    +      // Create Avro client with the transceiver
    +      val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
    +      client should not be null
    +
    +      // Send data
    +      val status = client.appendBatch(inputEvents.toList)
    +      status should be (avro.Status.OK)
         }
    -    Thread.sleep(1000)
    -    val timeTaken = System.currentTimeMillis() - startTime
    -    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
    -    logInfo("Stopping context")
    -    ssc.stop()
    -
    -    val decoder = Charset.forName("UTF-8").newDecoder()
    -
    -    assert(outputBuffer.size === input.length)
    -    for (i <- 0 until outputBuffer.size) {
    -      assert(outputBuffer(i).size === 1)
    -      val str = decoder.decode(outputBuffer(i).head.event.getBody)
    -      assert(str.toString === input(i).toString)
    -      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
    +    
    +    val decoder = Charset.forName("UTF-8").newDecoder()    
    +    eventually(timeout(10 seconds), interval(100 milliseconds)) {
    --- End diff --
    
    Makes sense. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3912][Streaming] Fixed flakyFlumeStream...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2773#issuecomment-58774530
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21659/consoleFull) for   PR 2773 at commit [`93cd7f6`](https://github.com/apache/spark/commit/93cd7f62bf31c9015f30eb25439d368bac3c57c5).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org