You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by harishreedharan <gi...@git.apache.org> on 2014/08/20 21:35:17 UTC

[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

GitHub user harishreedharan opened a pull request:

    https://github.com/apache/spark/pull/2065

    [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.

    Currently lot of errors get thrown from Avro IPC layer when the dstream
    or sink is shutdown. This PR cleans it up. Some refactoring is done in the
    receiver code to put all of the RPC code into a single Try and just recover
    from that. The sink code has also been cleaned up.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/harishreedharan/spark clean-flume-shutdown

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2065.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2065
    
----
commit ed608c8fd620cd9d15c57666fbacdffa61c29dbf
Author: Hari Shreedharan <hs...@apache.org>
Date:   2014-08-20T19:31:30Z

    [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
    
    Currently lot of errors get thrown from Avro IPC layer when the dstream
    or sink is shutdown. This PR cleans it up. Some refactoring is done in the
    receiver code to put all of the RPC code into a single Try and just recover
    from that. The sink code has also been cleaned up.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754182
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +74,30 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    } else {
    +      val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    +      activeProcessorMapLock.lock()
    +      val processor = new TransactionProcessor(channel, sequenceNumber,
    +        n, transactionTimeout, backOffInterval, this)
    +      try {
    +        activeProcessors.add(processor)
    --- End diff --
    
    Ignore this comment, I have a more fundamental question in another comment. That makes this comment a moot point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52873308
  
    Some errors (like the ChannelClosedException) getting logged on the sink side are inevitable. This is because Avro throws that when the other side closes the connection on the sink. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52832628
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18984/consoleFull) for   PR 2065 at commit [`ed608c8`](https://github.com/apache/spark/commit/ed608c8fd620cd9d15c57666fbacdffa61c29dbf).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2065


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756449
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -88,59 +87,7 @@ private[streaming] class FlumePollingReceiver(
         for (i <- 0 until parallelism) {
           logInfo("Starting Flume Polling Receiver worker threads starting..")
    --- End diff --
    
    "Starting" present twice 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756261
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +67,30 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    } else {
    +      val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    +      val processor = getTransactionProcessor(sequenceNumber, n)
    +      transactionExecutorOpt.foreach(_.submit(processor))
    +      // Wait until a batch is available - will be an error if error message is non-empty
    +      val batch = processor.getEventBatch
    +      if (SparkSinkUtils.isErrorBatch(batch)) {
    +        // Remove the processor if it is an error batch since no ACK is sent.
    +        removeAndGetProcessor(sequenceNumber)
    +        logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +      }
    +      batch
    +    }
    +  }
    +
    +  private def getTransactionProcessor(seq: String, n: Int): TransactionProcessor = {
    --- End diff --
    
    This name is misleading! This `get` is not the same as the get in `removeAndGetProcessor` . This is more like a `create`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52839517
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18984/consoleFull) for   PR 2065 at commit [`ed608c8`](https://github.com/apache/spark/commit/ed608c8fd620cd9d15c57666fbacdffa61c29dbf).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754560
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    +Logging {
    --- End diff --
    
    In correct formatting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52969414
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19063/consoleFull) for   PR 2065 at commit [`e7b8d82`](https://github.com/apache/spark/commit/e7b8d8214eeea8d5433ea7144fca48f925b33881).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16518442
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala ---
    @@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
                       ByteBuffer.wrap(event.getBody)))
                     gotEventsInThisTxn = true
                   case None =>
    -                if (!gotEventsInThisTxn) {
    +                if (!gotEventsInThisTxn && !stopped) {
                       logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
                         " the current transaction")
                       TimeUnit.MILLISECONDS.sleep(backOffInterval)
    --- End diff --
    
    I actually restored the `shutdownNow()` methods since we handle the interrupts.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16549291
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -144,6 +168,21 @@ private[streaming] class FlumePollingReceiver(
         }
       }
     
    +  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    +    seq: CharSequence): Unit = {
    +    Try {
    --- End diff --
    
    So is there an advantage to using Try here instead of the usual try..catch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53533488
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19298/consoleFull) for   PR 2065 at commit [`f93a07c`](https://github.com/apache/spark/commit/f93a07c9e6cd4b34a276e9c75aa601fdafa67ef7).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52966251
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19064/consoleFull) for   PR 2065 at commit [`9001d26`](https://github.com/apache/spark/commit/9001d266907c0831a52d548fb809cd8b68e9fc49).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16758756
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala ---
    @@ -225,4 +237,16 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         processAckOrNack()
         null
       }
    +
    +  override def hashCode(): Int =  {
    --- End diff --
    
    These two methods are not necessary any more!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52962158
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19063/consoleFull) for   PR 2065 at commit [`e7b8d82`](https://github.com/apache/spark/commit/e7b8d8214eeea8d5433ea7144fca48f925b33881).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754361
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    +Logging {
    +
    +  def run(): Unit = {
    +    while (!parent.isStopped()) {
    +      val connection = parent.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            logDebug(
    +              "Received batch of " + events.size + " events with sequence number: " + seq)
    +            if (store(events)) {
    +              ack(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!parent.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        parent.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(parent.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    +        "Did not receive events from Flume agent due to error on the Flume " +
    --- End diff --
    
    This does not need to be on a new line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16687067
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    --- End diff --
    
    Why have a breakable loop here (with a while loop in the middle)? if receiver.isStopped is true then the whole thing should shutdown anyways. There should not be a loop around that while loop. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53537410
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19298/consoleFull) for   PR 2065 at commit [`f93a07c`](https://github.com/apache/spark/commit/f93a07c9e6cd4b34a276e9c75aa601fdafa67ef7).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756389
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    +  Logging {
    +
    +  def run(): Unit = {
    +    while (!receiver.isStopped()) {
    +      val connection = receiver.getConnections.poll()
    --- End diff --
    
    Shouldnt this be inside the try as well?? Its a non-trivial code with potential chance for exception. The next line as well. What if `connection.client` is null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53512084
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19250/consoleFull) for   PR 2065 at commit [`445e700`](https://github.com/apache/spark/commit/445e70045380287e72d4aff617bef9565337287c).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16752978
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +74,30 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    } else {
    +      val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    +      activeProcessorMapLock.lock()
    +      val processor = new TransactionProcessor(channel, sequenceNumber,
    +        n, transactionTimeout, backOffInterval, this)
    +      try {
    +        activeProcessors.add(processor)
    --- End diff --
    
    Why would adding the active processor fail?
    Also instead of adding another lock object, you can simply to `activeProcessors.synchronized { ... }`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16549232
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -18,23 +18,27 @@ package org.apache.spark.streaming.flume
     
     
     import java.net.InetSocketAddress
    -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
    +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
    +
    +import com.google.common.base.Throwables
     
     import scala.collection.JavaConversions._
     import scala.collection.mutable.ArrayBuffer
     import scala.reflect.ClassTag
    +import scala.util.Try
    +import scala.util.control.Breaks
     
     import com.google.common.util.concurrent.ThreadFactoryBuilder
     import org.apache.avro.ipc.NettyTransceiver
     import org.apache.avro.ipc.specific.SpecificRequestor
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    -
     import org.apache.spark.Logging
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.StreamingContext
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
     import org.apache.spark.streaming.receiver.Receiver
     import org.apache.spark.streaming.flume.sink._
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    --- End diff --
    
    nit: this was in the right place.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16549316
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -144,6 +168,21 @@ private[streaming] class FlumePollingReceiver(
         }
       }
     
    +  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    +    seq: CharSequence): Unit = {
    +    Try {
    +      if (batchReceived) {
    +        // Let Flume know that the events need to be pushed back into the channel.
    +        logDebug("Sending nack for sequence number: " + seq)
    +        client.nack(seq) // If the agent is down, even this could fail and throw
    +        logDebug("Nack sent for sequence number: " + seq)
    +      }
    +    }.recover({
    +      case e: Exception => logError(
    --- End diff --
    
    nit: move everything to same line, or `logError(` to next line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53526713
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19280/consoleFull) for   PR 2065 at commit [`a0a8852`](https://github.com/apache/spark/commit/a0a88526d0534446e0b19f3646381b05c69e7ce3).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53542841
  
    This looks real good now. Thanks @harishreedharan for all changes and the wonderful refactoring. I am going to quickly test this in my local flume set up for double confirmation. If it works out, will merge this in.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16758820
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    +  Logging {
    +
    +  def run(): Unit = {
    +    while (!receiver.isStopped()) {
    +      val connection = receiver.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            if (store(events)) {
    +              sendAck(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!receiver.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        receiver.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      logDebug("Received batch of " + eventBatch.getEvents.size + " events with sequence number: "
    +        + eventBatch.getSequenceNumber)
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    --- End diff --
    
    nit: Again, spilling into three lines is weird.
    ```
    logWarning("Did not receive events from Flume agent due to error on the Flume agent: " + 
       eventBatch.getErrorMsg)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53515670
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19250/consoleFull) for   PR 2065 at commit [`445e700`](https://github.com/apache/spark/commit/445e70045380287e72d4aff617bef9565337287c).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ExternalSorter(object):`
      * `protected class AttributeEquals(val a: Attribute) `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52943823
  
    Mostly nits left; looks OK after you look at the `EventBatch` issue I pointed out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16511500
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,12 +70,16 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    +    if(stopped) {
    --- End diff --
    
    nit: space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52862309
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19020/consoleFull) for   PR 2065 at commit [`598efa7`](https://github.com/apache/spark/commit/598efa7106ffaabbfc0de149825a2c451a216ae1).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754551
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    --- End diff --
    
    Better to call the FlumePollingReceiver object as "receiver" . Parent is kind arbitrary (its not a tree or a DAG like data structure).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16728148
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +70,24 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    --- End diff --
    
    Yeah, I am not concerned about the efficiency. I am only concerned about ease of debugging; whether it will be easier to debug if this is exposed as an error or empty batch. I dont have a very strong opinion here, so either is fine.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16727964
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +70,24 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    --- End diff --
    
    Either is fine. Empty batch might be smaller in terms of the amount of data transferred over the network - since throwing an exception will cause Avro to serialize the entire exception and send it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16685491
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    --- End diff --
    
    Yeah, I realized that. It felt okay back then, but now that I am looking through the code again, I am realizing that I am having to completely interpret this mass of code from scratch (since there is no semantically named functions). So its better to break it up now for future understanding.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16549093
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,12 +70,16 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    --- End diff --
    
    Do you mean to return here? If so you need an explicit return or an "else" block.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16758724
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
       val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
         new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Spark Sink Processor Thread - %d").build()))
    -  private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
    +  private val sequenceNumberToProcessor = new ConcurrentHashMap[CharSequence,
    --- End diff --
    
    Oh oh, does this go beyond 100 chars?? My bad! I thought it would fit. Then a better line break would be 
    ```
    private val sequenceNumberToProcessor = 
          new ConcurrentHashMap[CharSequence, TransactionProcessor]()
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16758798
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    +  Logging {
    +
    +  def run(): Unit = {
    +    while (!receiver.isStopped()) {
    +      val connection = receiver.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            if (store(events)) {
    +              sendAck(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!receiver.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        receiver.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      logDebug("Received batch of " + eventBatch.getEvents.size + " events with sequence number: "
    --- End diff --
    
    nit: With so many clauses in the string, its better to use string interpolation.
    `logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence number: ${eventBatch.getSequenceNumber}")`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16683481
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    --- End diff --
    
    Why are there so many nested Try's and try's in this. The code looks pretty complex and verbose. I strongly recommend taking another pass, may be break the code into smaller functions (with good semantically-meaningful names). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53527293
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19283/consoleFull) for   PR 2065 at commit [`d7427cc`](https://github.com/apache/spark/commit/d7427ccb90f3eb6cffd1b64e65293d3b78ce7f01).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53546318
  
    Alright, tested this. Merging it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16512056
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -144,6 +172,23 @@ private[streaming] class FlumePollingReceiver(
         }
       }
     
    +  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    +    seq: CharSequence, exception: Exception): Unit = {
    +    Try {
    +      if (batchReceived) {
    +        // Let Flume know that the events need to be pushed back into the channel.
    +        logDebug("Sending nack for sequence number: " + seq)
    +        client.nack(seq) // If the agent is down, even this could fail and throw
    +        logDebug("Nack sent for sequence number: " + seq)
    +      }
    +    }.recover({
    +      case e: Exception => logError(
    +        "Sending Nack also failed. A Flume agent is down.")
    +    })
    +    TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
    --- End diff --
    
    Could you have the comment explain why the sleep is necessary instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16758729
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -63,18 +68,34 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
         val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    createProcessor(sequenceNumber, n) match {
    +      case Some(processor) =>
    +        transactionExecutorOpt.foreach(_.submit(processor))
    +        // Wait until a batch is available - will be an error if error message is non-empty
    +        val batch = processor.getEventBatch
    +        if (SparkSinkUtils.isErrorBatch(batch)) {
    +          // Remove the processor if it is an error batch since no ACK is sent.
    +          removeAndGetProcessor(sequenceNumber)
    +          logDebug("Received an error batch - no events were received from channel! ")
    --- End diff --
    
    nit: Shouldnt this be a warning


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16683738
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    --- End diff --
    
    The outer try is unfortunately required for the finally. The other Try(s) are simply replacing the existing try-catches which were messier than this. We could break this into methods.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756511
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    +  Logging {
    +
    +  def run(): Unit = {
    +    while (!receiver.isStopped()) {
    +      val connection = receiver.getConnections.poll()
    --- End diff --
    
    If the client is null the receiver would never have started, since that is when the client is created. It would be null only if the `SpecificRequestor.getClient` threw, which means that the receiver would not start. poll is guaranteed to not throw a non-fatal exception: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html#poll()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754413
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    +Logging {
    +
    +  def run(): Unit = {
    +    while (!parent.isStopped()) {
    +      val connection = parent.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            logDebug(
    +              "Received batch of " + events.size + " events with sequence number: " + seq)
    +            if (store(events)) {
    +              ack(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!parent.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        parent.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(parent.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    +        "Did not receive events from Flume agent due to error on the Flume " +
    +          "agent: " + eventBatch.getErrorMsg)
    +      None
    +    }
    +  }
    +
    +  /**
    +   * Store the events in the buffer to Spark. This method will not propogate any exceptions,
    +   * but will propogate any other errors.
    +   * @param buffer The buffer to store
    +   * @return true if the data was stored without any exception being thrown, else false
    +   */
    +  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
    +    try {
    +      parent.store(buffer)
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logWarning("Error while attempting to store data received from Flume", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Send an ack to the client for the sequence number. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client client to send the ack to
    +   * @param seq sequence number of the batch to be ack-ed.
    +   * @return
    +   */
    +  private def ack(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
    --- End diff --
    
    Name should be "sendAck"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16683530
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -124,6 +139,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       def shutdown() {
         logInfo("Shutting down Spark Avro Callback Handler")
    +    stopped = true
    +    processorsToShutdown.foreach(_.shutdown())
    --- End diff --
    
    Yes, you are right. We can put that code inside a lock to take care of this. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756236
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +67,30 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    } else {
    +      val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    +      val processor = getTransactionProcessor(sequenceNumber, n)
    +      transactionExecutorOpt.foreach(_.submit(processor))
    +      // Wait until a batch is available - will be an error if error message is non-empty
    +      val batch = processor.getEventBatch
    +      if (SparkSinkUtils.isErrorBatch(batch)) {
    +        // Remove the processor if it is an error batch since no ACK is sent.
    +        removeAndGetProcessor(sequenceNumber)
    +        logDebug("Sending event batch with sequence number: " + sequenceNumber)
    --- End diff --
    
    What does this debug statement mean? This will get printed only when there is an error. There should be different debug statements in the cases where there is error, and where there isnt.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16549128
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala ---
    @@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         batchAckLatch.countDown()
       }
     
    +  private[flume] def shutdown(): Unit = {
    +    logInfo("Shutting down transaction processor")
    --- End diff --
    
    Do you expect to have many of these runnning at a time? If so I'd demote this to `logDebug`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16752853
  
    --- Diff: external/flume-sink/pom.xml ---
    @@ -71,6 +71,10 @@
           <scope>test</scope>
         </dependency>
         <dependency>
    +      <groupId>org.scala-lang</groupId>
    --- End diff --
    
    @srowen You removed this, right? Whats the harm in keeping this in, as the flume-sink directly depend on scala.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756253
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +67,30 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    } else {
    +      val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    +      val processor = getTransactionProcessor(sequenceNumber, n)
    +      transactionExecutorOpt.foreach(_.submit(processor))
    +      // Wait until a batch is available - will be an error if error message is non-empty
    +      val batch = processor.getEventBatch
    +      if (SparkSinkUtils.isErrorBatch(batch)) {
    +        // Remove the processor if it is an error batch since no ACK is sent.
    +        removeAndGetProcessor(sequenceNumber)
    +        logDebug("Sending event batch with sequence number: " + sequenceNumber)
    --- End diff --
    
    Ha, copy paste error. Fixing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16758766
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    --- End diff --
    
    I missed this earlier. Can you add docs on what this class does??


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16758698
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -63,18 +68,34 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
         val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    createProcessor(sequenceNumber, n) match {
    +      case Some(processor) =>
    +        transactionExecutorOpt.foreach(_.submit(processor))
    +        // Wait until a batch is available - will be an error if error message is non-empty
    +        val batch = processor.getEventBatch
    +        if (SparkSinkUtils.isErrorBatch(batch)) {
    +          // Remove the processor if it is an error batch since no ACK is sent.
    +          removeAndGetProcessor(sequenceNumber)
    +          logDebug("Received an error batch - no events were received from channel! ")
    +        }
    +        batch
    +      case None =>
    +        new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    }
    +  }
    +
    +  private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
    +    sequenceNumberToProcessor.synchronized {
    +      if (!stopped) {
    +        val processor = new
    --- End diff --
    
    nit: this line break can be better.
    ```
    val processor = new TransactionProcessor(
        channel, seq, n, transactionTimeout, backOffInterval, this)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756404
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    +  Logging {
    +
    +  def run(): Unit = {
    +    while (!receiver.isStopped()) {
    +      val connection = receiver.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            if (store(events)) {
    +              sendAck(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!receiver.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        receiver.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      logDebug("Received batch of " + eventBatch.getEvents.size + " events with sequence number: "
    +        + eventBatch.getSequenceNumber)
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    +        "Did not receive events from Flume agent due to error on the Flume " +
    +          "agent: " + eventBatch.getErrorMsg)
    +      None
    +    }
    +  }
    +
    +  /**
    +   * Store the events in the buffer to Spark. This method will not propogate any exceptions,
    +   * but will propogate any other errors.
    +   * @param buffer The buffer to store
    +   * @return true if the data was stored without any exception being thrown, else false
    +   */
    +  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
    +    try {
    +      receiver.store(buffer)
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logWarning("Error while attempting to store data received from Flume", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Send an ack to the client for the sequence number. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client client to send the ack to
    +   * @param seq sequence number of the batch to be ack-ed.
    +   * @return
    +   */
    +  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
    +    logDebug("Sending Nack for sequence number: " + seq)
    --- End diff --
    
    Dude, this line still says "nack" :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756314
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
       val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
         new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Spark Sink Processor Thread - %d").build()))
    -  private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
    +  private val sequenceNumberToProcessor = new
    +      ConcurrentHashMap[CharSequence, TransactionProcessor]()
    --- End diff --
    
    No need to be on a different line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52973226
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19064/consoleFull) for   PR 2065 at commit [`9001d26`](https://github.com/apache/spark/commit/9001d266907c0831a52d548fb809cd8b68e9fc49).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-52866621
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19020/consoleFull) for   PR 2065 at commit [`598efa7`](https://github.com/apache/spark/commit/598efa7106ffaabbfc0de149825a2c451a216ae1).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16512028
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +95,76 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    +                  eventBatch = client.getEventBatch(maxBatchSize)
    +                  batchReceived = true
    +                  if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +                    // No error, proceed with processing data
    +                    seq = eventBatch.getSequenceNumber
    +                    val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    +                    logDebug(
    +                      "Received batch of " + events.size() + " events with sequence number: " + seq)
    +                    // Convert each Flume event to a serializable SparkFlumeEvent
    +                    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    +                    var j = 0
    +                    while (j < events.size()) {
    +                      buffer += toSparkFlumeEvent(events(j))
    +                      j += 1
    +                    }
    +                    store(buffer)
    +                    logDebug("Sending ack for sequence number: " + seq)
    +                    // Send an ack to Flume so that Flume discards the events from its channels.
    +                    client.ack(seq)
    +                    logDebug("Ack sent for sequence number: " + seq)
    +                  } else {
    +                    batchReceived = false
    +                    logWarning(
    +                      "Did not receive events from Flume agent due to error on the Flume " +
    +                        "agent: " + eventBatch.getErrorMsg)
                       }
    -                  store(buffer)
    -                  logDebug("Sending ack for sequence number: " + seq)
    -                  // Send an ack to Flume so that Flume discards the events from its channels.
    -                  client.ack(seq)
    -                  logDebug("Ack sent for sequence number: " + seq)
    -                } catch {
    +                }.recover {
                       case e: Exception =>
    -                    try {
    -                      // Let Flume know that the events need to be pushed back into the channel.
    -                      logDebug("Sending nack for sequence number: " + seq)
    -                      client.nack(seq) // If the agent is down, even this could fail and throw
    -                      logDebug("Nack sent for sequence number: " + seq)
    -                    } catch {
    -                      case e: Exception => logError(
    -                        "Sending Nack also failed. A Flume agent is down.")
    +                    // Is the exception an interrupted exception? If yes,
    +                    // check if the receiver was stopped. If the receiver was stopped,
    +                    // simply exit. Else send a Nack and exit.
    +                    if (e.isInstanceOf[InterruptedException]) {
    +                      if (isStopped()) {
    +                        loop.break()
    +                      } else {
    +                        sendNack(batchReceived, client, seq, e)
    +                      }
    +                    }
    +                    // if there is a cause do the same check as above.
    +                    Option(e.getCause) match {
    --- End diff --
    
    This looks a bit ugly with all the duplication. Also, why check only the first cause instead of recursing up to the root cause?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53524905
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19283/consoleFull) for   PR 2065 at commit [`d7427cc`](https://github.com/apache/spark/commit/d7427ccb90f3eb6cffd1b64e65293d3b78ce7f01).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756273
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +67,30 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    } else {
    +      val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    +      val processor = getTransactionProcessor(sequenceNumber, n)
    +      transactionExecutorOpt.foreach(_.submit(processor))
    +      // Wait until a batch is available - will be an error if error message is non-empty
    +      val batch = processor.getEventBatch
    +      if (SparkSinkUtils.isErrorBatch(batch)) {
    +        // Remove the processor if it is an error batch since no ACK is sent.
    +        removeAndGetProcessor(sequenceNumber)
    +        logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +      }
    +      batch
    +    }
    +  }
    +
    +  private def getTransactionProcessor(seq: String, n: Int): TransactionProcessor = {
    --- End diff --
    
    And this returns a processor irrespective of whether the receiver has stopped or not. So the race condition persists.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16685565
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    --- End diff --
    
    Why have the eventBatch as a var and not a val. It does not seem to be used outside the following `Try` clause.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754471
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    +Logging {
    +
    +  def run(): Unit = {
    +    while (!parent.isStopped()) {
    +      val connection = parent.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            logDebug(
    +              "Received batch of " + events.size + " events with sequence number: " + seq)
    +            if (store(events)) {
    +              ack(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!parent.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        parent.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(parent.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    +        "Did not receive events from Flume agent due to error on the Flume " +
    +          "agent: " + eventBatch.getErrorMsg)
    +      None
    +    }
    +  }
    +
    +  /**
    +   * Store the events in the buffer to Spark. This method will not propogate any exceptions,
    +   * but will propogate any other errors.
    +   * @param buffer The buffer to store
    +   * @return true if the data was stored without any exception being thrown, else false
    +   */
    +  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
    +    try {
    +      parent.store(buffer)
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logWarning("Error while attempting to store data received from Flume", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Send an ack to the client for the sequence number. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client client to send the ack to
    +   * @param seq sequence number of the batch to be ack-ed.
    +   * @return
    +   */
    +  private def ack(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
    +    logDebug("Sending nack for sequence number: " + seq)
    --- End diff --
    
    The debug statement is wrong! Its not sending nack !!!!!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754872
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    +Logging {
    +
    +  def run(): Unit = {
    +    while (!parent.isStopped()) {
    +      val connection = parent.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            logDebug(
    --- End diff --
    
    It maybe more consistent to move this logDebug into the getBatch function, just like the None case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16755513
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    +Logging {
    +
    +  def run(): Unit = {
    +    while (!parent.isStopped()) {
    +      val connection = parent.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            logDebug(
    +              "Received batch of " + events.size + " events with sequence number: " + seq)
    +            if (store(events)) {
    +              ack(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!parent.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        parent.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(parent.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    +        "Did not receive events from Flume agent due to error on the Flume " +
    +          "agent: " + eventBatch.getErrorMsg)
    +      None
    +    }
    +  }
    +
    +  /**
    +   * Store the events in the buffer to Spark. This method will not propogate any exceptions,
    +   * but will propogate any other errors.
    +   * @param buffer The buffer to store
    +   * @return true if the data was stored without any exception being thrown, else false
    +   */
    +  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
    +    try {
    +      parent.store(buffer)
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logWarning("Error while attempting to store data received from Flume", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Send an ack to the client for the sequence number. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client client to send the ack to
    +   * @param seq sequence number of the batch to be ack-ed.
    +   * @return
    +   */
    +  private def ack(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
    +    logDebug("Sending nack for sequence number: " + seq)
    +    client.ack(seq)
    +    logDebug("Nack sent for sequence number: " + seq)
    +  }
    +
    +  /**
    +   * This method sends a Nack if a batch was received to the client with with the given sequence
    +   * number. Any exceptions thrown by the RPC call is simply thrown out as is - no effort is made
    +   * to handle it.
    +   * @param batchReceived true if a batch was received. If this is false, no nack is sent
    +   * @param client The client to which the nack should be sent
    +   * @param seq The sequence number of the batch that is being nack-ed.
    +   */
    +  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    +    seq: CharSequence): Unit = {
    +    if (batchReceived) {
    +      // Let Flume know that the events need to be pushed back into the channel.
    +      logDebug("Sending nack for sequence number: " + seq)
    +      client.nack(seq) // If the agent is down, even this could fail and throw
    +      logDebug("Nack sent for sequence number: " + seq)
    +    }
    +  }
    +
    +
    +  /**
    +   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
    +   * @param events - Events to convert to SparkFlumeEvents
    +   * @return - The SparkFlumeEvent generated from SparkSinkEvent
    +   */
    +  private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
    +    ArrayBuffer[SparkFlumeEvent] = {
    +    // Convert each Flume event to a serializable SparkFlumeEvent
    +    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    +    var j = 0
    +    while (j < events.size()) {
    +      val event = events(j)
    +      val sparkFlumeEvent = new SparkFlumeEvent()
    +      sparkFlumeEvent.event.setBody(event.getBody)
    +      sparkFlumeEvent.event.setHeaders(event.getHeaders)
    +      buffer += sparkFlumeEvent
    +      j += 1
    +    }
    +    buffer
    +  }
    +
    --- End diff --
    
    unnecessary empty lines.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16685628
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    +                  eventBatch = client.getEventBatch(maxBatchSize)
    +                  batchReceived = true
    +                  if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +                    // No error, proceed with processing data
    +                    seq = eventBatch.getSequenceNumber
    +                    val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    +                    logDebug(
    +                      "Received batch of " + events.size() + " events with sequence number: " + seq)
    +                    // Convert each Flume event to a serializable SparkFlumeEvent
    +                    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    +                    var j = 0
    +                    while (j < events.size()) {
    +                      buffer += toSparkFlumeEvent(events(j))
    +                      j += 1
    +                    }
    +                    store(buffer)
    +                    logDebug("Sending ack for sequence number: " + seq)
    +                    // Send an ack to Flume so that Flume discards the events from its channels.
    +                    client.ack(seq)
    +                    logDebug("Ack sent for sequence number: " + seq)
    +                  } else {
    +                    batchReceived = false
    +                    logWarning(
    +                      "Did not receive events from Flume agent due to error on the Flume " +
    +                        "agent: " + eventBatch.getErrorMsg)
                       }
    -                  store(buffer)
    -                  logDebug("Sending ack for sequence number: " + seq)
    -                  // Send an ack to Flume so that Flume discards the events from its channels.
    -                  client.ack(seq)
    -                  logDebug("Ack sent for sequence number: " + seq)
    -                } catch {
    +                }.recoverWith {
                       case e: Exception =>
    -                    try {
    -                      // Let Flume know that the events need to be pushed back into the channel.
    -                      logDebug("Sending nack for sequence number: " + seq)
    -                      client.nack(seq) // If the agent is down, even this could fail and throw
    -                      logDebug("Nack sent for sequence number: " + seq)
    -                    } catch {
    -                      case e: Exception => logError(
    -                        "Sending Nack also failed. A Flume agent is down.")
    +                    Try {
    +                      Throwables.getRootCause(e) match {
    +                        // If the cause was an InterruptedException,
    +                        // then check if the receiver is stopped - if yes,
    +                        // just break out of the loop. Else send a Nack and
    +                        // log a warning.
    +                        // In the unlikely case, the cause was not an Exception,
    +                        // then just throw it out and exit.
    +                        case interrupted: InterruptedException =>
    +                          if (isStopped()) {
    +                            loop.break()
    +                          } else {
    +                            logWarning("Interrupted while receiving data from Flume", interrupted)
    +                            sendNack(batchReceived, client, seq)
    +                          }
    +                        case exception: Exception =>
    +                          logWarning("Error while receiving data from Flume", exception)
    +                          sendNack(batchReceived, client, seq)
    +                        case majorError: Throwable =>
    --- End diff --
    
    Do not catch Throwables. Scala subsystem often uses throwables for variale control flow, and catch throwable can have unpredictable consequences.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754580
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    --- End diff --
    
    And this does not need to be `val` as it is not accessed outside this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756399
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    +  Logging {
    +
    +  def run(): Unit = {
    +    while (!receiver.isStopped()) {
    +      val connection = receiver.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            if (store(events)) {
    +              sendAck(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!receiver.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        receiver.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      logDebug("Received batch of " + eventBatch.getEvents.size + " events with sequence number: "
    +        + eventBatch.getSequenceNumber)
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    +        "Did not receive events from Flume agent due to error on the Flume " +
    +          "agent: " + eventBatch.getErrorMsg)
    +      None
    +    }
    +  }
    +
    +  /**
    +   * Store the events in the buffer to Spark. This method will not propogate any exceptions,
    +   * but will propogate any other errors.
    +   * @param buffer The buffer to store
    +   * @return true if the data was stored without any exception being thrown, else false
    +   */
    +  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
    +    try {
    +      receiver.store(buffer)
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logWarning("Error while attempting to store data received from Flume", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Send an ack to the client for the sequence number. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    --- End diff --
    
    propogated --> propagated


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16511729
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala ---
    @@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
                       ByteBuffer.wrap(event.getBody)))
                     gotEventsInThisTxn = true
                   case None =>
    -                if (!gotEventsInThisTxn) {
    +                if (!gotEventsInThisTxn && !stopped) {
                       logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
                         " the current transaction")
                       TimeUnit.MILLISECONDS.sleep(backOffInterval)
    --- End diff --
    
    Since you're changing `shutdownNow()` to `shutdown()` in SparkAvroCallbackHandler.scala, I'd suggest creating a dummy object and use `Object.wait()` here, and `Object.notifyAll()` in the shutdown method above, so that sleeping tasks are woken up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16512127
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +95,76 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    +                  eventBatch = client.getEventBatch(maxBatchSize)
    +                  batchReceived = true
    +                  if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +                    // No error, proceed with processing data
    +                    seq = eventBatch.getSequenceNumber
    +                    val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    +                    logDebug(
    +                      "Received batch of " + events.size() + " events with sequence number: " + seq)
    +                    // Convert each Flume event to a serializable SparkFlumeEvent
    +                    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    +                    var j = 0
    +                    while (j < events.size()) {
    +                      buffer += toSparkFlumeEvent(events(j))
    +                      j += 1
    +                    }
    +                    store(buffer)
    +                    logDebug("Sending ack for sequence number: " + seq)
    +                    // Send an ack to Flume so that Flume discards the events from its channels.
    +                    client.ack(seq)
    +                    logDebug("Ack sent for sequence number: " + seq)
    +                  } else {
    +                    batchReceived = false
    +                    logWarning(
    +                      "Did not receive events from Flume agent due to error on the Flume " +
    +                        "agent: " + eventBatch.getErrorMsg)
                       }
    -                  store(buffer)
    -                  logDebug("Sending ack for sequence number: " + seq)
    -                  // Send an ack to Flume so that Flume discards the events from its channels.
    -                  client.ack(seq)
    -                  logDebug("Ack sent for sequence number: " + seq)
    -                } catch {
    +                }.recover {
                       case e: Exception =>
    -                    try {
    -                      // Let Flume know that the events need to be pushed back into the channel.
    -                      logDebug("Sending nack for sequence number: " + seq)
    -                      client.nack(seq) // If the agent is down, even this could fail and throw
    -                      logDebug("Nack sent for sequence number: " + seq)
    -                    } catch {
    -                      case e: Exception => logError(
    -                        "Sending Nack also failed. A Flume agent is down.")
    +                    // Is the exception an interrupted exception? If yes,
    +                    // check if the receiver was stopped. If the receiver was stopped,
    +                    // simply exit. Else send a Nack and exit.
    +                    if (e.isInstanceOf[InterruptedException]) {
    +                      if (isStopped()) {
    +                        loop.break()
    +                      } else {
    +                        sendNack(batchReceived, client, seq, e)
    +                      }
    +                    }
    +                    // if there is a cause do the same check as above.
    +                    Option(e.getCause) match {
    --- End diff --
    
    If the interrupt comes in at the time of an Avro RPC call, then an AvroRemoteException with InterruptedException as cause gets thrown. I am basically checking for InterruptedException and any exception which was caused by InterruptedException.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16753967
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -124,6 +155,13 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       def shutdown() {
         logInfo("Shutting down Spark Avro Callback Handler")
    +    stopped = true
    +    activeProcessorMapLock.lock()
    +    try {
    +      activeProcessors.foreach(_.shutdown())
    +    } finally {
    +      activeProcessorMapLock.unlock()
    --- End diff --
    
    There can still be a race condition, I think! Another thread can stay stuck in line 81, while the shutting threads enter critical sections, and marks all the processors for shutdown. Then when shutting thread exits critical section, the other thread (stuck in 81) will enter its ciritical and create a new transaction processor. 
    
    Isnt it?
    
    More fundamentally, why do we even need the activeProcessors? We can added all the processors created to the `sequenceNumberToProcessor` map, even before the processors has been submitted for populating events. At the time of shutdown, we can do `sequenceNumberToProcessor.values.foreach(_.shutdown)` to shutdown all of them. This would be easier to reason with regarding race conditions, because then we dont have to add processor in two different data structures (`activeProcessors` and `sequenceNumberToProcessor`), in two different steps (before populateEvents and after). 
    
    And in that case it become easier to synchronize. Let there be two methods with the following semantics.
    ```
    def createProcessor(): Option[TransactionProcessor] = sequenceNumberToProcessor.synchronized {
        if (!stopped) {
            // create processor, add to the map, and return Some(proc)
        } else None.
    }
    
    def removeAndGetProcessor(seqNum: CharSequence): TransactionProcessor = sequenceNumberToProcessor.synchronized  {
        // remove and return within the lock
    }
    
    def shutdown(): Unit = {
         sequenceNumberToProcessor.synchronized {
                 stopped = true
                 // shutdown all the processors
         }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53530019
  
    This now looks quite good to me. There were a few more formatting issues, should take  5 minutes to solve :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16682789
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -46,6 +48,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
         new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Spark Sink Processor Thread - %d").build()))
       private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
    --- End diff --
    
    For better code understanding, it is better to call this `sequenceNumberToProcessor` (this style is often used in Spark, see DAGScheduler) and the `processorsToShutdown` to be called `activeProcessors`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16682810
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +70,24 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    --- End diff --
    
    Should it return an empty batch, or no batch at all (that is, throw an error saying shutdown in progress)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2065#issuecomment-53524470
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19280/consoleFull) for   PR 2065 at commit [`a0a8852`](https://github.com/apache/spark/commit/a0a88526d0534446e0b19f3646381b05c69e7ce3).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16755731
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
     import org.apache.hadoop.yarn.ipc.YarnRPC
     import org.apache.hadoop.yarn.util.Records
     
    +import org.apache.spark.deploy.ClientArguments
    --- End diff --
    
    Intellij :|


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756411
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
    +  Logging {
    +
    +  def run(): Unit = {
    +    while (!receiver.isStopped()) {
    +      val connection = receiver.getConnections.poll()
    +      val client = connection.client
    +      var batchReceived = false
    +      var seq: CharSequence = null
    +      try {
    +        getBatch(client) match {
    +          case Some(eventBatch) =>
    +            batchReceived = true
    +            seq = eventBatch.getSequenceNumber
    +            val events = toSparkFlumeEvents(eventBatch.getEvents)
    +            if (store(events)) {
    +              sendAck(client, seq)
    +            } else {
    +              sendNack(batchReceived, client, seq)
    +            }
    +          case None =>
    +        }
    +      } catch {
    +        case e: Exception =>
    +          Throwables.getRootCause(e) match {
    +            // If the cause was an InterruptedException, then check if the receiver is stopped -
    +            // if yes, just break out of the loop. Else send a Nack and log a warning.
    +            // In the unlikely case, the cause was not an Exception,
    +            // then just throw it out and exit.
    +            case interrupted: InterruptedException =>
    +              if (!receiver.isStopped()) {
    +                logWarning("Interrupted while receiving data from Flume", interrupted)
    +                sendNack(batchReceived, client, seq)
    +              }
    +            case exception: Exception =>
    +              logWarning("Error while receiving data from Flume", exception)
    +              sendNack(batchReceived, client, seq)
    +          }
    +      } finally {
    +        receiver.getConnections.add(connection)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets a batch of events from the specified client. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client Client to get events from
    +   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
    +   */
    +  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    +    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
    +    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +      // No error, proceed with processing data
    +      logDebug("Received batch of " + eventBatch.getEvents.size + " events with sequence number: "
    +        + eventBatch.getSequenceNumber)
    +      Some(eventBatch)
    +    } else {
    +      logWarning(
    +        "Did not receive events from Flume agent due to error on the Flume " +
    +          "agent: " + eventBatch.getErrorMsg)
    +      None
    +    }
    +  }
    +
    +  /**
    +   * Store the events in the buffer to Spark. This method will not propogate any exceptions,
    +   * but will propogate any other errors.
    +   * @param buffer The buffer to store
    +   * @return true if the data was stored without any exception being thrown, else false
    +   */
    +  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
    +    try {
    +      receiver.store(buffer)
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logWarning("Error while attempting to store data received from Flume", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Send an ack to the client for the sequence number. This method does not handle any exceptions
    +   * which will be propogated to the caller.
    +   * @param client client to send the ack to
    +   * @param seq sequence number of the batch to be ack-ed.
    +   * @return
    +   */
    +  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
    +    logDebug("Sending Nack for sequence number: " + seq)
    +    client.ack(seq)
    +    logDebug("Ack sent for sequence number: " + seq)
    +  }
    +
    +  /**
    +   * This method sends a Nack if a batch was received to the client with with the given sequence
    --- End diff --
    
    Double with in the docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754609
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingRunnable.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.flume
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.base.Throwables
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.flume.sink._
    +
    +private[flume] class FlumePollingRunnable(val parent: FlumePollingReceiver) extends Runnable with
    --- End diff --
    
    Does a name `FlumeBatchGetter` sound more semantically meaningful to what this class does?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16754092
  
    --- Diff: external/flume-sink/pom.xml ---
    @@ -71,6 +71,10 @@
           <scope>test</scope>
         </dependency>
         <dependency>
    +      <groupId>org.scala-lang</groupId>
    --- End diff --
    
    The thing is this project does not depend on spark-core, its standalone. So it actually makes sense to add scala-library directly. However, the current code surprisingly compiles (both sbt and maven) even if we dont have scala-library as the dependency. Probably because scala compiler automatically brings in scala. 
    
    I am of the opinion that we should add this back in, but ensure that this has the same version of scala as the spark core. Can you take a look at this, @srowen . 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16683745
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
    --- End diff --
    
    Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16553665
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -144,6 +168,21 @@ private[streaming] class FlumePollingReceiver(
         }
       }
     
    +  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    +    seq: CharSequence): Unit = {
    +    Try {
    --- End diff --
    
    Other than the code being a bit cleaner -- not really I guess.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16549194
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -18,23 +18,27 @@ package org.apache.spark.streaming.flume
     
     
     import java.net.InetSocketAddress
    -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
    +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
    +
    +import com.google.common.base.Throwables
    --- End diff --
    
    super nit: I think this comes after the scala imports (see where ThreadFactoryBuilder is).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16752958
  
    --- Diff: external/flume-sink/pom.xml ---
    @@ -71,6 +71,10 @@
           <scope>test</scope>
         </dependency>
         <dependency>
    +      <groupId>org.scala-lang</groupId>
    --- End diff --
    
    Yes, so does the whole project. The convention seems to be to let it come in from spark-core. I sort of remember that this caused some problem that looked like a Scala compiler problem -- that is, shouldn't be a problem, but was tickling some issue. Let me see if I can dig that out, or whether I'm misremembering. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16511761
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -47,11 +52,11 @@ import org.apache.spark.streaming.flume.sink._
      * @tparam T Class type of the object of this stream
      */
     private[streaming] class FlumePollingInputDStream[T: ClassTag](
    -    @transient _ssc: StreamingContext,
    -    val addresses: Seq[InetSocketAddress],
    -    val maxBatchSize: Int,
    -    val parallelism: Int,
    -    storageLevel: StorageLevel
    +  @transient _ssc: StreamingContext,
    --- End diff --
    
    nit: 4 spaces here was correct according to the Spark style conventions. Also below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16755514
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
     import org.apache.hadoop.yarn.ipc.YarnRPC
     import org.apache.hadoop.yarn.util.Records
     
    +import org.apache.spark.deploy.ClientArguments
    --- End diff --
    
    Why this change!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16683523
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
    --- End diff --
    
    This runnable is pretty big and verbose. It will cleaner to make this separate class. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16683031
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -124,6 +139,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       def shutdown() {
         logInfo("Shutting down Spark Avro Callback Handler")
    +    stopped = true
    +    processorsToShutdown.foreach(_.shutdown())
    --- End diff --
    
    There maybe a race condition here. A thread that might have called `getEventBatch()` and created a processor, before the shutting down thread (i.e., the one that called `shutdown()`) marks `stopped` as true. But that thread may not have added the processor to `processorsToShutdown` yet, and the shutting down thread may have already completed the shutdown function. In that case, there is a zombie processor that is never shutdown. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16756336
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -62,19 +67,30 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       override def getEventBatch(n: Int): EventBatch = {
         logDebug("Got getEventBatch call from Spark.")
    -    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    -    val processor = new TransactionProcessor(channel, sequenceNumber,
    -      n, transactionTimeout, backOffInterval, this)
    -    transactionExecutorOpt.foreach(executor => {
    -      executor.submit(processor)
    -    })
    -    // Wait until a batch is available - will be an error if error message is non-empty
    -    val batch = processor.getEventBatch
    -    if (!SparkSinkUtils.isErrorBatch(batch)) {
    -      processorMap.put(sequenceNumber.toString, processor)
    -      logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +    if (stopped) {
    +      new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
    +    } else {
    +      val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    +      val processor = getTransactionProcessor(sequenceNumber, n)
    +      transactionExecutorOpt.foreach(_.submit(processor))
    +      // Wait until a batch is available - will be an error if error message is non-empty
    +      val batch = processor.getEventBatch
    +      if (SparkSinkUtils.isErrorBatch(batch)) {
    +        // Remove the processor if it is an error batch since no ACK is sent.
    +        removeAndGetProcessor(sequenceNumber)
    +        logDebug("Sending event batch with sequence number: " + sequenceNumber)
    +      }
    +      batch
    +    }
    +  }
    +
    +  private def getTransactionProcessor(seq: String, n: Int): TransactionProcessor = {
    --- End diff --
    
    Also, the name is inconsistent. The TransactionProcessor objects are referred to as Processor (not TransactionProcessor) every where in this class. E.g. `removeAndGetProcessor`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16755614
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala ---
    @@ -124,6 +155,13 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
        */
       def shutdown() {
         logInfo("Shutting down Spark Avro Callback Handler")
    +    stopped = true
    +    activeProcessorMapLock.lock()
    +    try {
    +      activeProcessors.foreach(_.shutdown())
    +    } finally {
    +      activeProcessorMapLock.unlock()
    --- End diff --
    
    Makes sense. We though need to handle an error batch - so when we get an error batch, we must remove the receiver from the map.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org