Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2016/07/13 18:58:54 UTC

[GitHub] spark pull request #14183: [SPARK-16114] [SQL] updated structured streaming ...

Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14183#discussion_r70687380
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -626,52 +626,95 @@ The result tables would look something like the following.
     
     ![Window Operations](img/structured-streaming-window.png)
     
    -Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations.
    +Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations. You can see the full code for the below examples in
    +[Scala]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala)/
    +[Java]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java)/
    +[Python]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py).
     
     <div class="codetabs">
     <div data-lang="scala"  markdown="1">
     
     {% highlight scala %}
    -// Number of events in every 1 minute time windows
    -df.groupBy(window(df.col("time"), "1 minute"))
    -  .count()
    -
    +import spark.implicits._
     
    -// Average number of events for each device type in every 1 minute time windows
    -df.groupBy(
    -     df.col("type"),
    -     window(df.col("time"), "1 minute"))
    -  .avg("signal")
    +// Create DataFrame representing the stream of input lines from connection to host:port
    +val lines = spark.readStream
    +  .format("socket")
    +  .option("host", "localhost")
    +  .option("port", 9999)
    +  .option("includeTimestamp", true)
    +  .load().as[(String, Timestamp)]
    +
    +// Split the lines into words, retaining timestamps
    +val words = lines.flatMap(line =>
    +  line._1.split(" ").map(word => (word, line._2))
    +).toDF("word", "timestamp")
    +
    +// Group the data by window and word and compute the count of each group
    +val windowedCounts = words.groupBy(
    --- End diff --
    
    I took a look at the built doc again and imagined what it would look like. This would be very verbose. Since the nearest example in the doc (Basic Operations - Selection, Projection, Aggregation) uses device data and already has all the boilerplate code to define the DeviceData class, etc., let's not change the code snippet to the exact one in the example.
    
    
    Can you revert all the code snippet changes, and make just two changes to the Scala snippet:
    - Change df.col("..") to use $"..."
    - Add import spark.implicits._
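
    For illustration, after reverting, the Scala snippet with those two changes might look roughly like this (a sketch assuming the original df with time, type, and signal columns from the device-data example; the functions import is shown here for completeness):

        import spark.implicits._
        import org.apache.spark.sql.functions._  // for window()

        // Number of events in every 1 minute time window
        df.groupBy(window($"time", "1 minute"))
          .count()

        // Average signal for each device type in every 1 minute time window
        df.groupBy(
            $"type",
            window($"time", "1 minute"))
          .avg("signal")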
    
    


