Posted to reviews@spark.apache.org by tmalaska <gi...@git.apache.org> on 2014/04/14 19:38:17 UTC

[GitHub] spark pull request: SPARK-1478

GitHub user tmalaska opened a pull request:

    https://github.com/apache/spark/pull/405

    SPARK-1478

    Initial Version

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tmalaska/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #405
    
----
commit c433827db5dfda6f5b1b6aa11e45447525b4aac4
Author: tmalaska <te...@cloudera.com>
Date:   2014-04-14T17:37:01Z

    SPARK-1478

----


---

[GitHub] spark pull request: SPARK-1478

Posted by tmalaska <gi...@git.apache.org>.
Github user tmalaska commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-41671506
  
    As soon as I figure out how; I will look into it after work.


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035544
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -49,9 +63,20 @@ class FlumeStreamSuite extends TestSuiteBase {
         val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
         val input = Seq(1, 2, 3, 4, 5)
         Thread.sleep(1000)
    -    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
    -    val client = SpecificRequestor.getClient(
    -      classOf[AvroSourceProtocol], transceiver)
    +      
    +    var client: AvroSourceProtocol = null;
    +    
    +    if (enableCompression) {
    --- End diff --
    
    The Scala way of writing this would be 
    ```
    val client: AvroSourceProtocol = {
       if (enableCompression) {
       SpecificRequestor.getClient( .... )
    } else {
       SpecificRequestor.getClient( .... )
       }
    }
    ```
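
    For reference, a fully written-out version of that expression, reusing the calls
    already in this test, might look like the sketch below. The CompressionChannelFactory
    comes from this PR's test diff, and the compression level 6 is only an illustrative
    value, not the exact code in the patch:
    ```
    val client: AvroSourceProtocol = {
      if (enableCompression) {
        // Channel factory from this PR's test diff; level 6 is an arbitrary example.
        val channelFactory = new CompressionChannelFactory(6)
        SpecificRequestor.getClient(classOf[AvroSourceProtocol],
          new NettyTransceiver(new InetSocketAddress("localhost", testPort), channelFactory))
      } else {
        SpecificRequestor.getClient(classOf[AvroSourceProtocol],
          new NettyTransceiver(new InetSocketAddress("localhost", testPort)))
      }
    }
    ```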


---

[GitHub] spark pull request: SPARK-1478

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40416779
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14118/


---

[GitHub] spark pull request: SPARK-1478

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40416751
  
    Merged build started. 


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-41524940
  
    Aaah, crap, I did not realize that you submitted a new PR #566  for SPARK-1478! 
    Can you please close this PR?


---

[GitHub] spark pull request: SPARK-1478

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40416735
  
     Merged build triggered. 


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035504
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---
    @@ -36,7 +37,30 @@ object FlumeUtils {
           port: Int,
           storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
         ): DStream[SparkFlumeEvent] = {
    -    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
    +    createStream(ssc, hostname, port, storageLevel, false);
    +  }
    +  
    +  /**
    +   * Create a input stream from a Flume source.
    +   * @param ssc      StreamingContext object
    +   * @param hostname Hostname of the slave machine to which the flume data will be sent
    +   * @param port     Port of the slave machine to which the flume data will be sent
    +   * @param storageLevel  Storage level to use for storing the received objects
    +   * @param enableCompression  Should Netty Server decode input stream from client  
    --- End diff --
    
    Same comment as above.


---

[GitHub] spark pull request: SPARK-1478

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40416619
  
    Jenkins, test this please. @tmalaska mind updating the title of the PR to include the title of the JIRA? It makes it easier when scanning the (long) list of active pull requests.


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035567
  
    --- Diff: project/SparkBuild.scala ---
    @@ -556,7 +556,7 @@ object SparkBuild extends Build {
         name := "spark-streaming-flume",
         previousArtifact := sparkPreviousArtifact("spark-streaming-flume"),
         libraryDependencies ++= Seq(
    -      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty)
    +      "org.apache.flume" % "flume-ng-sdk" % "1.3.0" % "compile" excludeAll(excludeNetty)
    --- End diff --
    
    This should be 1.4 if I am not mistaken; please merge with master.
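
    As a sketch only, the dependency line after merging master would then read
    (assuming master is on Flume 1.4.0):
    ```
    "org.apache.flume" % "flume-ng-sdk" % "1.4.0" % "compile" excludeAll(excludeNetty)
    ```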


---

[GitHub] spark pull request: SPARK-1478

Posted by tmalaska <gi...@git.apache.org>.
Github user tmalaska commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40421425
  
    Yeah, no problem. Thanks for taking the time to review my code. This is my first time committing with Scala :)
    
    Just let me know when ( #300 ) is done and I will check it out again. Also, when you have time I would love to know how else I could help.
    
    I was thinking of adding:
    - encryption to the Flume Stream, as in Flume 1.4.0
    - failure recovery support, so that when a Flume Stream host goes down, Spark starts up the Flume Stream on another node


---

[GitHub] spark pull request: SPARK-1478

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40395599
  
    Can one of the admins verify this patch?


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035560
  
    --- Diff: external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---
    @@ -83,4 +108,16 @@ class FlumeStreamSuite extends TestSuiteBase {
           assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
         }
       }
    +  
    +  class CompressionChannelFactory(compressionLevel: Int) extends
    +      NioClientSocketChannelFactory {
    --- End diff --
    
    No need to wrap the line here. Usually, it is either
    ```
    class X extends Y
    ```
    or
    ```
    class X 
      extends Y
    ```
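
    Applied to this class, a one-line declaration would look like the sketch below.
    The zlib handlers are only an assumption about what the factory does (using Netty 3's
    org.jboss.netty.handler.codec.compression API), not code copied from this patch:
    ```
    class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
      // Assumed behaviour: wrap each new channel's pipeline with zlib handlers so the
      // client compresses outgoing data and decompresses incoming data.
      override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
        pipeline.addFirst("deflater", new ZlibEncoder(compressionLevel))
        pipeline.addFirst("inflater", new ZlibDecoder())
        super.newChannel(pipeline)
      }
    }
    ```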


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035517
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---
    @@ -66,6 +90,28 @@ object FlumeUtils {
           port: Int,
           storageLevel: StorageLevel
         ): JavaDStream[SparkFlumeEvent] = {
    -    createStream(jssc.ssc, hostname, port, storageLevel)
    +    createStream(jssc.ssc, hostname, port, storageLevel, false)
    +  }
    +  
    +  /**
    +   * Creates a input stream from a Flume source.
    +   * @param hostname Hostname of the slave machine to which the flume data will be sent
    +   * @param port     Port of the slave machine to which the flume data will be sent
    +   * @param storageLevel  Storage level to use for storing the received objects
    +   * @param enableCompression  Should Netty Server decode input stream from client 
    --- End diff --
    
    same comment as above, for enableCompression.


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-41524436
  
    @tmalaska You probably missed Patrick's comment about updating the title of the PR. :) Please do update it with the title in the JIRA, so that it's easier to identify in git's logs.
    
    Also, please update your branch by merging the master branch. I still see Flume version 1.3 in the diff, which I expected to be 1.4.


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40420675
  
    @tmalaska I did a cursory pass, and this looks good. I will do a more detailed pass soon. However, there is something you should know. I am in the middle of a PR ( #300 ) that tweaks the receiver API a little bit for greater stability, so a bit of your code will have to change a little. This should go in pretty soon (a couple of days, max). The PR has the changes necessary for the current FlumeReceiver.



---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035485
  
    --- Diff: external/flume/pom.xml ---
    @@ -61,7 +61,7 @@
         <dependency>
           <groupId>org.apache.flume</groupId>
           <artifactId>flume-ng-sdk</artifactId>
    -      <version>1.2.0</version>
    +      <version>1.3.0</version>
    --- End diff --
    
    Still 1.3? Have you merged the master branch into your branch?


---

[GitHub] spark pull request: SPARK-1478

Posted by tmalaska <gi...@git.apache.org>.
Github user tmalaska closed the pull request at:

    https://github.com/apache/spark/pull/405


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035512
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---
    @@ -29,6 +29,7 @@ object FlumeUtils {
        * @param hostname Hostname of the slave machine to which the flume data will be sent
        * @param port     Port of the slave machine to which the flume data will be sent
        * @param storageLevel  Storage level to use for storing the received objects
    +   * @param enableCompression  Should Netty Server decode input stream from client  
    --- End diff --
    
    Also, this parameter does not exist in this version of the createStream method!
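
    For clarity, the overload that does take the flag (presumably added further down in
    this file, per the diff) would have roughly this shape; the body below is reconstructed
    from the delegation call in the diff, not quoted verbatim:
    ```
    def createStream(
        ssc: StreamingContext,
        hostname: String,
        port: Int,
        storageLevel: StorageLevel,
        enableCompression: Boolean
      ): DStream[SparkFlumeEvent] = {
      // Assumption: the flag is forwarded to the Flume receiver.
      new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableCompression)
    }
    ```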


---

[GitHub] spark pull request: SPARK-1478

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/405#issuecomment-40416778
  
    Merged build finished. 


---

[GitHub] spark pull request: SPARK-1478

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/405#discussion_r12035502
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---
    @@ -29,6 +29,7 @@ object FlumeUtils {
        * @param hostname Hostname of the slave machine to which the flume data will be sent
        * @param port     Port of the slave machine to which the flume data will be sent
        * @param storageLevel  Storage level to use for storing the received objects
    +   * @param enableCompression  Should Netty Server decode input stream from client  
    --- End diff --
    
    I am a little confused. What does "should netty server decode input stream" have to do with "compression"? Maybe you wanted to say "should netty server decompress input stream"?
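
    If that is the intent, one possible rewording of the Scaladoc line (a suggestion only,
    not necessarily the wording that was committed) would be:
    ```
    * @param enableCompression  Should the Netty server decompress the input stream from the client
    ```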


---