Posted to reviews@spark.apache.org by harishreedharan <gi...@git.apache.org> on 2014/08/15 02:27:46 UTC

[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

GitHub user harishreedharan opened a pull request:

    https://github.com/apache/spark/pull/1958

    [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

    This patch adds unit tests for Spark Sink.
    
    It also removes the private[flume] modifier from Spark Sink, since the
    sink is instantiated from the Flume configuration (it looks like the
    modifier is ignored by the reflection Flume uses, but we should still
    remove it anyway).
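
    For context, here is a minimal sketch (assumed and simplified; Flume's
    actual factory code differs) of how a Flume-style configuration
    instantiates a sink by its fully qualified class name:

    ```
    // Hypothetical sketch: Flume resolves the sink's class name from the
    // agent configuration and instantiates it reflectively, then wires in
    // the channel. Scala access modifiers play no role in this path.
    val clazz = Class.forName("org.apache.spark.streaming.flume.sink.SparkSink")
    val sink = clazz.newInstance().asInstanceOf[org.apache.flume.Sink]
    ```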

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/harishreedharan/spark spark-sink-test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1958
    
----
commit c86d615fbc90f46aea6359a66d10195be69de62a
Author: Hari Shreedharan <hs...@apache.org>
Date:   2014-08-15T00:19:51Z

    [SPARK-3054][STREAMING] Add unit tests for Spark Sink.
    
    This patch adds unit tests for Spark Sink.
    
    It also removes the private[flume] modifier from Spark Sink, since the
    sink is instantiated from the Flume configuration (it looks like the
    modifier is ignored by the reflection Flume uses, but we should still
    remove it anyway).

commit a24aac80236e729b2117b2c11eff2d1f456eae4d
Author: Hari Shreedharan <hs...@apache.org>
Date:   2014-08-15T00:26:44Z

    Remove unused var

----



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52698714
  
    Still failing unit tests.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16384773
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala ---
    @@ -53,7 +53,6 @@ import org.apache.flume.sink.AbstractSink
      *
      */
     
    -private[flume]
    --- End diff --
    
    This class would be called from Flume: Flume will create an instance of this class to run the sink, so theoretically it should not be private to this package.
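
    To illustrate the "ignored by reflection" point (a sketch, not from the PR; `ExampleSink` is hypothetical): scalac enforces `private[flume]` at compile time only and emits the class as public in bytecode, so reflective instantiation succeeds either way.

    ```
    package org.apache.spark.streaming.flume.sink

    // Compile-time only: scalac restricts this class to the enclosing
    // `flume` package, but the emitted bytecode is a public class (the
    // JVM has no notion of Scala's qualified private).
    private[flume] class ExampleSink

    object ReflectionDemo extends App {
      // Reflection goes through the JVM, not scalac, so this succeeds:
      val cls = Class.forName("org.apache.spark.streaming.flume.sink.ExampleSink")
      println(cls.newInstance())
    }
    ```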



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52730742
  
    Jenkins, test this please



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52699042
  
    Yeah, it passes locally. Seems like some flakiness. Let me debug.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16448274
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,204 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{TimeUnit, CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{ExecutionContext, Future}
    +import scala.util.{Failure, Success}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink()
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink()
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    testMultipleConsumers(failSome = false)
    +  }
    +
    +  test("Multiple consumers with some failures") {
    +    testMultipleConsumers(failSome = true)
    +  }
    +
    +  def testMultipleConsumers(failSome: Boolean): Unit = {
    +    implicit val executorContext = ExecutionContext
    +      .fromExecutorService(Executors.newFixedThreadPool(5))
    +    val (channel, sink) = initializeChannelAndSink()
    +    channel.start()
    +    sink.start()
    +    (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +    val transceiversAndClients = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transceiversAndClients.foreach(x => {
    +      Future {
    --- End diff --
    
    Super nit, no change necessary: a simpler implementation is possible, with no latch needed.
    ```
    val futures = transceiversAndClients.map { x =>
      Future {
          ....
      }
    }
    Await.result(Future.sequence(futures), timeout)  // Future.sequence converts a Seq of Futures into a single Future of a Seq.
    ```
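
    For reference, a self-contained sketch of that pattern (names illustrative, not from the PR):

    ```
    import java.util.concurrent.Executors
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5))

    // One Future per simulated client; the body stands in for
    // client.getEventBatch(...) plus the ack/nack bookkeeping.
    val futures = (1 to 5).map { i =>
      Future { i * 1000 }
    }

    // Future.sequence turns Seq[Future[Int]] into Future[Seq[Int]], so a
    // single Await replaces the CountDownLatch.
    val results = Await.result(Future.sequence(futures), 10.seconds)
    assert(results.sum == 15000)
    ```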



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52688804
  
    [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18865/consoleFull) for PR 1958 at commit [`120b81e`](https://github.com/apache/spark/commit/120b81eafc5ccd90196211b5926b1257bac917d3).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52262303
  
    QA tests have started for PR 1958. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18583/consoleFull



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16401354
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]): (MemoryChannel,
    +    SparkSink) = {
    +    val channel = new MemoryChannel()
    +    val channelContext = new Context()
    +
    +    channelContext.put("capacity", channelCapacity.toString)
    +    channelContext.put("transactionCapacity", 1000.toString)
    +    channelContext.put("keep-alive", 0.toString)
    +    overrides.foreach(channelContext.putAll(_))
    +    channel.configure(channelContext)
    +
    +    val sink = new SparkSink()
    +    val sinkContext = new Context()
    +    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
    +    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
    +    sink.configure(sinkContext)
    +    sink.setChannel(channel)
    +    (channel, sink)
    +  }
    +
    +  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
    +    val tx = ch.getTransaction
    +    tx.begin()
    +    (1 to count).map(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
    +    tx.commit()
    +    tx.close()
    +  }
    +
    +  private def getTransceiverAndClient(address: InetSocketAddress,
    +    count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
    --- End diff --
    
    Can you make this `Unit = {`?



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16438954
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala ---
    @@ -53,7 +53,6 @@ import org.apache.flume.sink.AbstractSink
      *
      */
     
    -private[flume]
    --- End diff --
    
    In that case, can you add a line right at the top saying that this class is not intended to be used inside a Spark application, just in case it appears in the Scaladoc / Javadoc? I will try to see how to keep this module from appearing in the docs.
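
    A possible form for such a note (wording assumed, not taken from the PR; signature abbreviated):

    ```
    /**
     * ... existing class description ...
     *
     * Note: This class is public only so that Flume can instantiate it
     * from an agent configuration. It is not intended to be used directly
     * inside a Spark application.
     */
    class SparkSink extends AbstractSink with Configurable {
      // ...
    }
    ```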



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52277772
  
    [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18599/consoleFull) for PR 1958 at commit [`f2c56c9`](https://github.com/apache/spark/commit/f2c56c976bc6faa83b8357c80caad1f4839eb06d).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16401133
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    --- End diff --
    
    I think it should be `recover { case e => promise.failure(e) }`.
    Isn't it easier to have the `promise.success(events)` within the `Try`, and just do `recover { case e => promise.failure(e) }`?
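
    A standalone sketch of that suggestion (`fetchBatch` is a hypothetical stand-in for `client.getEventBatch`):

    ```
    import scala.concurrent.Promise
    import scala.util.Try

    def fetchBatch(): Int = 1000  // stand-in for the Avro client call

    val promise = Promise[Int]()
    // promise.success sits inside the Try, so recover only needs to
    // forward the failure; no separate map step is required.
    Try {
      val events = fetchBatch()
      promise.success(events)
    }.recover { case e => promise.failure(e) }
    ```

    Equivalently, `promise.complete(Try { ... })` collapses both paths into one expression.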



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52562903
  
    [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18788/consoleFull) for PR 1958 at commit [`c9190d1`](https://github.com/apache/spark/commit/c9190d11b4644c1588ff63264bf588f7a243147f).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16384357
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success test") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]):
    +  (MemoryChannel, SparkSink) = {
    +    val channel = new MemoryChannel()
    +    val channelContext = new Context()
    +
    +    channelContext.put("capacity", channelCapacity.toString)
    +    channelContext.put("transactionCapacity", 1000.toString)
    +    channelContext.put("keep-alive", 0.toString)
    +    overrides.foreach(channelContext.putAll(_))
    +    channel.configure(channelContext)
    +
    +    val sink = new SparkSink()
    +    val sinkContext = new Context()
    +    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
    +    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
    +    sink.configure(sinkContext)
    +    sink.setChannel(channel)
    +    (channel, sink)
    +  }
    +
    +  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
    +    val tx = ch.getTransaction
    +    tx.begin()
    +    (1 to count).map(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
    +    tx.commit()
    +    tx.close()
    +  }
    +
    +  private def getTransceiverAndClient(address: InetSocketAddress, count: Int):
    +  Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
    --- End diff --
    
    Incorrect indentation: this line should have a 2-space indent.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52917580
  
    I see @pwendell temporarily removed the test: https://github.com/apache/spark/commit/1d5e84a99076d3e0168dd2f4626c7911e7ba49e7#diff-d41d8cd98f00b204e9800998ecf8427e
    
    @tdas @harishreedharan can the other commit go in, or can you please file a JIRA to follow up?




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52339243
  
    [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18622/consoleFull) for PR 1958 at commit [`7b9b649`](https://github.com/apache/spark/commit/7b9b649612bd61dae44f4b0212160b59fca86b73).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52332252
  
    [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18611/consoleFull) for PR 1958 at commit [`f2c56c9`](https://github.com/apache/spark/commit/f2c56c976bc6faa83b8357c80caad1f4839eb06d).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52265221
  
    QA results for PR 1958:
    - This patch PASSES unit tests.
    - This patch merges cleanly.
    - This patch adds no public classes.
    
    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18583/consoleFull



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16401356
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]): (MemoryChannel,
    +    SparkSink) = {
    +    val channel = new MemoryChannel()
    +    val channelContext = new Context()
    +
    +    channelContext.put("capacity", channelCapacity.toString)
    +    channelContext.put("transactionCapacity", 1000.toString)
    +    channelContext.put("keep-alive", 0.toString)
    +    overrides.foreach(channelContext.putAll(_))
    +    channel.configure(channelContext)
    +
    +    val sink = new SparkSink()
    +    val sinkContext = new Context()
    +    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
    +    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
    +    sink.configure(sinkContext)
    +    sink.setChannel(channel)
    +    (channel, sink)
    +  }
    +
    +  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
    +    val tx = ch.getTransaction
    +    tx.begin()
    +    (1 to count).map(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
    +    tx.commit()
    +    tx.close()
    +  }
    +
    +  private def getTransceiverAndClient(address: InetSocketAddress,
    +    count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
    +
    +    (1 to count).map(_ => {
    +      lazy val channelFactoryExecutor =
    +        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
    +          setNameFormat("Flume Receiver Channel Thread - %d").build())
    +      lazy val channelFactory =
    +        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
    +      val transceiver = new NettyTransceiver(address, channelFactory)
    +      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
    +      (transceiver, client)
    +    })
    +  }
    +
    +  private def assertChannelIsEmpty(channel: MemoryChannel) = {
    --- End diff --
    
    Can you make this `Unit = {`?
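
    A sketch of the requested change (the method body is assumed from the surrounding suite, where `channelCapacity` is 5000):

    ```
    // Declare the return type explicitly rather than letting it be inferred.
    private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
      assert(availableChannelSlots(channel) === channelCapacity)
    }
    ```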



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52566816
  
    [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18788/consoleFull) for PR 1958 at commit [`c9190d1`](https://github.com/apache/spark/commit/c9190d11b4644c1588ff63264bf588f7a243147f).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52731129
  
    Could not find Apache license headers in the following files:
     !????? /home/jenkins/workspace/SparkPullRequestBuilder/mllib/checkpoint/.temp.crc
     !????? /home/jenkins/workspace/SparkPullRequestBuilder/mllib/checkpoint/temp
    
    No idea what is happening here.



[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spark Sink.

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16400682
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    --- End diff --
    
    nit: inconsistent capitalization; let's rather have "Multiple consumers with some failures"




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16401169
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    --- End diff --
    
    For multi-line case bodies, start the body on the next line:
    
    ```
    case Success(events) =>
      assert(.....
      batchCounter.countDown()
    case Failure(t) =>
      batchCounter.countDown()
      throw t
    ```
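
    For completeness, a runnable sketch of the same shape (a plain Seq stands in for the EventBatch, and a one-count latch for the real counter):

    ```
    import java.util.concurrent.CountDownLatch
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    object CaseBodyStyle extends App {
      val batchCounter = new CountDownLatch(1)
      val future = Future { Seq.fill(1000)(0) }
      future.onComplete {
        // Each case body starts on its own line, indented two spaces.
        case Success(events) =>
          assert(events.size == 1000)
          batchCounter.countDown()
        case Failure(t) =>
          batchCounter.countDown()
          throw t
      }
      batchCounter.await()
    }
    ```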




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52525221
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18756/consoleFull) for   PR 1958 at commit [`7fedc5a`](https://github.com/apache/spark/commit/7fedc5a6222f3a2d55e73fb3e8a1fb1ccabf2157).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52741251
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18930/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52597090
  
    unit test is failing.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52695066
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18865/consoleFull) for   PR 1958 at commit [`120b81e`](https://github.com/apache/spark/commit/120b81eafc5ccd90196211b5926b1257bac917d3).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16401593
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    --- End diff --
    
    Why make one Future, then another Promise + Future combination? Just the single Future is sufficient. I also question the use of the implicit global ExecutionContext: we don't know the number of threads in that context, so it is not clear what parallelism we actually achieve here.
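
    For illustration, a minimal sketch of the single-Future shape with an explicit fixed-size pool, so the parallelism is known (the batch fetch is a placeholder, not the real SparkFlumeProtocol call):

    ```
    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object SingleFutureSketch extends App {
      // Five clients, five threads: the achievable parallelism is explicit.
      implicit val ec: ExecutionContext =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5))

      // One Future per client; its own success/failure already carries the
      // result, so no intermediate Promise is needed.
      def fetchBatch(clientId: Int): Future[Int] = Future {
        1000 // placeholder for client.getEventBatch(1000).getEvents.size()
      }

      val sizes = Await.result(Future.sequence((1 to 5).map(fetchBatch)), 10.seconds)
      sizes.foreach(s => assert(s == 1000))
    }
    ```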




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52374864
  
    Same failures as before - not caused by this patch




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52708814
  
    Looks like the tests have passed.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16384289
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala ---
    @@ -53,7 +53,6 @@ import org.apache.flume.sink.AbstractSink
      *
      */
     
    -private[flume]
    --- End diff --
    
    Why was this removed? We don't want to expose this as a public class, since it will then appear in the Scala docs.
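
    For reference, a tiny self-contained sketch of what `private[flume]` buys (hypothetical class names, not the actual sink):

    ```
    package org.apache.spark.streaming.flume {
      package sink {
        // Visible anywhere under org.apache.spark.streaming.flume, but hidden
        // from user code and from the generated Scaladoc.
        private[flume] class HiddenSink {
          def getPort: Int = 0
        }
      }
      object SamePackageUser {
        // Compiles, because this object lives inside the flume package.
        val s = new sink.HiddenSink
      }
    }
    // Outside org.apache.spark.streaming.flume, `new HiddenSink` does not compile.
    ```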




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52740385
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18927/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16400756
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    --- End diff --
    
    nit: transAndClients




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52532136
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18756/consoleFull) for   PR 1958 at commit [`7fedc5a`](https://github.com/apache/spark/commit/7fedc5a6222f3a2d55e73fb3e8a1fb1ccabf2157).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52710518
  
    Jenkins, test this.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16401267
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]): (MemoryChannel,
    +    SparkSink) = {
    +    val channel = new MemoryChannel()
    +    val channelContext = new Context()
    +
    +    channelContext.put("capacity", channelCapacity.toString)
    +    channelContext.put("transactionCapacity", 1000.toString)
    +    channelContext.put("keep-alive", 0.toString)
    +    overrides.foreach(channelContext.putAll(_))
    +    channel.configure(channelContext)
    +
    +    val sink = new SparkSink()
    +    val sinkContext = new Context()
    +    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
    +    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
    +    sink.configure(sinkContext)
    +    sink.setChannel(channel)
    +    (channel, sink)
    +  }
    +
    +  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
    +    val tx = ch.getTransaction
    +    tx.begin()
    +    (1 to count).map(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
    +    tx.commit()
    +    tx.close()
    +  }
    +
    +  private def getTransceiverAndClient(address: InetSocketAddress,
    +    count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
    +
    +    (1 to count).map(_ => {
    +      lazy val channelFactoryExecutor =
    +        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
    +          setNameFormat("Flume Receiver Channel Thread - %d").build())
    +      lazy val channelFactory =
    +        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
    +      val transceiver = new NettyTransceiver(address, channelFactory)
    +      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
    +      (transceiver, client)
    +    })
    +  }
    +
    +  private def assertChannelIsEmpty(channel: MemoryChannel) = {
    +    assert(availableChannelSlots(channel) === 5000)
    --- End diff --
    
    This should be compared to `channelCapacity` rather than the hard-coded 5000.
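
    A minimal sketch of the suggested fix, i.e. the helper from the diff above with the magic number replaced:

    ```
    private def assertChannelIsEmpty(channel: MemoryChannel) = {
      // Tracks the configured capacity instead of a literal 5000.
      assert(availableChannelSlots(channel) === channelCapacity)
    }
    ```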




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52703193
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18878/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52731023
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18916/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16385030
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success test") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]):
    +  (MemoryChannel, SparkSink) = {
    --- End diff --
    
    Wrong indentation.
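
    For reference, a sketch of that declaration in the usual Spark style (continuation parameters indented four spaces, result type on the same line as the closing paren):

    ```
    def initializeChannelAndSink(
        overrides: Option[Map[String, String]]): (MemoryChannel, SparkSink) = {
      // body unchanged
    }
    ```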




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52710115
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18878/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52376357
  
    Jenkins, test this please




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-53011746
  
    I have backported this to branch-1.1. However, @pwendell still found some
    other mysterious issues with SparkSinkSuite that broke the Maven build, so
    he deleted SparkSinkSuite temporarily to create another snapshot release of
    1.1. I am investigating this right now.
    
    TD
    
    
    On Thu, Aug 21, 2014 at 10:37 AM, Hari Shreedharan <notifications@github.com
    > wrote:
    
    > I believe the commit which added the getPort() method had merge conflicts
    > in the 1.1 branch, which is why @tdas <https://github.com/tdas> did not
    > backport it. We can probably backport all commits that went into the
    > flume-sink module to take care of this.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/1958#issuecomment-52954913>.
    >




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52740904
  
    Jenkins, test this please.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52954913
  
    I believe the commit which added the `getPort()` method had merge conflicts in the 1.1 branch, which is why @tdas did not backport it. We can probably backport all commits that went into the flume-sink module to take care of this.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16280387
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,208 @@
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    Hey, Hari, the ASF header should be at the top of the file :).




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52344694
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18622/consoleFull) for   PR 1958 at commit [`7b9b649`](https://github.com/apache/spark/commit/7b9b649612bd61dae44f4b0212160b59fca86b73).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1958




[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16281516
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,208 @@
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    --- End diff --
    
    Thanks! Done.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52731014
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18916/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52713686
  
    Jenkins, test this please


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52709005
  
    Right, good. But I want to run it another couple of times to test its flakiness. 


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52791480
  
    @tdas @harishreedharan This is causing YARN builds to fail on branch-1.1.
    
    I think you are missing commit: https://github.com/apache/spark/commit/95470a03ae85d7d37d75f73435425a0e22918bc9#diff-74323d4d6986ea21770bf3f49c091e5b 
    
    /external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala:66: value getPort is not a member of org.apache.spark.streaming.flume.sink.SparkSink
    [ERROR]     val port = sink.getPort
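    
    For anyone else hitting this: the suite calls `sink.getPort` to discover
    which ephemeral port the sink's Avro server bound to (the sink is
    configured with port 0). A rough sketch of the accessor the missing
    commit adds (simplified here, so treat the exact body as illustrative):
    
        // Inside SparkSink: report the port the Avro NettyServer actually
        // bound to. Assumes a field like `serverOpt: Option[NettyServer]`
        // that start() populates; before start() there is no port to report.
        private[flume] def getPort(): Int = {
          serverOpt
            .map(_.getPort)
            .getOrElse(throw new RuntimeException("Server was not started!"))
        }
    
    Cherry-picking that commit onto branch-1.1 should unbreak the YARN build.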


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16385095
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success test") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]):
    +  (MemoryChannel, SparkSink) = {
    +    val channel = new MemoryChannel()
    +    val channelContext = new Context()
    +
    +    channelContext.put("capacity", channelCapacity.toString)
    +    channelContext.put("transactionCapacity", 1000.toString)
    +    channelContext.put("keep-alive", 0.toString)
    +    overrides.foreach(channelContext.putAll(_))
    +    channel.configure(channelContext)
    +
    +    val sink = new SparkSink()
    +    val sinkContext = new Context()
    +    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
    +    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
    +    sink.configure(sinkContext)
    +    sink.setChannel(channel)
    +    (channel, sink)
    +  }
    +
    +  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
    +    val tx = ch.getTransaction
    +    tx.begin()
    +    (1 to count).map(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
    +    tx.commit()
    +    tx.close()
    +  }
    +
    +  private def getTransceiverAndClient(address: InetSocketAddress, count: Int):
    +  Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
    +
    +    (1 to count).map(_ => {
    +      lazy val channelFactoryExecutor =
    +        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
    +          setNameFormat("Flume Receiver Channel Thread - %d").build())
    +      lazy val channelFactory =
    +        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
    +      val transceiver = new NettyTransceiver(address, channelFactory)
    +      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
    +      (transceiver, client)
    +    })
    +  }
    +
    +  private def assertChannelIsEmpty(channel: MemoryChannel) = {
    +    assert(availableChannelSlots(channel) === 5000)
    +  }
    +
    +  private def availableChannelSlots(channel: MemoryChannel): Int = {
    +    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
    +    queueRemaining.setAccessible(true)
    +    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
    +    m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
    +  }
    +
    --- End diff --
    
    unnecessary space.
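    
    (Side note for readers of this suite: `availableChannelSlots` resorts to
    reflection because MemoryChannel keeps its remaining-capacity counter
    private. Assuming that field is the `java.util.concurrent.Semaphore` it
    appears to be in Flume's MemoryChannel, an implementation detail that
    could change across Flume versions, the helper can also be written with
    one reflective field read and a cast:)
    
        private def availableChannelSlots(channel: MemoryChannel): Int = {
          // `queueRemaining` is MemoryChannel's private Semaphore tracking
          // free capacity; availablePermits() is how many slots remain.
          val field = channel.getClass.getDeclaredField("queueRemaining")
          field.setAccessible(true)
          field.get(channel).asInstanceOf[java.util.concurrent.Semaphore].availablePermits()
        }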


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52737616
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18927/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52810248
  
    Correct. This patch uses a method that was introduced in that commit.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52737388
  
    Jenkins, test this please.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52702892
  
    Jenkins, test this please


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52689514
  
    LGTM, will merge when tests pass.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16400657
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    --- End diff --
    
    nit: this should be named "testMultipleClients"


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52327168
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18611/consoleFull) for   PR 1958 at commit [`f2c56c9`](https://github.com/apache/spark/commit/f2c56c976bc6faa83b8357c80caad1f4839eb06d).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52761646
  
    Alright, I am merging this.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52326820
  
    Jenkins, test this please


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16385317
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success test") {
    --- End diff --
    
    Maybe keep the naming consistent. If this is called "Success Test", then name the next ones "Failure Test" and "Timeout Test". You can even remove the "Test".
[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16401236
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]): (MemoryChannel,
    --- End diff --
    
    Also, add `private` to keep it consistent with the other methods.


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52708133
  
    Yeah, that sleep might be sufficient. 


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1958#discussion_r16400150
  
    --- Diff: external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume.sink
    +
    +import java.net.InetSocketAddress
    +import java.util.concurrent.atomic.AtomicInteger
    +import java.util.concurrent.{CountDownLatch, Executors}
    +
    +import scala.collection.JavaConversions._
    +import scala.concurrent.{Promise, Future}
    +import scala.util.{Failure, Success, Try}
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyTransceiver
    +import org.apache.avro.ipc.specific.SpecificRequestor
    +import org.apache.flume.Context
    +import org.apache.flume.channel.MemoryChannel
    +import org.apache.flume.event.EventBuilder
    +import org.apache.spark.streaming.TestSuiteBase
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    +
    +class SparkSinkSuite extends TestSuiteBase {
    +  val eventsPerBatch = 1000
    +  val channelCapacity = 5000
    +
    +  test("Success") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    client.ack(events.getSequenceNumber)
    +    assert(events.getEvents.size() === 1000)
    +    assertChannelIsEmpty(channel)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Nack") {
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    client.nack(events.getSequenceNumber)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Timeout") {
    +    val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
    +      .CONF_TRANSACTION_TIMEOUT -> 1.toString)))
    +    channel.start()
    +    sink.start()
    +    putEvents(channel, eventsPerBatch)
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
    +    val events = client.getEventBatch(1000)
    +    assert(events.getEvents.size() === 1000)
    +    Thread.sleep(1000)
    +    assert(availableChannelSlots(channel) === 4000)
    +    sink.stop()
    +    channel.stop()
    +    transceiver.close()
    +  }
    +
    +  test("Multiple consumers") {
    +    multipleClients(failSome = false)
    +  }
    +
    +  test("Multiple consumers With Some Failures") {
    +    multipleClients(failSome = true)
    +  }
    +
    +  def multipleClients(failSome: Boolean): Unit = {
    +    import scala.concurrent.ExecutionContext.Implicits.global
    +    val (channel, sink) = initializeChannelAndSink(None)
    +    channel.start()
    +    sink.start()
    +    (1 to 5).map(_ => putEvents(channel, eventsPerBatch))
    +    val port = sink.getPort
    +    val address = new InetSocketAddress("0.0.0.0", port)
    +
    +    val transAndClient = getTransceiverAndClient(address, 5)
    +    val batchCounter = new CountDownLatch(5)
    +    val counter = new AtomicInteger(0)
    +    transAndClient.foreach(x => {
    +      val promise = Promise[EventBatch]()
    +      val future = promise.future
    +      Future {
    +        val client = x._2
    +        var events: EventBatch = null
    +        Try {
    +          events = client.getEventBatch(1000)
    +          if(!failSome || counter.getAndIncrement() % 2 == 0) {
    +            client.ack(events.getSequenceNumber)
    +          } else {
    +            client.nack(events.getSequenceNumber)
    +          }
    +        }.map(_ => promise.success(events)).recover({
    +          case e => promise.failure(e)
    +        })
    +      }
    +      future.onComplete {
    +        case Success(events) => assert(events.getEvents.size() === 1000)
    +          batchCounter.countDown()
    +        case Failure(t) => batchCounter.countDown()
    +          throw t
    +      }
    +    })
    +    batchCounter.await()
    +    if(failSome) {
    +      assert(availableChannelSlots(channel) === 3000)
    +    } else {
    +      assertChannelIsEmpty(channel)
    +    }
    +    sink.stop()
    +    channel.stop()
    +    transAndClient.foreach(x => x._1.close())
    +  }
    +
    +  def initializeChannelAndSink(overrides: Option[Map[String, String]]): (MemoryChannel,
    --- End diff --
    
    nit: This is a little weird. Option is not necessary (adding None and Some unnecessarily increases verbosity), as it can simply be `def initializeChannelAndSink(overrides: Map[String, String] = Map.empty)`.
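    
    Concretely, the suggestion looks like this (same body as in the diff; the
    implicit Map conversion comes from the JavaConversions import already in
    the file):
    
        private def initializeChannelAndSink(
            overrides: Map[String, String] = Map.empty): (MemoryChannel, SparkSink) = {
          val channel = new MemoryChannel()
          val channelContext = new Context()
          channelContext.put("capacity", channelCapacity.toString)
          channelContext.put("transactionCapacity", 1000.toString)
          channelContext.put("keep-alive", 0.toString)
          // Scala Map converts to java.util.Map via JavaConversions
          channelContext.putAll(overrides)
          channel.configure(channelContext)
    
          val sink = new SparkSink()
          val sinkContext = new Context()
          sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
          sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
          sink.configure(sinkContext)
          sink.setChannel(channel)
          (channel, sink)
        }
    
        // Call sites then simplify to:
        val (channel, sink) = initializeChannelAndSink()
        val (ch, sk) = initializeChannelAndSink(
          Map(SparkSinkConfig.CONF_TRANSACTION_TIMEOUT -> 1.toString))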
[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52337867
  
    Failure is unrelated:
    [info] SparkSinkSuite:
    [info] - Success test
    [info] - Nack
    [info] - Timeout
    [info] - Multiple consumers
    [info] - Multiple consumers With Some Failures
    [info] ScalaTest
    [info] Run completed in 10 minutes, 40 seconds.
    [info] Total number of tests run: 5
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 5, failed 0, canceled 0, ignored 0, pending 0
    --
    Failures are in the SparkSubmitSuite:
    [info] *** 2 TESTS FAILED ***
    [error] Failed: Total 829, Failed 2, Errors 0, Passed 827, Ignored 6
    [error] Failed tests:
    [error] 	org.apache.spark.deploy.SparkSubmitSuite
    [error] (core/test:test) sbt.TestsFailedException: Tests unsuccessful


[GitHub] spark pull request: [SPARK-3054][STREAMING] Add unit tests for Spa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1958#issuecomment-52745587
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18930/consoleFull) for   PR 1958 at commit [`e3110b9`](https://github.com/apache/spark/commit/e3110b9b551fb0019d6095a16504e8b25d70f962).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.

